from direct.directnotify.DirectNotifyGlobal import directNotify
from direct.showbase.PythonUtil import Queue, invertDictLossless
from direct.showbase.PythonUtil import safeRepr
from direct.showbase.Job import Job
from direct.showbase.JobManagerGlobal import jobMgr
import types
[docs]class ContainerReport(Job):
notify = directNotify.newCategory("ContainerReport")
# set of containers that should not be included in the report
PrivateIds = set()
[docs] def __init__(self, name, log=False, limit=None, threaded=False):
Job.__init__(self, name)
self._log = log
self._limit = limit
# set up our data structures
self._visitedIds = set()
self._id2pathStr = {}
self._id2container = {}
self._type2id2len = {}
self._instanceDictIds = set()
# for breadth-first searching
self._queue = Queue()
jobMgr.add(self)
if not threaded:
jobMgr.finish(self)
[docs] def destroy(self):
del self._queue
del self._instanceDictIds
del self._type2id2len
del self._id2container
del self._id2pathStr
del self._visitedIds
del self._limit
del self._log
[docs] def finished(self):
if self._log:
self.destroy()
[docs] def run(self):
ContainerReport.PrivateIds.update(set([
id(ContainerReport.PrivateIds),
id(self._visitedIds),
id(self._id2pathStr),
id(self._id2container),
id(self._type2id2len),
id(self._queue),
id(self._instanceDictIds),
]))
# push on a few things that we want to give priority
# for the sake of the variable-name printouts
try:
base
except:
pass
else:
self._enqueueContainer( base.__dict__,
'base')
try:
simbase
except:
pass
else:
self._enqueueContainer( simbase.__dict__,
'simbase')
self._queue.push(__builtins__)
self._id2pathStr[id(__builtins__)] = ''
while len(self._queue) > 0:
# yield up here instead of at the end, since we skip back to the
# top of the while loop from various points
yield None
parentObj = self._queue.pop()
#print '%s: %s, %s' % (id(parentObj), type(parentObj), self._id2pathStr[id(parentObj)])
isInstanceDict = False
if id(parentObj) in self._instanceDictIds:
isInstanceDict = True
try:
if parentObj.__class__.__name__ == 'method-wrapper':
continue
except:
pass
if isinstance(parentObj, (str, bytes)):
continue
if type(parentObj) in (types.ModuleType, types.InstanceType):
child = parentObj.__dict__
if self._examine(child):
assert self._queue.back() is child
self._instanceDictIds.add(id(child))
self._id2pathStr[id(child)] = str(self._id2pathStr[id(parentObj)])
continue
if isinstance(parentObj, dict):
key = None
attr = None
keys = list(parentObj.keys())
try:
keys.sort()
except TypeError as e:
self.notify.warning('non-sortable dict keys: %s: %s' % (self._id2pathStr[id(parentObj)], repr(e)))
for key in keys:
try:
attr = parentObj[key]
except KeyError as e:
self.notify.warning('could not index into %s with key %s' % (self._id2pathStr[id(parentObj)],
key))
if id(attr) not in self._visitedIds:
self._visitedIds.add(id(attr))
if self._examine(attr):
assert self._queue.back() is attr
if parentObj is __builtins__:
self._id2pathStr[id(attr)] = key
else:
if isInstanceDict:
self._id2pathStr[id(attr)] = self._id2pathStr[id(parentObj)] + '.%s' % key
else:
self._id2pathStr[id(attr)] = self._id2pathStr[id(parentObj)] + '[%s]' % safeRepr(key)
del key
del attr
continue
if type(parentObj) is not types.FileType:
try:
itr = iter(parentObj)
except:
pass
else:
try:
index = 0
while 1:
try:
attr = next(itr)
except:
# some custom classes don't do well when iterated
attr = None
break
if id(attr) not in self._visitedIds:
self._visitedIds.add(id(attr))
if self._examine(attr):
assert self._queue.back() is attr
self._id2pathStr[id(attr)] = self._id2pathStr[id(parentObj)] + '[%s]' % index
index += 1
del attr
except StopIteration as e:
pass
del itr
continue
try:
childNames = dir(parentObj)
except:
pass
else:
childName = None
child = None
for childName in childNames:
child = getattr(parentObj, childName)
if id(child) not in self._visitedIds:
self._visitedIds.add(id(child))
if self._examine(child):
assert self._queue.back() is child
self._id2pathStr[id(child)] = self._id2pathStr[id(parentObj)] + '.%s' % childName
del childName
del child
continue
if self._log:
self.printingBegin()
for i in self._output(limit=self._limit):
yield None
self.printingEnd()
yield Job.Done
def _enqueueContainer(self, obj, pathStr=None):
# call this to add a container that should be examined before any (other) direct
# children of __builtins__
# this is mostly to fix up the names of variables
self._queue.push(obj)
objId = id(obj)
if pathStr is not None:
self._id2pathStr[objId] = pathStr
# if it's a container, put it in the tables
try:
length = len(obj)
except:
length = None
if length is not None and length > 0:
self._id2container[objId] = obj
self._type2id2len.setdefault(type(obj), {})
self._type2id2len[type(obj)][objId] = length
def _examine(self, obj):
# return False if it's an object that can't contain or lead to other objects
if type(obj) in (bool, types.BuiltinFunctionType, types.BuiltinMethodType,
complex, float, int, type(None), type(NotImplemented),
type, types.CodeType, types.FunctionType):
return False
# if it's an internal object, ignore it
if id(obj) in ContainerReport.PrivateIds:
return False
# this object might lead to more objects. put it on the queue
self._enqueueContainer(obj)
return True
def _outputType(self, type, limit=None):
if type not in self._type2id2len:
return
len2ids = invertDictLossless(self._type2id2len[type])
lengths = list(len2ids.keys())
lengths.sort()
lengths.reverse()
print('=====')
print('===== %s' % type)
count = 0
stop = False
for l in lengths:
#len2ids[l].sort()
pathStrList = list()
for id in len2ids[l]:
obj = self._id2container[id]
#print '%s: %s' % (l, self._id2pathStr[id])
pathStrList.append(self._id2pathStr[id])
count += 1
if (count & 0x7f) == 0:
yield None
pathStrList.sort()
for pathstr in pathStrList:
print('%s: %s' % (l, pathstr))
if limit is not None and count >= limit:
return
def _output(self, **kArgs):
print("===== ContainerReport: \'%s\' =====" % (self._name,))
initialTypes = (dict, list, tuple)
for type in initialTypes:
for i in self._outputType(type, **kArgs):
yield None
otherTypes = list(set(self._type2id2len.keys()).difference(set(initialTypes)))
otherTypes.sort()
for type in otherTypes:
for i in self._outputType(type, **kArgs):
yield None
[docs] def log(self, **kArgs):
self._output(**kArgs)