Source code for direct.distributed.DoInterestManager

"""
The DoInterestManager keeps track of which parent/zones that we currently
have interest in.  When you want to "look" into a zone you add an interest
to that zone.  When you want to get rid of, or ignore, the objects in that
zone, remove interest in that zone.

p.s. A great deal of this code is just code moved from ClientRepository.py.
"""

from panda3d.core import *
from panda3d.direct import *
from .MsgTypes import *
from direct.showbase.PythonUtil import *
from direct.showbase import DirectObject
from direct.showbase.MessengerGlobal import messenger
from .PyDatagram import PyDatagram
from direct.directnotify.DirectNotifyGlobal import directNotify
import types
from direct.showbase.PythonUtil import report

[docs]class InterestState: StateActive = 'Active' StatePendingDel = 'PendingDel'
[docs] def __init__(self, desc, state, context, event, parentId, zoneIdList, eventCounter, auto=False): self.desc = desc self.state = state self.context = context # We must be ready to keep track of multiple events. If somebody # requested an interest to be removed and we get a second request # for removal of the same interest before we get a response for the # first interest removal, we now have two parts of the codebase # waiting for a response on the removal of a single interest. self.events = [] self.eventCounter = eventCounter if event: self.addEvent(event) self.parentId = parentId self.zoneIdList = zoneIdList self.auto = auto
[docs] def addEvent(self, event): self.events.append(event) self.eventCounter.num += 1
[docs] def getEvents(self): return list(self.events)
[docs] def clearEvents(self): self.eventCounter.num -= len(self.events) assert self.eventCounter.num >= 0 self.events = []
[docs] def sendEvents(self): for event in self.events: messenger.send(event) self.clearEvents()
[docs] def setDesc(self, desc): self.desc = desc
[docs] def isPendingDelete(self): return self.state == InterestState.StatePendingDel
def __repr__(self): return 'InterestState(desc=%s, state=%s, context=%s, event=%s, parentId=%s, zoneIdList=%s)' % ( self.desc, self.state, self.context, self.events, self.parentId, self.zoneIdList)
[docs]class InterestHandle: """This class helps to ensure that valid handles get passed in to DoInterestManager funcs"""
[docs] def __init__(self, id): self._id = id
[docs] def asInt(self): return self._id
def __eq__(self, other): if type(self) == type(other): return self._id == other._id return self._id == other def __repr__(self): return '%s(%s)' % (self.__class__.__name__, self._id)
# context value for interest changes that have no complete event NO_CONTEXT = 0
[docs]class DoInterestManager(DirectObject.DirectObject): """ Top level Interest Manager """ notify = directNotify.newCategory("DoInterestManager") InterestDebug = ConfigVariableBool('interest-debug', False) # 'handle' is a number that represents a single interest set that the # client has requested; the interest set may be modified _HandleSerialNum = 0 # high bit is reserved for server interests _HandleMask = 0x7FFF # 'context' refers to a single request to change an interest set _ContextIdSerialNum = 100 _ContextIdMask = 0x3FFFFFFF # avoid making Python create a long _interests = {} if __debug__: _debug_interestHistory = [] _debug_maxDescriptionLen = 40 _SerialGen = SerialNumGen() _SerialNum = serialNum()
[docs] def __init__(self): assert DoInterestManager.notify.debugCall() DirectObject.DirectObject.__init__(self) self._addInterestEvent = uniqueName('DoInterestManager-Add') self._removeInterestEvent = uniqueName('DoInterestManager-Remove') self._noNewInterests = False self._completeDelayedCallback = None # keep track of request contexts that have not completed self._completeEventCount = ScratchPad(num=0) self._allInterestsCompleteCallbacks = []
def __verbose(self): return self.InterestDebug.getValue() or self.getVerbose() def _getAnonymousEvent(self, desc): return 'anonymous-%s-%s' % (desc, DoInterestManager._SerialGen.next())
[docs] def setNoNewInterests(self, flag): self._noNewInterests = flag
[docs] def noNewInterests(self): return self._noNewInterests
[docs] def setAllInterestsCompleteCallback(self, callback): if ((self._completeEventCount.num == 0) and (self._completeDelayedCallback is None)): callback() else: self._allInterestsCompleteCallbacks.append(callback)
[docs] def getAllInterestsCompleteEvent(self): return 'allInterestsComplete-%s' % DoInterestManager._SerialNum
[docs] def resetInterestStateForConnectionLoss(self): DoInterestManager._interests.clear() self._completeEventCount = ScratchPad(num=0) if __debug__: self._addDebugInterestHistory("RESET", "", 0, 0, 0, [])
[docs] def isValidInterestHandle(self, handle): # pass in a handle (or anything else) and this will return true if it is # still a valid interest handle if not isinstance(handle, InterestHandle): return False return handle.asInt() in DoInterestManager._interests
[docs] def updateInterestDescription(self, handle, desc): iState = DoInterestManager._interests.get(handle.asInt()) if iState: iState.setDesc(desc)
[docs] def addInterest(self, parentId, zoneIdList, description, event=None): """ Look into a (set of) zone(s). """ assert DoInterestManager.notify.debugCall() handle = self._getNextHandle() # print 'base.cr.addInterest(',description,',',handle,'):',base.clock.getFrameCount() if self._noNewInterests: DoInterestManager.notify.warning( "addInterest: addingInterests on delete: %s" % (handle)) return # make sure we've got parenting rules set in the DC if parentId not in (self.getGameDoId(),): parent = self.getDo(parentId) if not parent: DoInterestManager.notify.error( 'addInterest: attempting to add interest under unknown object %s' % parentId) else: if not parent.hasParentingRules(): DoInterestManager.notify.error( 'addInterest: no setParentingRules defined in the DC for object %s (%s)' '' % (parentId, parent.__class__.__name__)) if event: contextId = self._getNextContextId() else: contextId = 0 # event = self._getAnonymousEvent('addInterest') DoInterestManager._interests[handle] = InterestState( description, InterestState.StateActive, contextId, event, parentId, zoneIdList, self._completeEventCount) if self.__verbose(): print('CR::INTEREST.addInterest(handle=%s, parentId=%s, zoneIdList=%s, description=%s, event=%s)' % ( handle, parentId, zoneIdList, description, event)) self._sendAddInterest(handle, contextId, parentId, zoneIdList, description) if event: messenger.send(self._getAddInterestEvent(), [event]) assert self.printInterestsIfDebug() return InterestHandle(handle)
[docs] def addAutoInterest(self, parentId, zoneIdList, description): """ Look into a (set of) zone(s). """ assert DoInterestManager.notify.debugCall() handle = self._getNextHandle() if self._noNewInterests: DoInterestManager.notify.warning( "addInterest: addingInterests on delete: %s" % (handle)) return # make sure we've got parenting rules set in the DC if parentId not in (self.getGameDoId(),): parent = self.getDo(parentId) if not parent: DoInterestManager.notify.error( 'addInterest: attempting to add interest under unknown object %s' % parentId) else: if not parent.hasParentingRules(): DoInterestManager.notify.error( 'addInterest: no setParentingRules defined in the DC for object %s (%s)' '' % (parentId, parent.__class__.__name__)) DoInterestManager._interests[handle] = InterestState( description, InterestState.StateActive, 0, None, parentId, zoneIdList, self._completeEventCount, True) if self.__verbose(): print('CR::INTEREST.addInterest(handle=%s, parentId=%s, zoneIdList=%s, description=%s)' % ( handle, parentId, zoneIdList, description)) assert self.printInterestsIfDebug() return InterestHandle(handle)
[docs] def removeInterest(self, handle, event = None): """ Stop looking in a (set of) zone(s) """ # print 'base.cr.removeInterest(',handle,'):',base.clock.getFrameCount() assert DoInterestManager.notify.debugCall() assert isinstance(handle, InterestHandle) existed = False if not event: event = self._getAnonymousEvent('removeInterest') handle = handle.asInt() if handle in DoInterestManager._interests: existed = True intState = DoInterestManager._interests[handle] if event: messenger.send(self._getRemoveInterestEvent(), [event, intState.parentId, intState.zoneIdList]) if intState.isPendingDelete(): self.notify.warning( 'removeInterest: interest %s already pending removal' % handle) # this interest is already pending delete, so let's just tack this # callback onto the list if event is not None: intState.addEvent(event) else: if len(intState.events) > 0: # we're not pending a removal, but we have outstanding events? # probably we are waiting for an add/alter complete. # should we send those events now? assert self.notify.warning('removeInterest: abandoning events: %s' % intState.events) intState.clearEvents() intState.state = InterestState.StatePendingDel contextId = self._getNextContextId() intState.context = contextId if event: intState.addEvent(event) self._sendRemoveInterest(handle, contextId) if not event: self._considerRemoveInterest(handle) if self.__verbose(): print('CR::INTEREST.removeInterest(handle=%s, event=%s)' % ( handle, event)) else: DoInterestManager.notify.warning( "removeInterest: handle not found: %s" % (handle)) assert self.printInterestsIfDebug() return existed
[docs] def removeAutoInterest(self, handle): """ Stop looking in a (set of) zone(s) """ assert DoInterestManager.notify.debugCall() assert isinstance(handle, InterestHandle) existed = False handle = handle.asInt() if handle in DoInterestManager._interests: existed = True intState = DoInterestManager._interests[handle] if intState.isPendingDelete(): self.notify.warning( 'removeInterest: interest %s already pending removal' % handle) # this interest is already pending delete, so let's just tack this # callback onto the list else: if len(intState.events) > 0: # we're not pending a removal, but we have outstanding events? # probably we are waiting for an add/alter complete. # should we send those events now? self.notify.warning('removeInterest: abandoning events: %s' % intState.events) intState.clearEvents() intState.state = InterestState.StatePendingDel self._considerRemoveInterest(handle) if self.__verbose(): print('CR::INTEREST.removeAutoInterest(handle=%s)' % (handle)) else: DoInterestManager.notify.warning( "removeInterest: handle not found: %s" % (handle)) assert self.printInterestsIfDebug() return existed
[docs] @report(types = ['args'], dConfigParam = 'guildmgr') def removeAIInterest(self, handle): """ handle is NOT an InterestHandle. It's just a bare integer representing an AI opened interest. We're making the client close down this interest since the AI has trouble removing interests(that its opened) when the avatar goes offline. See GuildManager(UD) for how it's being used. """ self._sendRemoveAIInterest(handle)
[docs] def alterInterest(self, handle, parentId, zoneIdList, description=None, event=None): """ Removes old interests and adds new interests. Note that when an interest is changed, only the most recent change's event will be triggered. Previous events are abandoned. If this is a problem, consider opening multiple interests. """ assert DoInterestManager.notify.debugCall() assert isinstance(handle, InterestHandle) #assert not self._noNewInterests handle = handle.asInt() if self._noNewInterests: DoInterestManager.notify.warning( "alterInterest: addingInterests on delete: %s" % (handle)) return exists = False if event is None: event = self._getAnonymousEvent('alterInterest') if handle in DoInterestManager._interests: if description is not None: DoInterestManager._interests[handle].desc = description else: description = DoInterestManager._interests[handle].desc # are we overriding an existing change? if DoInterestManager._interests[handle].context != NO_CONTEXT: DoInterestManager._interests[handle].clearEvents() contextId = self._getNextContextId() DoInterestManager._interests[handle].context = contextId DoInterestManager._interests[handle].parentId = parentId DoInterestManager._interests[handle].zoneIdList = zoneIdList DoInterestManager._interests[handle].addEvent(event) if self.__verbose(): print('CR::INTEREST.alterInterest(handle=%s, parentId=%s, zoneIdList=%s, description=%s, event=%s)' % ( handle, parentId, zoneIdList, description, event)) self._sendAddInterest(handle, contextId, parentId, zoneIdList, description, action='modify') exists = True assert self.printInterestsIfDebug() else: DoInterestManager.notify.warning( "alterInterest: handle not found: %s" % (handle)) return exists
[docs] def openAutoInterests(self, obj): if hasattr(obj, '_autoInterestHandle'): # must be multiple inheritance self.notify.debug('openAutoInterests(%s): interests already open' % obj.__class__.__name__) return autoInterests = obj.getAutoInterests() obj._autoInterestHandle = None if len(autoInterests) == 0: return obj._autoInterestHandle = self.addAutoInterest(obj.doId, autoInterests, '%s-autoInterest' % obj.__class__.__name__)
[docs] def closeAutoInterests(self, obj): if not hasattr(obj, '_autoInterestHandle'): # must be multiple inheritance self.notify.debug('closeAutoInterests(%s): interests already closed' % obj) return if obj._autoInterestHandle is not None: self.removeAutoInterest(obj._autoInterestHandle) del obj._autoInterestHandle
# events for InterestWatcher def _getAddInterestEvent(self): return self._addInterestEvent def _getRemoveInterestEvent(self): return self._removeInterestEvent def _getInterestState(self, handle): return DoInterestManager._interests[handle] def _getNextHandle(self): handle = DoInterestManager._HandleSerialNum while True: handle = (handle + 1) & DoInterestManager._HandleMask # skip handles that are already in use if handle not in DoInterestManager._interests: break DoInterestManager.notify.warning( 'interest %s already in use' % handle) DoInterestManager._HandleSerialNum = handle return DoInterestManager._HandleSerialNum def _getNextContextId(self): contextId = DoInterestManager._ContextIdSerialNum while True: contextId = (contextId + 1) & DoInterestManager._ContextIdMask # skip over the 'no context' id if contextId != NO_CONTEXT: break DoInterestManager._ContextIdSerialNum = contextId return DoInterestManager._ContextIdSerialNum def _considerRemoveInterest(self, handle): """ Consider whether we should cull the interest set. """ assert DoInterestManager.notify.debugCall() if handle in DoInterestManager._interests: if DoInterestManager._interests[handle].isPendingDelete(): # make sure there is no pending event for this interest if DoInterestManager._interests[handle].context == NO_CONTEXT: assert len(DoInterestManager._interests[handle].events) == 0 del DoInterestManager._interests[handle] if __debug__:
[docs] def printInterestsIfDebug(self): if DoInterestManager.notify.getDebug(): self.printInterests() return 1 # for assert
def _addDebugInterestHistory(self, action, description, handle, contextId, parentId, zoneIdList): if description is None: description = '' DoInterestManager._debug_interestHistory.append( (action, description, handle, contextId, parentId, zoneIdList)) DoInterestManager._debug_maxDescriptionLen = max( DoInterestManager._debug_maxDescriptionLen, len(description))
[docs] def printInterestHistory(self): print("***************** Interest History *************") format = '%9s %' + str(DoInterestManager._debug_maxDescriptionLen) + 's %6s %6s %9s %s' print(format % ( "Action", "Description", "Handle", "Context", "ParentId", "ZoneIdList")) for i in DoInterestManager._debug_interestHistory: print(format % tuple(i)) print("Note: interests with a Context of 0 do not get" \ " done/finished notices.")
[docs] def printInterestSets(self): print("******************* Interest Sets **************") format = '%6s %' + str(DoInterestManager._debug_maxDescriptionLen) + 's %11s %11s %8s %8s %8s' print(format % ( "Handle", "Description", "ParentId", "ZoneIdList", "State", "Context", "Event")) for id, state in DoInterestManager._interests.items(): if len(state.events) == 0: event = '' elif len(state.events) == 1: event = state.events[0] else: event = state.events print(format % (id, state.desc, state.parentId, state.zoneIdList, state.state, state.context, event)) print("************************************************")
[docs] def printInterests(self): self.printInterestHistory() self.printInterestSets()
def _sendAddInterest(self, handle, contextId, parentId, zoneIdList, description, action=None): """ Part of the new otp-server code. handle is a client-side created number that refers to a set of interests. The same handle number doesn't necessarily have any relationship to the same handle on another client. """ assert DoInterestManager.notify.debugCall() if __debug__: if isinstance(zoneIdList, list): zoneIdList.sort() if action is None: action = 'add' self._addDebugInterestHistory( action, description, handle, contextId, parentId, zoneIdList) if parentId == 0: DoInterestManager.notify.error( 'trying to set interest to invalid parent: %s' % parentId) datagram = PyDatagram() # Add message type if isinstance(zoneIdList, list): vzl = list(zoneIdList) vzl.sort() uniqueElements(vzl) datagram.addUint16(CLIENT_ADD_INTEREST_MULTIPLE) datagram.addUint32(contextId) datagram.addUint16(handle) datagram.addUint32(parentId) datagram.addUint16(len(vzl)) for zone in vzl: datagram.addUint32(zone) else: datagram.addUint16(CLIENT_ADD_INTEREST) datagram.addUint32(contextId) datagram.addUint16(handle) datagram.addUint32(parentId) datagram.addUint32(zoneIdList) self.send(datagram) def _sendRemoveInterest(self, handle, contextId): """ handle is a client-side created number that refers to a set of interests. The same handle number doesn't necessarily have any relationship to the same handle on another client. """ assert DoInterestManager.notify.debugCall() assert handle in DoInterestManager._interests datagram = PyDatagram() # Add message type datagram.addUint16(CLIENT_REMOVE_INTEREST) datagram.addUint32(contextId) datagram.addUint16(handle) self.send(datagram) if __debug__: state = DoInterestManager._interests[handle] self._addDebugInterestHistory( "remove", state.desc, handle, contextId, state.parentId, state.zoneIdList) def _sendRemoveAIInterest(self, handle): """ handle is a bare int, NOT an InterestHandle. Use this to close an AI opened interest. """ datagram = PyDatagram() # Add message type datagram.addUint16(CLIENT_REMOVE_INTEREST) datagram.addUint16((1<<15) + handle) self.send(datagram)
[docs] def cleanupWaitAllInterestsComplete(self): if self._completeDelayedCallback is not None: self._completeDelayedCallback.destroy() self._completeDelayedCallback = None
[docs] def queueAllInterestsCompleteEvent(self, frames=5): # wait for N frames, if no new interests, send out all-done event # calling this is OK even if there are no pending interest completes def checkMoreInterests(): # if there are new interests, cancel this delayed callback, another # will automatically be scheduled when all interests complete # print 'checkMoreInterests(',self._completeEventCount.num,'):',base.clock.getFrameCount() return self._completeEventCount.num > 0 def sendEvent(): messenger.send(self.getAllInterestsCompleteEvent()) for callback in self._allInterestsCompleteCallbacks: callback() self._allInterestsCompleteCallbacks = [] self.cleanupWaitAllInterestsComplete() self._completeDelayedCallback = FrameDelayedCall( 'waitForAllInterestCompletes', callback=sendEvent, frames=frames, cancelFunc=checkMoreInterests) checkMoreInterests = None sendEvent = None
[docs] def handleInterestDoneMessage(self, di): """ This handles the interest done messages and may dispatch an event """ assert DoInterestManager.notify.debugCall() contextId = di.getUint32() handle = di.getUint16() if self.__verbose(): print('CR::INTEREST.interestDone(handle=%s)' % handle) DoInterestManager.notify.debug( "handleInterestDoneMessage--> Received handle %s, context %s" % ( handle, contextId)) if handle in DoInterestManager._interests: eventsToSend = [] # if the context matches, send out the event if contextId == DoInterestManager._interests[handle].context: DoInterestManager._interests[handle].context = NO_CONTEXT # the event handlers may call back into the interest manager. Send out # the events after we're once again in a stable state. #DoInterestManager._interests[handle].sendEvents() eventsToSend = list(DoInterestManager._interests[handle].getEvents()) DoInterestManager._interests[handle].clearEvents() else: DoInterestManager.notify.debug( "handleInterestDoneMessage--> handle: %s: Expecting context %s, got %s" % ( handle, DoInterestManager._interests[handle].context, contextId)) if __debug__: state = DoInterestManager._interests[handle] self._addDebugInterestHistory( "finished", state.desc, handle, contextId, state.parentId, state.zoneIdList) self._considerRemoveInterest(handle) for event in eventsToSend: messenger.send(event) else: DoInterestManager.notify.warning( "handleInterestDoneMessage: handle not found: %s" % (handle)) # if there are no more outstanding interest-completes, send out global all-done event if self._completeEventCount.num == 0: self.queueAllInterestsCompleteEvent() assert self.printInterestsIfDebug()
if __debug__: import unittest
[docs] class AsyncTestCase(unittest.TestCase):
[docs] def setCompleted(self): self._async_completed = True
[docs] def isCompleted(self): return getattr(self, '_async_completed', False)
[docs] class AsyncTestSuite(unittest.TestSuite): pass
[docs] class AsyncTestLoader(unittest.TestLoader): suiteClass = AsyncTestSuite
[docs] class AsyncTextTestRunner(unittest.TextTestRunner):
[docs] def run(self, testCase): result = self._makeResult() startTime = time.time() test(result) stopTime = time.time() timeTaken = stopTime - startTime result.printErrors() self.stream.writeln(result.separator2) run = result.testsRun self.stream.writeln("Ran %d test%s in %.3fs" % (run, run != 1 and "s" or "", timeTaken)) self.stream.writeln() if not result.wasSuccessful(): self.stream.write("FAILED (") failed, errored = map(len, (result.failures, result.errors)) if failed: self.stream.write("failures=%d" % failed) if errored: if failed: self.stream.write(", ") self.stream.write("errors=%d" % errored) self.stream.writeln(")") else: self.stream.writeln("OK") return result
[docs] class TestInterestAddRemove(AsyncTestCase, DirectObject.DirectObject):
[docs] def testInterestAdd(self): event = uniqueName('InterestAdd') self.acceptOnce(event, self.gotInterestAddResponse) self.handle = base.cr.addInterest(base.cr.GameGlobalsId, 100, 'TestInterest', event=event)
[docs] def gotInterestAddResponse(self): event = uniqueName('InterestRemove') self.acceptOnce(event, self.gotInterestRemoveResponse) base.cr.removeInterest(self.handle, event=event)
[docs] def gotInterestRemoveResponse(self): self.setCompleted()
[docs] def runTests(): suite = unittest.makeSuite(TestInterestAddRemove) unittest.AsyncTextTestRunner(verbosity=2).run(suite)