from direct.directnotify.DirectNotifyGlobal import directNotify
import direct.showbase.DConfig as config
from direct.showbase.PythonUtil import makeFlywheelGen
from direct.showbase.PythonUtil import ScratchPad, uniqueName, flywheel, nullGen
from direct.showbase.PythonUtil import itype, serialNum, safeRepr, fastRepr
from direct.showbase.Job import Job
from direct.showbase.JobManagerGlobal import jobMgr
from direct.showbase.MessengerGlobal import messenger
from direct.task.TaskManagerGlobal import taskMgr
import types
import weakref
import random
import builtins
deadEndTypes = (bool, types.BuiltinFunctionType,
types.BuiltinMethodType, complex,
float, int,
type(None), type(NotImplemented),
type, types.CodeType, types.FunctionType,
bytes, str, tuple)
def _createContainerLeak():
def leakContainer(task=None):
base = getBase()
if not hasattr(base, 'leakContainer'):
base.leakContainer = {}
# use tuples as keys since they can't be weakref'd, and use an instance
# since it can't be repr/eval'd
# that will force the leak detector to hold a normal 'non-weak' reference
class LeakKey:
pass
base.leakContainer[(LeakKey(),)] = {}
# test the non-weakref object reference handling
if random.random() < .01:
key = random.choice(list(base.leakContainer.keys()))
ContainerLeakDetector.notify.debug(
'removing reference to leakContainer key %s so it will be garbage-collected' % safeRepr(key))
del base.leakContainer[key]
taskMgr.doMethodLater(10, leakContainer, 'leakContainer-%s' % serialNum())
if task:
return task.done
leakContainer()
def _createTaskLeak():
leakTaskName = uniqueName('leakedTask')
leakDoLaterName = uniqueName('leakedDoLater')
def nullTask(task=None):
return task.cont
def nullDoLater(task=None):
return task.done
def leakTask(task=None, leakTaskName=leakTaskName):
base = getBase()
taskMgr.add(nullTask, uniqueName(leakTaskName))
taskMgr.doMethodLater(1 << 31, nullDoLater, uniqueName(leakDoLaterName))
taskMgr.doMethodLater(10, leakTask, 'doLeakTask-%s' % serialNum())
if task:
return task.done
leakTask()
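
# Illustrative sketch (not part of the original module): the two debug-leak
# helpers above are normally switched on through the 'leak-container' and
# 'leak-tasks' config variables (see ContainerLeakDetector.__init__ below),
# but they can also be kicked off directly to exercise the detector.
# Assumes a running ShowBase so that getBase() and taskMgr are available.
def _exampleForceDebugLeaks():
    _createContainerLeak()
    _createTaskLeak()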
class NoDictKey:
    pass

class Indirection:
"""
Represents the indirection that brings you from a container to an element of the container.
Stored as a string to be used as part of an eval, or as a key to be looked up in a dict.
Each dictionary dereference is individually eval'd since the dict key might have been
garbage-collected
TODO: store string components that are duplicates of strings in the actual system so that
Python will keep one copy and reduce memory usage
"""
    def __init__(self, evalStr=None, dictKey=NoDictKey):
# if this is a dictionary lookup, pass dictKey instead of evalStr
self.evalStr = evalStr
self.dictKey = NoDictKey
# is the dictKey a weak reference?
self._isWeakRef = False
self._refCount = 0
if dictKey is not NoDictKey:
# if we can repr/eval the key, store it as an evalStr
keyRepr = safeRepr(dictKey)
useEval = False
try:
keyEval = eval(keyRepr)
useEval = True
except:
pass
if useEval:
# check to make sure the eval succeeded
if hash(keyEval) != hash(dictKey):
useEval = False
if useEval:
# eval/repr succeeded, store as an evalStr
self.evalStr = '[%s]' % keyRepr
else:
try:
# store a weakref to the key
self.dictKey = weakref.ref(dictKey)
self._isWeakRef = True
except TypeError as e:
ContainerLeakDetector.notify.debug('could not weakref dict key %s' % keyRepr)
self.dictKey = dictKey
self._isWeakRef = False
    def destroy(self):
# re-entrant
self.dictKey = NoDictKey
    def acquire(self):
self._refCount += 1
    def release(self):
self._refCount -= 1
if self._refCount == 0:
self.destroy()
    def isDictKey(self):
# is this an indirection through a dictionary?
return self.dictKey is not NoDictKey
def _getNonWeakDictKey(self):
if not self._isWeakRef:
return self.dictKey
else:
key = self.dictKey()
if key is None:
return '<garbage-collected dict key>'
return key
    def dereferenceDictKey(self, parentDict):
# look ourselves up in parentDict
key = self._getNonWeakDictKey()
# objects in builtins will have parentDict==None
if parentDict is None:
return key
return parentDict[key]
    def getString(self, prevIndirection=None, nextIndirection=None):
# return our contribution to the full name of an object
instanceDictStr = '.__dict__'
if self.evalStr is not None:
# if we're an instance dict, skip over this one (obj.__dict__[keyName] == obj.keyName)
if nextIndirection is not None and self.evalStr[-len(instanceDictStr):] == instanceDictStr:
return self.evalStr[:-len(instanceDictStr)]
# if the previous indirection was an instance dict, change our syntax from ['key'] to .key
if prevIndirection is not None and prevIndirection.evalStr is not None:
if prevIndirection.evalStr[-len(instanceDictStr):] == instanceDictStr:
return '.%s' % self.evalStr[2:-2]
return self.evalStr
# we're stored as a dict key
keyRepr = safeRepr(self._getNonWeakDictKey())
# if the previous indirection was an instance dict, change our syntax from ['key'] to .key
if prevIndirection is not None and prevIndirection.evalStr is not None:
if prevIndirection.evalStr[-len(instanceDictStr):] == instanceDictStr:
return '.%s' % keyRepr
return '[%s]' % keyRepr
def __repr__(self):
return self.getString()
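
# Illustrative sketch (not part of the original module): how Indirection
# pieces combine into a readable path. The dict key 'foo' is hypothetical;
# getString() collapses obj.__dict__['foo'] into the friendlier obj.foo form.
def _exampleIndirectionUsage():
    objDict = Indirection(evalStr='.__dict__')
    # 'foo' can be repr'd and eval'd, so it is stored as the eval string
    # "['foo']" rather than as a weak reference to the key
    keyInd = Indirection(dictKey='foo')
    path = (objDict.getString(nextIndirection=keyInd) +
            keyInd.getString(prevIndirection=objDict))
    assert path == '.foo'
    return path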
class ObjectRef:
"""
stores a reference to a container in a way that does not prevent garbage
collection of the container if possible
stored as a series of 'indirections' (obj.foo -> '.foo', dict[key] -> '[key]', etc.)
"""
notify = directNotify.newCategory("ObjectRef")
    class FailedEval(Exception):
pass
    def __init__(self, indirection, objId, other=None):
self._indirections = []
# are we building off of an existing ref?
if other is not None:
for ind in other._indirections:
self._indirections.append(ind)
# make sure we're not storing a reference to the actual object,
# that could cause a memory leak
assert type(objId) is int
# prevent cycles (i.e. base.loader.base.loader)
assert not self.goesThrough(objId=objId)
self._indirections.append(indirection)
# make sure our indirections don't get destroyed while we're using them
for ind in self._indirections:
ind.acquire()
self.notify.debug(repr(self))
    def destroy(self):
for indirection in self._indirections:
indirection.release()
del self._indirections
    def getNumIndirections(self):
return len(self._indirections)
    def goesThroughGen(self, obj=None, objId=None):
if obj is None:
assert type(objId) is int
else:
objId = id(obj)
o = None
evalStr = ''
curObj = None
# make sure the indirections don't go away on us
indirections = self._indirections
for indirection in indirections:
yield None
indirection.acquire()
for indirection in indirections:
yield None
if not indirection.isDictKey():
# build up a string to be eval'd
evalStr += indirection.getString()
else:
curObj = self._getContainerByEval(evalStr, curObj=curObj)
if curObj is None:
                    raise self.FailedEval(evalStr)
# try to look up this key in the curObj dictionary
curObj = indirection.dereferenceDictKey(curObj)
evalStr = ''
yield None
o = self._getContainerByEval(evalStr, curObj=curObj)
if id(o) == objId:
break
for indirection in indirections:
yield None
indirection.release()
yield id(o) == objId
    def goesThrough(self, obj=None, objId=None):
# since we cache the ids involved in this reference,
# this isn't perfect, for example if base.myObject is reassigned
# to a different object after this Ref was created this would return
# false, allowing a ref to base.myObject.otherObject.myObject
for goesThrough in self.goesThroughGen(obj=obj, objId=objId):
pass
return goesThrough
def _getContainerByEval(self, evalStr, curObj=None):
if curObj is not None:
# eval('curObj.foo.bar.someDict')
evalStr = 'curObj%s' % evalStr
else:
            # this eval is not based off of curObj, use the global builtins namespace
# put builtins at the start if it's not already there
bis = 'builtins'
if evalStr[:len(bis)] != bis:
evalStr = '%s.%s' % (bis, evalStr)
try:
container = eval(evalStr)
except NameError as ne:
return None
except AttributeError as ae:
return None
except KeyError as ke:
return None
return container
    def getContainerGen(self, getInstance=False):
# try to get a handle on the container by eval'ing and looking things
# up in dictionaries, depending on the type of each indirection
# if getInstance is True, will return instance instead of instance dict
#import pdb;pdb.set_trace()
evalStr = ''
curObj = None
# make sure the indirections don't go away on us
indirections = self._indirections
for indirection in indirections:
indirection.acquire()
for indirection in indirections:
yield None
if not indirection.isDictKey():
# build up a string to be eval'd
evalStr += indirection.getString()
else:
curObj = self._getContainerByEval(evalStr, curObj=curObj)
if curObj is None:
                    raise self.FailedEval(evalStr)
# try to look up this key in the curObj dictionary
curObj = indirection.dereferenceDictKey(curObj)
evalStr = ''
for indirection in indirections:
yield None
indirection.release()
if getInstance:
lenDict = len('.__dict__')
if evalStr[-lenDict:] == '.__dict__':
evalStr = evalStr[:-lenDict]
# TODO: check that this is still the object we originally pointed to
yield self._getContainerByEval(evalStr, curObj=curObj)
    def getEvalStrGen(self, getInstance=False):
str = ''
prevIndirection = None
curIndirection = None
nextIndirection = None
# make sure the indirections don't go away on us
indirections = self._indirections
for indirection in indirections:
indirection.acquire()
for i in range(len(indirections)):
yield None
if i > 0:
prevIndirection = indirections[i-1]
else:
prevIndirection = None
curIndirection = indirections[i]
if i < len(indirections)-1:
nextIndirection = indirections[i+1]
else:
nextIndirection = None
str += curIndirection.getString(prevIndirection=prevIndirection,
nextIndirection=nextIndirection)
if getInstance:
lenDict = len('.__dict__')
if str[-lenDict:] == '.__dict__':
str = str[:-lenDict]
for indirection in indirections:
yield None
indirection.release()
yield str
    def getFinalIndirectionStr(self):
prevIndirection = None
if len(self._indirections) > 1:
prevIndirection = self._indirections[-2]
return self._indirections[-1].getString(prevIndirection=prevIndirection)
def __repr__(self):
for result in self.getEvalStrGen():
pass
return result
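
# Illustrative sketch (not part of the original module): chaining ObjectRefs
# the way FindContainers builds its start refs. The builtins attribute name
# 'containerLeakDetectorExample' is hypothetical and is removed again below.
def _exampleObjectRefUsage():
    baseRef = ObjectRef(Indirection(evalStr='builtins.__dict__'),
                        id(builtins.__dict__))
    sampleValue = {}
    builtins.containerLeakDetectorExample = sampleValue
    childRef = ObjectRef(Indirection(dictKey='containerLeakDetectorExample'),
                         id(sampleValue), baseRef)
    # repr() runs getEvalStrGen() to completion and produces a readable path,
    # in this case 'builtins.containerLeakDetectorExample'
    path = repr(childRef)
    # getContainerGen() yields None while working, then the container itself
    for obj in childRef.getContainerGen():
        pass
    assert obj is sampleValue
    childRef.destroy()
    baseRef.destroy()
    del builtins.containerLeakDetectorExample
    return path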
class FindContainers(Job):
"""
Explore the Python graph, looking for objects that support __len__()
"""
    def __init__(self, name, leakDetector):
Job.__init__(self, name)
self._leakDetector = leakDetector
self._id2ref = self._leakDetector._id2ref
# these hold objects that we should start traversals from often and not-as-often,
# respectively
self._id2baseStartRef = {}
self._id2discoveredStartRef = {}
# these are working copies so that our iterations aren't disturbed by changes to the
# definitive ref sets
self._baseStartRefWorkingList = ScratchPad(refGen=nullGen(),
source=self._id2baseStartRef)
self._discoveredStartRefWorkingList = ScratchPad(refGen=nullGen(),
source=self._id2discoveredStartRef)
self.notify = self._leakDetector.notify
ContainerLeakDetector.addPrivateObj(self.__dict__)
# set up the base containers, the ones that hold most objects
ref = ObjectRef(Indirection(evalStr='builtins.__dict__'), id(builtins.__dict__))
self._id2baseStartRef[id(builtins.__dict__)] = ref
# container for objects that want to make sure they are found by
# the object exploration algorithm, including objects that exist
# just to measure things such as C++ memory usage, scene graph size,
# framerate, etc. See LeakDetectors.py
if not hasattr(builtins, "leakDetectors"):
builtins.leakDetectors = {}
ref = ObjectRef(Indirection(evalStr='leakDetectors'), id(leakDetectors))
self._id2baseStartRef[id(leakDetectors)] = ref
for i in self._addContainerGen(builtins.__dict__, ref):
pass
try:
base
except:
pass
else:
ref = ObjectRef(Indirection(evalStr='base.__dict__'), id(base.__dict__))
self._id2baseStartRef[id(base.__dict__)] = ref
for i in self._addContainerGen(base.__dict__, ref):
pass
try:
simbase
except:
pass
else:
ref = ObjectRef(Indirection(evalStr='simbase.__dict__'), id(simbase.__dict__))
self._id2baseStartRef[id(simbase.__dict__)] = ref
for i in self._addContainerGen(simbase.__dict__, ref):
pass
    def destroy(self):
ContainerLeakDetector.removePrivateObj(self.__dict__)
Job.destroy(self)
    def getPriority(self):
return Job.Priorities.Low
    @staticmethod
def getStartObjAffinity(startObj):
# how good of a starting object is this object for traversing the object graph?
try:
return len(startObj)
except:
return 1
def _isDeadEnd(self, obj, objName=None):
if type(obj) in deadEndTypes:
return True
# if it's an internal object, ignore it
if id(obj) in ContainerLeakDetector.PrivateIds:
return True
# prevent crashes in objects that define __cmp__ and don't handle strings
if type(objName) == str and objName in ('im_self', 'im_class'):
return True
try:
className = obj.__class__.__name__
except:
pass
else:
# prevent infinite recursion in built-in containers related to methods
if className == 'method-wrapper':
return True
return False
def _hasLength(self, obj):
return hasattr(obj, '__len__')
def _addContainerGen(self, cont, objRef):
contId = id(cont)
# if this container is new, or the objRef repr is shorter than what we already have,
# put it in the table
if contId in self._id2ref:
for existingRepr in self._id2ref[contId].getEvalStrGen():
yield None
for newRepr in objRef.getEvalStrGen():
yield None
if contId not in self._id2ref or len(newRepr) < len(existingRepr):
if contId in self._id2ref:
self._leakDetector.removeContainerById(contId)
self._id2ref[contId] = objRef
def _addDiscoveredStartRef(self, obj, ref):
# we've discovered an object that can be used to start an object graph traversal
objId = id(obj)
if objId in self._id2discoveredStartRef:
existingRef = self._id2discoveredStartRef[objId]
if type(existingRef) is not int:
if existingRef.getNumIndirections() >= ref.getNumIndirections():
# the ref that we already have is more concise than the new ref
return
if objId in self._id2ref:
if self._id2ref[objId].getNumIndirections() >= ref.getNumIndirections():
# the ref that we already have is more concise than the new ref
return
storedItem = ref
# if we already are storing a reference to this object, don't store a second reference
if objId in self._id2ref:
storedItem = objId
self._id2discoveredStartRef[objId] = storedItem
    def run(self):
try:
# this yields a different set of start refs every time we start a new traversal
# force creation of a new workingListSelector inside the while loop right off the bat
workingListSelector = nullGen()
# this holds the current step of the current traversal
curObjRef = None
while True:
# yield up here instead of at the end, since we skip back to the
# top of the while loop from various points
yield None
#import pdb;pdb.set_trace()
if curObjRef is None:
# choose an object to start a traversal from
try:
startRefWorkingList = next(workingListSelector)
except StopIteration:
# do relative # of traversals on each set based on how many refs it contains
baseLen = len(self._baseStartRefWorkingList.source)
discLen = len(self._discoveredStartRefWorkingList.source)
minLen = float(max(1, min(baseLen, discLen)))
# this will cut down the traversals of the larger set by 2/3
minLen *= 3.
workingListSelector = flywheel([self._baseStartRefWorkingList, self._discoveredStartRefWorkingList],
[baseLen/minLen, discLen/minLen])
yield None
continue
# grab the next start ref from this sequence and see if it's still valid
while True:
yield None
try:
curObjRef = next(startRefWorkingList.refGen)
break
except StopIteration:
# we've run out of refs, grab a new set
if len(startRefWorkingList.source) == 0:
# ref set is empty, choose another
break
# make a generator that yields containers a # of times that is
# proportional to their length
for fw in makeFlywheelGen(
list(startRefWorkingList.source.values()),
countFunc=lambda x: self.getStartObjAffinity(x),
scale=.05):
yield None
startRefWorkingList.refGen = fw
if curObjRef is None:
# this ref set is empty, choose another
# the base set should never be empty (builtins etc.)
continue
# do we need to go look up the object in _id2ref? sometimes we do that
# to avoid storing multiple redundant refs to a single item
if type(curObjRef) is int:
startId = curObjRef
curObjRef = None
try:
for containerRef in self._leakDetector.getContainerByIdGen(startId):
yield None
except:
# ref is invalid
self.notify.debug('invalid startRef, stored as id %s' % startId)
self._leakDetector.removeContainerById(startId)
continue
curObjRef = containerRef
try:
for curObj in curObjRef.getContainerGen():
yield None
except:
self.notify.debug('lost current container, ref.getContainerGen() failed')
# that container is gone, try again
curObjRef = None
continue
self.notify.debug('--> %s' % curObjRef)
#import pdb;pdb.set_trace()
# store a copy of the current objRef
parentObjRef = curObjRef
# if we hit a dead end, start over from another container
curObjRef = None
if hasattr(curObj, '__dict__'):
child = curObj.__dict__
hasLength = self._hasLength(child)
notDeadEnd = not self._isDeadEnd(child)
if hasLength or notDeadEnd:
# prevent cycles in the references (i.e. base.loader.base)
for goesThrough in parentObjRef.goesThroughGen(child):
# don't yield, container might lose this element
pass
if not goesThrough:
objRef = ObjectRef(Indirection(evalStr='.__dict__'),
id(child), parentObjRef)
yield None
if hasLength:
for i in self._addContainerGen(child, objRef):
yield None
if notDeadEnd:
self._addDiscoveredStartRef(child, objRef)
curObjRef = objRef
continue
if type(curObj) is dict:
key = None
attr = None
keys = list(curObj.keys())
# we will continue traversing the object graph via one key of the dict,
# choose it at random without taking a big chunk of CPU time
numKeysLeft = len(keys) + 1
for key in keys:
yield None
numKeysLeft -= 1
try:
attr = curObj[key]
except KeyError as e:
# this is OK because we are yielding during the iteration
self.notify.debug('could not index into %s with key %s' % (
parentObjRef, safeRepr(key)))
continue
hasLength = self._hasLength(attr)
notDeadEnd = False
# if we haven't picked the next ref, check if this one is a candidate
if curObjRef is None:
notDeadEnd = not self._isDeadEnd(attr, key)
if hasLength or notDeadEnd:
# prevent cycles in the references (i.e. base.loader.base)
for goesThrough in parentObjRef.goesThroughGen(curObj[key]):
# don't yield, container might lose this element
pass
if not goesThrough:
if curObj is builtins.__dict__:
objRef = ObjectRef(Indirection(evalStr='%s' % key),
id(curObj[key]))
else:
objRef = ObjectRef(Indirection(dictKey=key),
id(curObj[key]), parentObjRef)
yield None
if hasLength:
for i in self._addContainerGen(attr, objRef):
yield None
if notDeadEnd:
self._addDiscoveredStartRef(attr, objRef)
if curObjRef is None and random.randrange(numKeysLeft) == 0:
curObjRef = objRef
del key
del attr
continue
try:
                    itr = iter(curObj)
except:
pass
else:
try:
index = -1
attrs = []
while 1:
yield None
try:
attr = next(itr)
except:
# some custom classes don't do well when iterated
attr = None
break
attrs.append(attr)
# we will continue traversing the object graph via one attr,
# choose it at random without taking a big chunk of CPU time
numAttrsLeft = len(attrs) + 1
for attr in attrs:
yield None
index += 1
numAttrsLeft -= 1
hasLength = self._hasLength(attr)
notDeadEnd = False
if curObjRef is None:
notDeadEnd = not self._isDeadEnd(attr)
if hasLength or notDeadEnd:
# prevent cycles in the references (i.e. base.loader.base)
                                for goesThrough in parentObjRef.goesThroughGen(curObj[index]):
# don't yield, container might lose this element
pass
if not goesThrough:
objRef = ObjectRef(Indirection(evalStr='[%s]' % index),
id(curObj[index]), parentObjRef)
yield None
if hasLength:
for i in self._addContainerGen(attr, objRef):
yield None
if notDeadEnd:
self._addDiscoveredStartRef(attr, objRef)
if curObjRef is None and random.randrange(numAttrsLeft) == 0:
curObjRef = objRef
del attr
except StopIteration as e:
pass
del itr
continue
except Exception as e:
print('FindContainers job caught exception: %s' % e)
if __dev__:
raise
yield Job.Done
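
# Illustrative sketch (not part of the original module): the flywheel
# generators used above hand out traversal start objects with frequency
# roughly proportional to their length; random.choices with
# getStartObjAffinity() weights expresses the same idea in a single call.
def _exampleWeightedStartChoice(containers):
    weights = [FindContainers.getStartObjAffinity(obj) for obj in containers]
    return random.choices(containers, weights=weights, k=1)[0]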
class CheckContainers(Job):
"""
Job to check container sizes and find potential leaks; sub-job of ContainerLeakDetector
"""
ReprItems = 5
    def __init__(self, name, leakDetector, index):
Job.__init__(self, name)
self._leakDetector = leakDetector
self.notify = self._leakDetector.notify
self._index = index
ContainerLeakDetector.addPrivateObj(self.__dict__)
    def destroy(self):
ContainerLeakDetector.removePrivateObj(self.__dict__)
Job.destroy(self)
    def getPriority(self):
return Job.Priorities.Normal
    def run(self):
try:
self._leakDetector._index2containerId2len[self._index] = {}
ids = self._leakDetector.getContainerIds()
# record the current len of each container
for objId in ids:
yield None
try:
for result in self._leakDetector.getContainerByIdGen(objId):
yield None
container = result
except Exception as e:
# this container no longer exists
if self.notify.getDebug():
for contName in self._leakDetector.getContainerNameByIdGen(objId):
yield None
self.notify.debug(
'%s no longer exists; caught exception in getContainerById (%s)' % (
contName, e))
self._leakDetector.removeContainerById(objId)
continue
if container is None:
# this container no longer exists
if self.notify.getDebug():
for contName in self._leakDetector.getContainerNameByIdGen(objId):
yield None
self.notify.debug('%s no longer exists; getContainerById returned None' %
contName)
self._leakDetector.removeContainerById(objId)
continue
try:
cLen = len(container)
except Exception as e:
# this container no longer exists
if self.notify.getDebug():
for contName in self._leakDetector.getContainerNameByIdGen(objId):
yield None
self.notify.debug(
'%s is no longer a container, it is now %s (%s)' %
(contName, safeRepr(container), e))
self._leakDetector.removeContainerById(objId)
continue
self._leakDetector._index2containerId2len[self._index][objId] = cLen
# compare the current len of each container to past lens
if self._index > 0:
idx2id2len = self._leakDetector._index2containerId2len
for objId in idx2id2len[self._index]:
yield None
if objId in idx2id2len[self._index-1]:
diff = idx2id2len[self._index][objId] - idx2id2len[self._index-1][objId]
"""
# this check is too spammy
if diff > 20:
if diff > idx2id2len[self._index-1][objId]:
minutes = (self._leakDetector._index2delay[self._index] -
self._leakDetector._index2delay[self._index-1]) / 60.
name = self._leakDetector.getContainerNameById(objId)
if idx2id2len[self._index-1][objId] != 0:
percent = 100. * (float(diff) / float(idx2id2len[self._index-1][objId]))
try:
for container in self._leakDetector.getContainerByIdGen(objId):
yield None
except:
# TODO
self.notify.debug('caught exception in getContainerByIdGen (1)')
else:
self.notify.warning(
'%s (%s) grew %.2f%% in %.2f minutes (%s items at last measurement, current contents: %s)' % (
name, itype(container), percent, minutes, idx2id2len[self._index][objId],
fastRepr(container, maxLen=CheckContainers.ReprItems)))
yield None
"""
if (self._index > 2 and
objId in idx2id2len[self._index-2] and
objId in idx2id2len[self._index-3]):
diff2 = idx2id2len[self._index-1][objId] - idx2id2len[self._index-2][objId]
diff3 = idx2id2len[self._index-2][objId] - idx2id2len[self._index-3][objId]
if self._index <= 4:
if diff > 0 and diff2 > 0 and diff3 > 0:
name = self._leakDetector.getContainerNameById(objId)
try:
for container in self._leakDetector.getContainerByIdGen(objId):
yield None
except:
# TODO
self.notify.debug('caught exception in getContainerByIdGen (2)')
else:
msg = ('%s (%s) consistently increased in size over the last '
'3 periods (%s items at last measurement, current contents: %s)' %
(name, itype(container), idx2id2len[self._index][objId],
fastRepr(container, maxLen=CheckContainers.ReprItems)))
self.notify.warning(msg)
yield None
elif (objId in idx2id2len[self._index-4] and
objId in idx2id2len[self._index-5]):
# if size has consistently increased over the last 5 checks,
# send out a warning
diff4 = idx2id2len[self._index-3][objId] - idx2id2len[self._index-4][objId]
diff5 = idx2id2len[self._index-4][objId] - idx2id2len[self._index-5][objId]
if diff > 0 and diff2 > 0 and diff3 > 0 and diff4 > 0 and diff5 > 0:
name = self._leakDetector.getContainerNameById(objId)
try:
for container in self._leakDetector.getContainerByIdGen(objId):
yield None
except:
# TODO
self.notify.debug('caught exception in getContainerByIdGen (3)')
else:
msg = ('leak detected: %s (%s) consistently increased in size over the last '
'5 periods (%s items at last measurement, current contents: %s)' %
(name, itype(container), idx2id2len[self._index][objId],
fastRepr(container, maxLen=CheckContainers.ReprItems)))
self.notify.warning(msg)
yield None
messenger.send(self._leakDetector.getLeakEvent(), [container, name])
if config.GetBool('pdb-on-leak-detect', 0):
import pdb;pdb.set_trace()
pass
except Exception as e:
print('CheckContainers job caught exception: %s' % e)
if __dev__:
raise
yield Job.Done
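
# Illustrative sketch (not part of the original module): the leak heuristic
# CheckContainers.run() applies, restated as a standalone check: a container
# is reported when its length increased in each of the last `periods`
# measurement intervals. index2id2len mirrors _index2containerId2len.
def _exampleGrowthCheck(index2id2len, index, objId, periods=5):
    if index < periods:
        return False
    try:
        diffs = [index2id2len[i][objId] - index2id2len[i - 1][objId]
                 for i in range(index - periods + 1, index + 1)]
    except KeyError:
        # the container wasn't measured at one of the checkpoints
        return False
    return all(diff > 0 for diff in diffs)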
class FPTObjsOfType(Job):
    def __init__(self, name, leakDetector, otn, doneCallback=None):
Job.__init__(self, name)
self._leakDetector = leakDetector
self.notify = self._leakDetector.notify
self._otn = otn
self._doneCallback = doneCallback
self._ldde = self._leakDetector._getDestroyEvent()
self.accept(self._ldde, self._handleLDDestroy)
ContainerLeakDetector.addPrivateObj(self.__dict__)
    def destroy(self):
self.ignore(self._ldde)
self._leakDetector = None
self._doneCallback = None
ContainerLeakDetector.removePrivateObj(self.__dict__)
Job.destroy(self)
def _handleLDDestroy(self):
self.destroy()
    def getPriority(self):
return Job.Priorities.High
    def run(self):
ids = self._leakDetector.getContainerIds()
try:
for id in ids:
getInstance = (self._otn.lower() not in 'dict')
yield None
try:
for container in self._leakDetector.getContainerByIdGen(
id, getInstance=getInstance):
yield None
except:
pass
else:
if hasattr(container, '__class__'):
cName = container.__class__.__name__
else:
cName = container.__name__
if self._otn.lower() in cName.lower():
try:
for ptc in self._leakDetector.getContainerNameByIdGen(
id, getInstance=getInstance):
yield None
except:
pass
else:
print('GPTC(' + self._otn + '):' + self.getJobName() + ': ' + ptc)
except Exception as e:
print('FPTObjsOfType job caught exception: %s' % e)
if __dev__:
raise
yield Job.Done
    def finished(self):
if self._doneCallback:
self._doneCallback(self)
class FPTObjsNamed(Job):
    def __init__(self, name, leakDetector, on, doneCallback=None):
Job.__init__(self, name)
self._leakDetector = leakDetector
self.notify = self._leakDetector.notify
self._on = on
self._doneCallback = doneCallback
self._ldde = self._leakDetector._getDestroyEvent()
self.accept(self._ldde, self._handleLDDestroy)
ContainerLeakDetector.addPrivateObj(self.__dict__)
    def destroy(self):
self.ignore(self._ldde)
self._leakDetector = None
self._doneCallback = None
ContainerLeakDetector.removePrivateObj(self.__dict__)
Job.destroy(self)
def _handleLDDestroy(self):
self.destroy()
    def getPriority(self):
return Job.Priorities.High
    def run(self):
ids = self._leakDetector.getContainerIds()
try:
for id in ids:
yield None
try:
for container in self._leakDetector.getContainerByIdGen(id):
yield None
except:
pass
else:
name = self._leakDetector._id2ref[id].getFinalIndirectionStr()
if self._on.lower() in name.lower():
try:
for ptc in self._leakDetector.getContainerNameByIdGen(id):
yield None
except:
pass
else:
print('GPTCN(' + self._on + '):' + self.getJobName() + ': ' + ptc)
except Exception as e:
print('FPTObjsNamed job caught exception: %s' % e)
if __dev__:
raise
yield Job.Done
    def finished(self):
if self._doneCallback:
self._doneCallback(self)
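
# Illustrative sketch (not part of the original module): these path-finding
# jobs are normally created through ContainerLeakDetector.getPathsToContainers
# and getPathsToContainersNamed; 'inventory' is a hypothetical name substring.
def _exampleFindPathsByName(leakDetector):
    def done(job):
        leakDetector.notify.info('path search finished: %s' % job.getJobName())
    return leakDetector.getPathsToContainersNamed(
        'example-findInventory', 'inventory', doneCallback=done)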
class PruneObjectRefs(Job):
"""
Job to destroy any container refs that are no longer valid.
    Checks validity by attempting to look up each container.
"""
    def __init__(self, name, leakDetector):
Job.__init__(self, name)
self._leakDetector = leakDetector
self.notify = self._leakDetector.notify
ContainerLeakDetector.addPrivateObj(self.__dict__)
    def destroy(self):
ContainerLeakDetector.removePrivateObj(self.__dict__)
Job.destroy(self)
    def getPriority(self):
return Job.Priorities.Normal
    def run(self):
try:
ids = self._leakDetector.getContainerIds()
for id in ids:
yield None
try:
for container in self._leakDetector.getContainerByIdGen(id):
yield None
except:
# reference is invalid, remove it
self._leakDetector.removeContainerById(id)
_id2baseStartRef = self._leakDetector._findContainersJob._id2baseStartRef
ids = list(_id2baseStartRef.keys())
for id in ids:
yield None
try:
for container in _id2baseStartRef[id].getContainerGen():
yield None
except:
# reference is invalid, remove it
del _id2baseStartRef[id]
_id2discoveredStartRef = self._leakDetector._findContainersJob._id2discoveredStartRef
ids = list(_id2discoveredStartRef.keys())
for id in ids:
yield None
try:
for container in _id2discoveredStartRef[id].getContainerGen():
yield None
except:
# reference is invalid, remove it
del _id2discoveredStartRef[id]
except Exception as e:
print('PruneObjectRefs job caught exception: %s' % e)
if __dev__:
raise
yield Job.Done
class ContainerLeakDetector(Job):
"""
Low-priority Python object-graph walker that looks for leaking containers.
To reduce memory usage, this does a random walk of the Python objects to
discover containers rather than keep a set of all visited objects; it may
visit the same object many times but eventually it will discover every object.
Checks container sizes at ever-increasing intervals.
"""
notify = directNotify.newCategory("ContainerLeakDetector")
# set of containers that should not be examined
PrivateIds = set()
    def __init__(self, name, firstCheckDelay = None):
Job.__init__(self, name)
self._serialNum = serialNum()
self._findContainersJob = None
self._checkContainersJob = None
self._pruneContainersJob = None
if firstCheckDelay is None:
firstCheckDelay = 60. * 15.
# divide by two, since the first check just takes length measurements and
# doesn't check for leaks
self._nextCheckDelay = firstCheckDelay/2.
self._checkDelayScale = config.GetFloat('leak-detector-check-delay-scale', 1.5)
self._pruneTaskPeriod = config.GetFloat('leak-detector-prune-period', 60. * 30.)
# main dict of id(container)->containerRef
self._id2ref = {}
# storage for results of check-container job
self._index2containerId2len = {}
self._index2delay = {}
if config.GetBool('leak-container', 0):
_createContainerLeak()
if config.GetBool('leak-tasks', 0):
_createTaskLeak()
# don't check our own tables for leaks
ContainerLeakDetector.addPrivateObj(ContainerLeakDetector.PrivateIds)
ContainerLeakDetector.addPrivateObj(self.__dict__)
self.setPriority(Job.Priorities.Min)
jobMgr.add(self)
    def destroy(self):
messenger.send(self._getDestroyEvent())
self.ignoreAll()
if self._pruneContainersJob is not None:
jobMgr.remove(self._pruneContainersJob)
self._pruneContainersJob = None
if self._checkContainersJob is not None:
jobMgr.remove(self._checkContainersJob)
self._checkContainersJob = None
jobMgr.remove(self._findContainersJob)
self._findContainersJob = None
del self._id2ref
del self._index2containerId2len
del self._index2delay
def _getDestroyEvent(self):
# sent when leak detector is about to be destroyed
return 'cldDestroy-%s' % self._serialNum
    def getLeakEvent(self):
# sent when a leak is detected
        # passes the leaking container and the container's name as arguments
return 'containerLeakDetected-%s' % self._serialNum
    @classmethod
def addPrivateObj(cls, obj):
cls.PrivateIds.add(id(obj))
    @classmethod
def removePrivateObj(cls, obj):
cls.PrivateIds.remove(id(obj))
def _getCheckTaskName(self):
return 'checkForLeakingContainers-%s' % self._serialNum
def _getPruneTaskName(self):
return 'pruneLeakingContainerRefs-%s' % self._serialNum
    def getContainerIds(self):
return list(self._id2ref.keys())
    def getContainerByIdGen(self, id, **kwArgs):
# return a generator to look up a container
return self._id2ref[id].getContainerGen(**kwArgs)
    def getContainerById(self, id):
for result in self._id2ref[id].getContainerGen():
pass
return result
    def getContainerNameByIdGen(self, id, **kwArgs):
return self._id2ref[id].getEvalStrGen(**kwArgs)
    def getContainerNameById(self, id):
if id in self._id2ref:
return repr(self._id2ref[id])
return '<unknown container>'
    def removeContainerById(self, id):
if id in self._id2ref:
self._id2ref[id].destroy()
del self._id2ref[id]
    def run(self):
# start looking for containers
self._findContainersJob = FindContainers(
'%s-findContainers' % self.getJobName(), self)
jobMgr.add(self._findContainersJob)
self._scheduleNextLeakCheck()
self._scheduleNextPruning()
while True:
yield Job.Sleep
    def getPathsToContainers(self, name, ot, doneCallback=None):
j = FPTObjsOfType(name, self, ot, doneCallback)
jobMgr.add(j)
return j
    def getPathsToContainersNamed(self, name, on, doneCallback=None):
j = FPTObjsNamed(name, self, on, doneCallback)
jobMgr.add(j)
return j
def _scheduleNextLeakCheck(self):
taskMgr.doMethodLater(self._nextCheckDelay, self._checkForLeaks,
self._getCheckTaskName())
# delay between checks
# fib: 1 1 2 3 5 8 13 21 34 55 89
# * 2.: 1 2 4 8 16 32 64 128 256 512 1024
# * 1.5: 1 1.5 2.3 3.4 5.1 7.6 11.4 17.1 25.6 38.4 57.7
#
# delay from job start
# fib: 1 2 4 7 12 20 33 54 88 143 232
# * 2.: 1 3 7 15 31 63 127 255 511 1023 2047
# * 1.5: 1 2.5 4.75 8.1 13.2 20.8 32.2 49.3 74.9 113.3 171
self._nextCheckDelay = self._nextCheckDelay * self._checkDelayScale
def _checkForLeaks(self, task=None):
self._index2delay[len(self._index2containerId2len)] = self._nextCheckDelay
self._checkContainersJob = CheckContainers(
'%s-checkForLeaks' % self.getJobName(), self, len(self._index2containerId2len))
self.acceptOnce(self._checkContainersJob.getFinishedEvent(),
self._scheduleNextLeakCheck)
jobMgr.add(self._checkContainersJob)
return task.done
def _scheduleNextPruning(self):
taskMgr.doMethodLater(self._pruneTaskPeriod, self._pruneObjectRefs,
self._getPruneTaskName())
def _pruneObjectRefs(self, task=None):
self._pruneContainersJob = PruneObjectRefs(
'%s-pruneObjectRefs' % self.getJobName(), self)
self.acceptOnce(self._pruneContainersJob.getFinishedEvent(),
self._scheduleNextPruning)
jobMgr.add(self._pruneContainersJob)
return task.done
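
# Illustrative sketch (not part of the original module): starting the detector
# and listening for its per-instance leak event. Assumes a running ShowBase so
# that jobMgr, taskMgr and messenger are active; the 60-second first-check
# delay is an arbitrary example value.
def _exampleLeakDetectorUsage():
    detector = ContainerLeakDetector('exampleLeakDetector', firstCheckDelay=60.)

    def announceLeak(container, name):
        # CheckContainers sends this event with the container and its name
        # when a container grows for five consecutive measurement periods
        ContainerLeakDetector.notify.warning('leak reported for %s' % name)

    detector.accept(detector.getLeakEvent(), announceLeak)
    return detector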