from __future__ import annotations
from direct.directnotify.DirectNotifyGlobal import directNotify
from direct.showbase.PythonUtil import Queue, invertDictLossless
from direct.showbase.PythonUtil import safeRepr
from direct.showbase.Job import Job
from direct.showbase.JobManagerGlobal import jobMgr
from direct.showbase.ContainerLeakDetector import deadEndTypes
import types
import io
[docs]class ContainerReport(Job):
notify = directNotify.newCategory("ContainerReport")
# set of containers that should not be included in the report
PrivateIds: set[int] = set()
[docs] def __init__(self, name, log=False, limit=None, threaded=False):
Job.__init__(self, name)
self._log = log
self._limit = limit
# set up our data structures
self._visitedIds = set()
self._id2pathStr = {}
self._id2container = {}
self._type2id2len = {}
self._instanceDictIds = set()
# for breadth-first searching
self._queue = Queue()
jobMgr.add(self)
if not threaded:
jobMgr.finish(self)
[docs] def destroy(self):
del self._queue
del self._instanceDictIds
del self._type2id2len
del self._id2container
del self._id2pathStr
del self._visitedIds
del self._limit
del self._log
[docs] def finished(self):
if self._log:
self.destroy()
[docs] def run(self):
ContainerReport.PrivateIds.update(set([
id(ContainerReport.PrivateIds),
id(self._visitedIds),
id(self._id2pathStr),
id(self._id2container),
id(self._type2id2len),
id(self._queue),
id(self._instanceDictIds),
]))
# push on a few things that we want to give priority
# for the sake of the variable-name printouts
try:
base
except NameError:
pass
else:
self._enqueueContainer(base.__dict__,
'base')
try:
simbase
except NameError:
pass
else:
self._enqueueContainer(simbase.__dict__,
'simbase')
self._queue.push(__builtins__)
self._id2pathStr[id(__builtins__)] = ''
while len(self._queue) > 0:
# yield up here instead of at the end, since we skip back to the
# top of the while loop from various points
yield None
parentObj = self._queue.pop()
#print '%s: %s, %s' % (id(parentObj), type(parentObj), self._id2pathStr[id(parentObj)])
isInstanceDict = False
if id(parentObj) in self._instanceDictIds:
isInstanceDict = True
try:
if parentObj.__class__.__name__ == 'method-wrapper':
continue
except Exception:
pass
if isinstance(parentObj, (str, bytes)):
continue
if isinstance(parentObj, dict):
key = None
attr = None
keys = list(parentObj.keys())
try:
keys.sort()
except TypeError as e:
self.notify.warning('non-sortable dict keys: %s: %s' % (self._id2pathStr[id(parentObj)], repr(e)))
for key in keys:
try:
attr = parentObj[key]
except KeyError as e:
self.notify.warning('could not index into %s with key %s' % (self._id2pathStr[id(parentObj)],
key))
if id(attr) not in self._visitedIds:
self._visitedIds.add(id(attr))
if self._examine(attr):
assert self._queue.back() is attr
if parentObj is __builtins__:
self._id2pathStr[id(attr)] = key
else:
if isInstanceDict:
self._id2pathStr[id(attr)] = self._id2pathStr[id(parentObj)] + '.%s' % key
else:
self._id2pathStr[id(attr)] = self._id2pathStr[id(parentObj)] + '[%s]' % safeRepr(key)
del key
del attr
continue
# types.CellType was added in Python 3.8
if type(parentObj) is types.CellType:
child = parentObj.cell_contents
if self._examine(child):
assert (self._queue.back() is child)
self._instanceDictIds.add(id(child))
self._id2pathStr[id(child)] = str(self._id2pathStr[id(parentObj)]) + '.cell_contents'
continue
if hasattr(parentObj, '__dict__'):
# Instance of a class
child = parentObj.__dict__
if self._examine(child):
assert (self._queue.back() is child)
self._instanceDictIds.add(id(child))
self._id2pathStr[id(child)] = str(self._id2pathStr[id(parentObj)])
continue
if not isinstance(parentObj, io.TextIOWrapper):
try:
itr = iter(parentObj)
except Exception:
pass
else:
try:
index = 0
while 1:
try:
attr = next(itr)
except Exception:
# some custom classes don't do well when iterated
attr = None
break
if id(attr) not in self._visitedIds:
self._visitedIds.add(id(attr))
if self._examine(attr):
assert self._queue.back() is attr
self._id2pathStr[id(attr)] = self._id2pathStr[id(parentObj)] + '[%s]' % index
index += 1
del attr
except StopIteration as e:
pass
del itr
continue
try:
childNames = dir(parentObj)
except Exception:
pass
else:
childName = None
child = None
for childName in childNames:
try:
child = getattr(parentObj, childName)
except Exception:
continue
if id(child) not in self._visitedIds:
self._visitedIds.add(id(child))
if self._examine(child):
assert self._queue.back() is child
self._id2pathStr[id(child)] = self._id2pathStr[id(parentObj)] + '.%s' % childName
del childName
del child
continue
if self._log:
self.printingBegin()
for i in self._output(limit=self._limit):
yield None
self.printingEnd()
yield Job.Done
def _enqueueContainer(self, obj, pathStr=None):
# call this to add a container that should be examined before any (other) direct
# children of __builtins__
# this is mostly to fix up the names of variables
self._queue.push(obj)
objId = id(obj)
if pathStr is not None:
self._id2pathStr[objId] = pathStr
# if it's a container, put it in the tables
try:
length = len(obj)
except Exception:
length = None
if length is not None and length > 0:
self._id2container[objId] = obj
self._type2id2len.setdefault(type(obj), {})
self._type2id2len[type(obj)][objId] = length
def _examine(self, obj):
# return False if it's an object that can't contain or lead to other objects
if type(obj) in deadEndTypes:
return False
# if it's an internal object, ignore it
if id(obj) in ContainerReport.PrivateIds:
return False
# this object might lead to more objects. put it on the queue
self._enqueueContainer(obj)
return True
def _outputType(self, type, limit=None):
if type not in self._type2id2len:
return
len2ids = invertDictLossless(self._type2id2len[type])
print('=====')
print('===== %s' % type)
count = 0
stop = False
for l in sorted(len2ids, reverse=True):
#len2ids[l].sort()
pathStrList = list()
for id in len2ids[l]:
obj = self._id2container[id]
#print '%s: %s' % (l, self._id2pathStr[id])
pathStrList.append(self._id2pathStr[id])
count += 1
if (count & 0x7f) == 0:
yield None
pathStrList.sort()
for pathstr in pathStrList:
print('%s: %s' % (l, pathstr))
if limit is not None and count >= limit:
return
def _output(self, **kArgs):
print("===== ContainerReport: \'%s\' =====" % (self._name,))
initialTypes = (dict, list, tuple)
for type in initialTypes:
for i in self._outputType(type, **kArgs):
yield None
otherTypes = set(self._type2id2len).difference(initialTypes)
for type in sorted(otherTypes, key=lambda obj: obj.__name__):
for i in self._outputType(type, **kArgs):
yield None
[docs] def log(self, **kArgs):
self._output(**kArgs)