Source code for direct.showbase.ReferrerSearch

import inspect
import sys
import gc
from direct.showbase.PythonUtil import _getSafeReprNotify, fastRepr
from direct.showbase.Job import Job
from direct.showbase.MessengerGlobal import messenger
from direct.task.TaskManagerGlobal import taskMgr


class ReferrerSearch(Job):
    """Walk ``gc.get_referrers`` upward from an object and print every
    referrer chain that ends at a recognised root.

    Recognised roots (see :meth:`isAtRoot`) are: a circular reference,
    ``__builtins__``, any module, any class, ``simbase``, ``simbase.air``,
    ``messenger``, ``taskMgr`` and ``simbase.air.mainWorld``.

    The search can be executed in one go by calling the instance, or
    incrementally through the :meth:`run` generator (which yields between
    steps and finishes with ``Job.Done``).
    """

    def __init__(self, obj, maxRefs = 100):
        """
        :param obj: the object whose referrers are to be traced.
        :param maxRefs: if an object other than *obj* has more referrers
            than this, and is not a list/tuple/dict/set, the search does
            not continue past it (see :meth:`isManyRef`).
        """
        Job.__init__(self, 'ReferrerSearch')
        self.obj = obj
        self.maxRefs = maxRefs
        # ids of objects that have already been expanded
        self.visited = set()
        self.depth = 0
        self.found = 0
        self.shouldPrintStats = False

    def __call__(self):
        """Run the whole search synchronously.

        The safe-repr notify info level is forced to 0 for the duration of
        the search and restored afterwards; ``self.obj`` is cleared when the
        search ends, whether or not it raised.
        """
        safeReprNotify = _getSafeReprNotify()
        info = safeReprNotify.getInfo()
        safeReprNotify.setInfo(0)

        self.visited = set()
        try:
            self.step(0, [self.obj])
        finally:
            self.obj = None
            safeReprNotify.setInfo(info)

    def run(self):
        """Generator form of the search.

        Yields ``None`` after each incremental step and ``Job.Done`` once the
        traversal is complete. The previous notify info level is stored on
        ``self.info`` so that :meth:`finished` can restore it.
        """
        safeReprNotify = _getSafeReprNotify()
        self.info = safeReprNotify.getInfo()
        safeReprNotify.setInfo(0)

        print('RefPath(%s): Beginning ReferrerSearch for %s' %(self._id, fastRepr(self.obj)))

        self.visited = set()
        for _ in self.stepGenerator(0, [self.obj]):
            yield None

        yield Job.Done

    def finished(self):
        """Report completion, drop the reference to the searched object and
        restore the notify info level saved by :meth:`run`."""
        print('RefPath(%s): Finished ReferrerSearch for %s' %(self._id, fastRepr(self.obj)))
        self.obj = None

        safeReprNotify = _getSafeReprNotify()
        safeReprNotify.setInfo(self.info)

    def __del__(self):
        print('ReferrerSearch garbage collected')

    def truncateAtNewLine(self, s):
        """Return *s* up to (not including) its first newline, or all of *s*
        if it contains none."""
        return s.partition('\n')[0]

    def printStatsWhenAble(self):
        """Request that statistics be printed at the start of the next step."""
        self.shouldPrintStats = True

    def myrepr(self, referrer, refersTo):
        """Format one link of a referrer chain.

        The result is ``pre + post``: *pre* closes the bracket opened by the
        previous link with the key (dict) or index (list/tuple) under which
        *referrer* holds *refersTo*; *post* names *refersTo* itself, opening
        a bracket when it is a container.
        """
        pre = ''
        if isinstance(referrer, dict):
            for k, v in referrer.items():
                if v is refersTo:
                    pre = self.truncateAtNewLine(fastRepr(k)) + ']-> '
                    break
        elif isinstance(referrer, (list, tuple)):
            for x, ref in enumerate(referrer):
                if ref is refersTo:
                    pre = '%s]-> ' % (x)
                    break

        if isinstance(refersTo, dict):
            post = 'dict['
        elif isinstance(refersTo, list):
            post = 'list['
        elif isinstance(refersTo, tuple):
            post = 'tuple['
        elif isinstance(refersTo, set):
            post = 'set->'
        else:
            post = self.truncateAtNewLine(fastRepr(refersTo)) + "-> "

        return '%s%s' % (pre, post)

    def step(self, depth, path):
        """Synchronously expand the last element of *path*.

        :param depth: nesting level of this call (passed on as ``depth + 1``).
        :param path: list of objects from the searched object to the one
            being expanded; ``path[-1]`` is the current object.
        """
        if self.shouldPrintStats:
            self.printStats(path)
            self.shouldPrintStats = False

        at = path[-1]

        if id(at) in self.visited:
            # don't continue down this path
            return

        # check for success
        if self.isAtRoot(at, path):
            self.found += 1
            return

        # mark our progress after checking goal
        self.visited.add(id(at))

        # NOTE: locals() is deliberately left inside the comprehension; what
        # it returns there depends on the interpreter's comprehension
        # scoping, so moving it would change which dicts are filtered out.
        referrers = [ref for ref in gc.get_referrers(at)
                     if not (ref is path or
                             inspect.isframe(ref) or
                             (isinstance(ref, dict) and
                              list(ref.keys()) == list(locals().keys())) or
                             ref is self.__dict__ or
                             id(ref) in self.visited)]

        # Check to see if this object has an unusually large
        # ref-count.  This usually indicates that it is some
        # sort of global, singleton, or manager object
        # and as such no further knowledge would be gained from
        # traversing further up the ref tree.
        if self.isManyRef(at, path, referrers):
            return

        while referrers:
            ref = referrers.pop()
            self.depth += 1
            # The children are expanded by draining the generator variant.
            for _ in self.stepGenerator(depth + 1, path + [ref]):
                pass
            self.depth -= 1

    def stepGenerator(self, depth, path):
        """Generator variant of :meth:`step`; yields ``None`` between units
        of work so the caller can interleave other processing.

        Unlike :meth:`step`, the root check is performed before the
        already-visited check.
        """
        if self.shouldPrintStats:
            self.printStats(path)
            self.shouldPrintStats = False

        at = path[-1]

        # A plain ``return`` ends the generator.  ``raise StopIteration``
        # inside a generator body is converted to RuntimeError (PEP 479).

        # check for success
        if self.isAtRoot(at, path):
            self.found += 1
            return

        if id(at) in self.visited:
            # don't continue down this path
            return

        # mark our progress after checking goal
        self.visited.add(id(at))

        # Look for all referrers, culling out the ones that
        # we know to be red herrings.
        # NOTE: locals() is deliberately left inside the comprehension; see
        # the matching note in step().
        referrers = [ref for ref in gc.get_referrers(at)
                     if not (
                         # we disregard the steps of our traversal
                         ref is path or
                         # The referrer is this call frame
                         inspect.isframe(ref) or
                         # The referrer is the locals() dictionary (closure)
                         (isinstance(ref, dict) and
                          list(ref.keys()) == list(locals().keys())) or
                         # We found the reference on self
                         ref is self.__dict__ or
                         # We've already seen this referrer
                         id(ref) in self.visited)]

        # Check to see if this object has an unusually large
        # ref-count.  This usually indicates that it is some
        # sort of global, singleton, or manager object
        # and as such no further knowledge would be gained from
        # traversing further up the ref tree.
        if self.isManyRef(at, path, referrers):
            return

        while referrers:
            ref = referrers.pop()
            self.depth += 1
            for _ in self.stepGenerator(depth + 1, path + [ref]):
                yield None
            self.depth -= 1

        yield None

    def printStats(self, path):
        """Print visited/found/depth counters and the current chain."""
        path = list(reversed(path))
        path.insert(0, 0)
        print('RefPath(%s) - Stats - visited(%s) | found(%s) | depth(%s) | CurrentPath(%s)' %
              (self._id, len(self.visited), self.found, self.depth,
               ''.join(self.myrepr(path[x], path[x+1]) for x in range(len(path) - 1))))

    def _writeChain(self, label, path, leadingSentinel):
        """Write ``RefPath(<id>): <label>`` followed by the chain in *path*,
        printed root-first, then a newline.

        :param leadingSentinel: when true a placeholder is put in front of
            the reversed path so that the root object itself is rendered as
            the first link; when false the first rendered link is the object
            the root refers to.
        """
        sys.stdout.write("RefPath(%s): %s" % (self._id, label))
        chain = list(reversed(path))
        if leadingSentinel:
            chain.insert(0, 0)
        for x in range(len(chain) - 1):
            sys.stdout.write(self.myrepr(chain[x], chain[x+1]))
        print("")

    def isAtRoot(self, at, path):
        """Return True (after printing the chain) if *at* is one of the
        places where this thread of the search should end.

        :param at: the object being examined.
        :param path: chain of objects leading to *at*; callers in this class
            pass a list whose last element is *at*.
        """
        # We found a circular reference.  ``at`` itself is the last entry of
        # ``path``, so only the earlier entries are examined, and by identity
        # so that no user-defined __eq__ is invoked.
        ancestors = path[:-1] if path and path[-1] is at else path
        if any(at is earlier for earlier in ancestors):
            self._writeChain("Circular: ", path, True)
            return True

        # __builtins__
        if at is __builtins__:
            self._writeChain("__builtins__-> ", path, True)
            return True

        # any module scope
        if inspect.ismodule(at):
            self._writeChain("Module(%s)-> " % at.__name__, path, False)
            return True

        # any class scope
        if inspect.isclass(at):
            self._writeChain("Class(%s)-> " % at.__name__, path, False)
            return True

        # ``simbase`` is not imported by this module; it is expected to be
        # available as a global/builtin.  Treat it as absent instead of
        # failing when it has not been installed.
        try:
            sim = simbase
        except NameError:
            sim = None
        air = getattr(sim, 'air', None)

        # simbase
        if sim is not None and at is sim:
            self._writeChain("simbase-> ", path, False)
            return True

        # simbase.air
        if air is not None and at is air:
            self._writeChain("simbase.air-> ", path, False)
            return True

        # messenger
        if at is messenger:
            self._writeChain("messenger-> ", path, False)
            return True

        # taskMgr
        if at is taskMgr:
            self._writeChain("taskMgr-> ", path, False)
            return True

        # world
        if air is not None and hasattr(air, 'mainWorld') and at is air.mainWorld:
            self._writeChain("mainWorld-> ", path, False)
            return True

        return False

    def isManyRef(self, at, path, referrers):
        """Return True if *at* has more than ``self.maxRefs`` referrers and
        should therefore not be expanded.

        The searched object itself is never cut off.  Containers
        (list/tuple/dict/set) over the limit are reported as
        ``ManyRefsAllowed`` but still expanded (the method returns False).
        """
        if len(referrers) > self.maxRefs and at is not self.obj:
            if not isinstance(at, (list, tuple, dict, set)):
                self._writeChain("ManyRefs(%s)[%s]-> " % (len(referrers), fastRepr(at)),
                                 path, True)
                return True
            else:
                sys.stdout.write("RefPath(%s): ManyRefsAllowed(%s)[%s]-> " %
                                 (self._id, len(referrers),
                                  fastRepr(at, maxLen = 1, strFactor = 30)))
                print("")
        return False
# Example usage (interactive session):
#
#   from direct.showbase.ReferrerSearch import ReferrerSearch
#   door = simbase.air.doFind("DistributedBuildingDoorAI")
#   class A: pass
#   door = A()
#   ReferrerSearch(door)()
#   reload(ReferrerSearch); from direct.showbase.PythonUtil import ReferrerSearch