from panda3d.core import *
from panda3d.direct import *
from direct.task import Task
from direct.task.TaskManagerGlobal import taskMgr
from direct.directnotify import DirectNotifyGlobal
from direct.distributed.CRDataCache import CRDataCache
from direct.distributed.ConnectionRepository import ConnectionRepository
from direct.showbase.PythonUtil import safeRepr, itype, makeList
from direct.showbase.MessengerGlobal import messenger
from .MsgTypes import *
from . import CRCache
from . import ParentMgr
from . import RelatedObjectMgr
from .ClockDelta import *
import time
[docs]class ClientRepositoryBase(ConnectionRepository):
"""
This maintains a client-side connection with a Panda server.
This base class exists to collect the common code between
ClientRepository, which is the CMU-provided, open-source version
of the client repository code, and OTPClientRepository, which is
the VR Studio's implementation of the same.
"""
notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepositoryBase")
[docs] def __init__(self, dcFileNames = None, dcSuffix = '',
connectMethod = None, threadedNet = None):
if connectMethod is None:
connectMethod = self.CM_HTTP
ConnectionRepository.__init__(self, connectMethod, base.config, hasOwnerView = True, threadedNet = threadedNet)
self.dcSuffix = dcSuffix
if hasattr(self, 'setVerbose'):
if ConfigVariableBool('verbose-clientrepository', False):
self.setVerbose(1)
self.context=100000
self.setClientDatagram(1)
self.deferredGenerates = []
self.deferredDoIds = {}
self.lastGenerate = 0
self.setDeferInterval(ConfigVariableDouble('deferred-generate-interval', 0.2).value)
self.noDefer = False # Set this True to temporarily disable deferring.
self.recorder = base.recorder
self.readDCFile(dcFileNames)
self.cache=CRCache.CRCache()
self.doDataCache = CRDataCache()
self.cacheOwner=CRCache.CRCache()
self.serverDelta = 0
self.bootedIndex = None
self.bootedText = None
# create a parentMgr to handle distributed reparents
# this used to be 'token2nodePath'
self.parentMgr = ParentMgr.ParentMgr()
# The RelatedObjectMgr helps distributed objects find each
# other.
self.relatedObjectMgr = RelatedObjectMgr.RelatedObjectMgr(self)
# This will be filled in when a TimeManager is created.
self.timeManager = None
# Keep track of how recently we last sent a heartbeat message.
# We want to keep these coming at heartbeatInterval seconds.
self.heartbeatInterval = ConfigVariableDouble('heartbeat-interval', 10).value
self.heartbeatStarted = 0
self.lastHeartbeat = 0
self._delayDeletedDOs = {}
self.specialNameNumber = 0
[docs] def setDeferInterval(self, deferInterval):
"""Specifies the minimum amount of time, in seconds, that must
elapse before generating any two DistributedObjects whose
class type is marked "deferrable". Set this to 0 to indicate
no deferring will occur."""
self.deferInterval = deferInterval
self.setHandleCUpdates(self.deferInterval == 0)
if self.deferredGenerates:
taskMgr.remove('deferredGenerate')
taskMgr.doMethodLater(self.deferInterval, self.doDeferredGenerate, 'deferredGenerate')
## def queryObjectAll(self, doID, context=0):
## """
## Get a one-time snapshot look at the object.
## """
## assert self.notify.debugStateCall(self)
## # Create a message
## datagram = PyDatagram()
## datagram.addServerHeader(
## doID, localAvatar.getDoId(), 2020)
## # A context that can be used to index the response if needed
## datagram.addUint32(context)
## self.send(datagram)
## # Make sure the message gets there.
## self.flush()
[docs] def specialName(self, label):
name = ("SpecialName %s %s" % (self.specialNameNumber, label))
self.specialNameNumber += 1
return name
[docs] def getTables(self, ownerView):
if ownerView:
return self.doId2ownerView, self.cacheOwner
else:
return self.doId2do, self.cache
def _getMsgName(self, msgId):
# we might get a list of message names, use the first one
return makeList(MsgId2Names.get(msgId, 'UNKNOWN MESSAGE: %s' % msgId))[0]
[docs] def allocateContext(self):
self.context+=1
return self.context
[docs] def setServerDelta(self, delta):
"""
Indicates the approximate difference in seconds between the
client's clock and the server's clock, in universal time (not
including timezone shifts). This is mainly useful for
reporting synchronization information to the logs; don't
depend on it for any precise timing requirements.
Also see Notify.setServerDelta(), which also accounts for a
timezone shift.
"""
self.serverDelta = delta
[docs] def getServerDelta(self):
return self.serverDelta
[docs] def getServerTimeOfDay(self):
"""
Returns the current time of day (seconds elapsed since the
1972 epoch) according to the server's clock. This is in GMT,
and hence is irrespective of timezones.
The value is computed based on the client's clock and the
known delta from the server's clock, which is not terribly
precisely measured and may drift slightly after startup, but
it should be accurate plus or minus a couple of seconds.
"""
return time.time() + self.serverDelta
[docs] def doGenerate(self, parentId, zoneId, classId, doId, di):
# Look up the dclass
assert parentId == self.GameGlobalsId or parentId in self.doId2do
dclass = self.dclassesByNumber[classId]
assert self.notify.debug("performing generate for %s %s" % (dclass.getName(), doId))
dclass.startGenerate()
# Create a new distributed object, and put it in the dictionary
distObj = self.generateWithRequiredOtherFields(dclass, doId, di, parentId, zoneId)
dclass.stopGenerate()
[docs] def flushGenerates(self):
""" Forces all pending generates to be performed immediately. """
while self.deferredGenerates:
msgType, extra = self.deferredGenerates[0]
del self.deferredGenerates[0]
self.replayDeferredGenerate(msgType, extra)
taskMgr.remove('deferredGenerate')
[docs] def replayDeferredGenerate(self, msgType, extra):
""" Override this to do something appropriate with deferred
"generate" messages when they are replayed().
"""
if msgType == CLIENT_ENTER_OBJECT_REQUIRED_OTHER:
# It's a generate message.
doId = extra
if doId in self.deferredDoIds:
args, deferrable, dg, updates = self.deferredDoIds[doId]
del self.deferredDoIds[doId]
self.doGenerate(*args)
if deferrable:
self.lastGenerate = ClockObject.getGlobalClock().getFrameTime()
for dg, di in updates:
# non-DC updates that need to be played back in-order are
# stored as (msgType, (dg, di))
if isinstance(di, tuple):
msgType = dg
dg, di = di
self.replayDeferredGenerate(msgType, (dg, di))
else:
# ovUpdated is set to True since its OV
# is assumbed to have occured when the
# deferred update was originally received
self.__doUpdate(doId, di, True)
else:
self.notify.warning("Ignoring deferred message %s" % (msgType))
[docs] def doDeferredGenerate(self, task):
""" This is the task that generates an object on the deferred
queue. """
now = ClockObject.getGlobalClock().getFrameTime()
while self.deferredGenerates:
if now - self.lastGenerate < self.deferInterval:
# Come back later.
return Task.again
# Generate the next deferred object.
msgType, extra = self.deferredGenerates[0]
del self.deferredGenerates[0]
self.replayDeferredGenerate(msgType, extra)
# All objects are generaetd.
return Task.done
[docs] def generateWithRequiredFields(self, dclass, doId, di, parentId, zoneId):
if doId in self.doId2do:
# ...it is in our dictionary.
# Just update it.
distObj = self.doId2do[doId]
assert distObj.dclass == dclass
distObj.generate()
distObj.setLocation(parentId, zoneId)
distObj.updateRequiredFields(dclass, di)
# updateRequiredFields calls announceGenerate
elif self.cache.contains(doId):
# ...it is in the cache.
# Pull it out of the cache:
distObj = self.cache.retrieve(doId)
assert distObj.dclass == dclass
# put it in the dictionary:
self.doId2do[doId] = distObj
# and update it.
distObj.generate()
# make sure we don't have a stale location
distObj.parentId = None
distObj.zoneId = None
distObj.setLocation(parentId, zoneId)
distObj.updateRequiredFields(dclass, di)
# updateRequiredFields calls announceGenerate
else:
# ...it is not in the dictionary or the cache.
# Construct a new one
classDef = dclass.getClassDef()
if classDef is None:
self.notify.error("Could not create an undefined %s object." % (dclass.getName()))
distObj = classDef(self)
distObj.dclass = dclass
# Assign it an Id
distObj.doId = doId
# Put the new do in the dictionary
self.doId2do[doId] = distObj
# Update the required fields
distObj.generateInit() # Only called when constructed
distObj._retrieveCachedData()
distObj.generate()
distObj.setLocation(parentId, zoneId)
distObj.updateRequiredFields(dclass, di)
# updateRequiredFields calls announceGenerate
self.notify.debug("New DO:%s, dclass:%s" % (doId, dclass.getName()))
return distObj
[docs] def generateWithRequiredOtherFields(self, dclass, doId, di,
parentId = None, zoneId = None):
if doId in self.doId2do:
# ...it is in our dictionary.
# Just update it.
distObj = self.doId2do[doId]
assert distObj.dclass == dclass
distObj.generate()
distObj.setLocation(parentId, zoneId)
distObj.updateRequiredOtherFields(dclass, di)
# updateRequiredOtherFields calls announceGenerate
elif self.cache.contains(doId):
# ...it is in the cache.
# Pull it out of the cache:
distObj = self.cache.retrieve(doId)
assert distObj.dclass == dclass
# put it in the dictionary:
self.doId2do[doId] = distObj
# and update it.
distObj.generate()
# make sure we don't have a stale location
distObj.parentId = None
distObj.zoneId = None
distObj.setLocation(parentId, zoneId)
distObj.updateRequiredOtherFields(dclass, di)
# updateRequiredOtherFields calls announceGenerate
else:
# ...it is not in the dictionary or the cache.
# Construct a new one
classDef = dclass.getClassDef()
if classDef is None:
self.notify.error("Could not create an undefined %s object." % (dclass.getName()))
distObj = classDef(self)
distObj.dclass = dclass
# Assign it an Id
distObj.doId = doId
# Put the new do in the dictionary
self.doId2do[doId] = distObj
# Update the required fields
distObj.generateInit() # Only called when constructed
distObj._retrieveCachedData()
distObj.generate()
distObj.setLocation(parentId, zoneId)
distObj.updateRequiredOtherFields(dclass, di)
# updateRequiredOtherFields calls announceGenerate
return distObj
[docs] def generateWithRequiredOtherFieldsOwner(self, dclass, doId, di):
if doId in self.doId2ownerView:
# ...it is in our dictionary.
# Just update it.
self.notify.error('duplicate owner generate for %s (%s)' % (
doId, dclass.getName()))
distObj = self.doId2ownerView[doId]
assert distObj.dclass == dclass
distObj.generate()
distObj.updateRequiredOtherFields(dclass, di)
# updateRequiredOtherFields calls announceGenerate
elif self.cacheOwner.contains(doId):
# ...it is in the cache.
# Pull it out of the cache:
distObj = self.cacheOwner.retrieve(doId)
assert distObj.dclass == dclass
# put it in the dictionary:
self.doId2ownerView[doId] = distObj
# and update it.
distObj.generate()
distObj.updateRequiredOtherFields(dclass, di)
# updateRequiredOtherFields calls announceGenerate
else:
# ...it is not in the dictionary or the cache.
# Construct a new one
classDef = dclass.getOwnerClassDef()
if classDef is None:
self.notify.error("Could not create an undefined %s object. Have you created an owner view?" % (dclass.getName()))
distObj = classDef(self)
distObj.dclass = dclass
# Assign it an Id
distObj.doId = doId
# Put the new do in the dictionary
self.doId2ownerView[doId] = distObj
# Update the required fields
distObj.generateInit() # Only called when constructed
distObj.generate()
distObj.updateRequiredOtherFields(dclass, di)
# updateRequiredOtherFields calls announceGenerate
return distObj
[docs] def disableDoId(self, doId, ownerView=False):
table, cache = self.getTables(ownerView)
# Make sure the object exists
if doId in table:
# Look up the object
distObj = table[doId]
# remove the object from the dictionary
del table[doId]
# Only cache the object if it is a "cacheable" type
# object; this way we don't clutter up the caches with
# trivial objects that don't benefit from caching.
# also don't try to cache an object that is delayDeleted
cached = False
if distObj.getCacheable() and distObj.getDelayDeleteCount() <= 0:
cached = cache.cache(distObj)
if not cached:
distObj.deleteOrDelay()
if distObj.getDelayDeleteCount() <= 0:
# make sure we're not leaking
distObj.detectLeaks()
elif doId in self.deferredDoIds:
# The object had been deferred. Great; we don't even have
# to generate it now.
del self.deferredDoIds[doId]
i = self.deferredGenerates.index((CLIENT_ENTER_OBJECT_REQUIRED_OTHER, doId))
del self.deferredGenerates[i]
if len(self.deferredGenerates) == 0:
taskMgr.remove('deferredGenerate')
else:
self._logFailedDisable(doId, ownerView)
def _logFailedDisable(self, doId, ownerView):
self.notify.warning(
"Disable failed. DistObj "
+ str(doId) +
" is not in dictionary, ownerView=%s" % ownerView)
[docs] def handleDelete(self, di):
# overridden by ClientRepository
assert 0
[docs] def handleUpdateField(self, di):
"""
This method is called when a CLIENT_OBJECT_UPDATE_FIELD
message is received; it decodes the update, unpacks the
arguments, and calls the corresponding method on the indicated
DistributedObject.
In fact, this method is exactly duplicated by the C++ method
cConnectionRepository::handle_update_field(), which was
written to optimize the message loop by handling all of the
CLIENT_OBJECT_UPDATE_FIELD messages in C++. That means that
nowadays, this Python method will probably never be called,
since UPDATE_FIELD messages will not even be passed to the
Python message handlers. But this method remains for
documentation purposes, and also as a "just in case" handler
in case we ever do come across a situation in the future in
which python might handle the UPDATE_FIELD message.
"""
# Get the DO Id
doId = di.getUint32()
ovUpdated = self.__doUpdateOwner(doId, di)
if doId in self.deferredDoIds:
# This object hasn't really been generated yet. Sit on
# the update.
args, deferrable, dg0, updates = self.deferredDoIds[doId]
# Keep a copy of the datagram, and move the di to the copy
dg = Datagram(di.getDatagram())
di = DatagramIterator(dg, di.getCurrentIndex())
updates.append((dg, di))
else:
# This object has been fully generated. It's OK to update.
self.__doUpdate(doId, di, ovUpdated)
def __doUpdate(self, doId, di, ovUpdated):
# Find the DO
do = self.doId2do.get(doId)
if do is not None:
# Let the dclass finish the job
do.dclass.receiveUpdate(do, di)
elif not ovUpdated:
# this next bit is looking for avatar handles so that if you get an update
# for an avatar that isn't in your doId2do table but there is a
# avatar handle for that object then it's messages will be forwarded to that
# object. We are currently using that for whisper echoing
# if you need a more general perpose system consider registering proxy objects on
# a dict and adding the avatar handles to that dict when they are created
# then change/remove the old method. I didn't do that because I couldn't think
# of a use for it. -JML
try:
handle = self.identifyAvatar(doId)
if handle:
dclass = self.dclassesByName[handle.dclassName]
dclass.receiveUpdate(handle, di)
else:
self.notify.warning(
"Asked to update non-existent DistObj " + str(doId))
except:
self.notify.warning(
"Asked to update non-existent DistObj " + str(doId) + "and failed to find it")
def __doUpdateOwner(self, doId, di):
ovObj = self.doId2ownerView.get(doId)
if ovObj:
odg = Datagram(di.getDatagram())
odi = DatagramIterator(odg, di.getCurrentIndex())
ovObj.dclass.receiveUpdate(ovObj, odi)
return True
return False
[docs] def handleGoGetLost(self, di):
# The server told us it's about to drop the connection on us.
# Get ready!
if di.getRemainingSize() > 0:
self.bootedIndex = di.getUint16()
self.bootedText = di.getString()
self.notify.warning(
"Server is booting us out (%d): %s" % (self.bootedIndex, self.bootedText))
else:
self.bootedIndex = None
self.bootedText = None
self.notify.warning(
"Server is booting us out with no explanation.")
# disconnect now, don't wait for send/recv to fail
self.stopReaderPollTask()
self.lostConnection()
[docs] def handleServerHeartbeat(self, di):
# Got a heartbeat message from the server.
if ConfigVariableBool('server-heartbeat-info', True):
self.notify.info("Server heartbeat.")
[docs] def handleSystemMessage(self, di):
# Got a system message from the server.
message = di.getString()
self.notify.info('Message from server: %s' % (message))
return message
[docs] def handleSystemMessageAknowledge(self, di):
# Got a system message from the server.
message = di.getString()
self.notify.info('Message with aknowledge from server: %s' % (message))
messenger.send("system message aknowledge", [message])
return message
[docs] def getObjectsOfClass(self, objClass):
""" returns dict of doId:object, containing all objects
that inherit from 'class'. returned dict is safely mutable. """
doDict = {}
for doId, do in self.doId2do.items():
if isinstance(do, objClass):
doDict[doId] = do
return doDict
[docs] def getObjectsOfExactClass(self, objClass):
""" returns dict of doId:object, containing all objects that
are exactly of type 'class' (neglecting inheritance). returned
dict is safely mutable. """
doDict = {}
for doId, do in self.doId2do.items():
if do.__class__ == objClass:
doDict[doId] = do
return doDict
[docs] def considerHeartbeat(self):
"""Send a heartbeat message if we haven't sent one recently."""
if not self.heartbeatStarted:
self.notify.debug("Heartbeats not started; not sending.")
return
elapsed = ClockObject.getGlobalClock().getRealTime() - self.lastHeartbeat
if elapsed < 0 or elapsed > self.heartbeatInterval:
# It's time to send the heartbeat again (or maybe someone
# reset the clock back).
self.notify.info("Sending heartbeat mid-frame.")
self.startHeartbeat()
[docs] def stopHeartbeat(self):
taskMgr.remove("heartBeat")
self.heartbeatStarted = 0
[docs] def startHeartbeat(self):
self.stopHeartbeat()
self.heartbeatStarted = 1
self.sendHeartbeat()
self.waitForNextHeartBeat()
[docs] def sendHeartbeatTask(self, task):
self.sendHeartbeat()
return Task.again
[docs] def waitForNextHeartBeat(self):
taskMgr.doMethodLater(self.heartbeatInterval, self.sendHeartbeatTask,
"heartBeat", taskChain = 'net')
[docs] def replaceMethod(self, oldMethod, newFunction):
return 0
[docs] def getWorld(self, doId):
# Get the world node for this object
obj = self.doId2do[doId]
worldNP = obj.getParent()
while 1:
nextNP = worldNP.getParent()
if nextNP == render:
break
elif worldNP.isEmpty():
return None
return worldNP
[docs] def isLive(self):
if ConfigVariableBool('force-live', False):
return True
return not (__dev__ or launcher.isTestServer())
[docs] def isLocalId(self, id):
# By default, no ID's are local. See also
# ClientRepository.isLocalId().
return 0
# methods for tracking delaydeletes
def _addDelayDeletedDO(self, do):
# use the id of the object, it's possible to have multiple DelayDeleted instances
# with identical doIds if an object gets deleted then re-generated
key = id(do)
assert key not in self._delayDeletedDOs
self._delayDeletedDOs[key] = do
def _removeDelayDeletedDO(self, do):
key = id(do)
del self._delayDeletedDOs[key]
[docs] def printDelayDeletes(self):
print('DelayDeletes:')
print('=============')
for obj in self._delayDeletedDOs.values():
print('%s\t%s (%s)\tdelayDeletes=%s' % (
obj.doId, safeRepr(obj), itype(obj), obj.getDelayDeleteNames()))