"""DistributedSmoothNodeBase module: contains the DistributedSmoothNodeBase class"""
from .ClockDelta import *
from direct.task import Task
from direct.task.TaskManagerGlobal import taskMgr
from direct.showbase.PythonUtil import randFloat
from panda3d.direct import CDistributedSmoothNodeBase
from enum import IntEnum
[docs]class DummyTaskClass:
[docs] def setDelay(self, blah):
pass
DummyTask = DummyTaskClass()
[docs]class DistributedSmoothNodeBase:
"""common base class for DistributedSmoothNode and DistributedSmoothNodeAI
"""
[docs] class BroadcastTypes(IntEnum):
FULL = 0
XYH = 1
XY = 2
[docs] def __init__(self):
self.__broadcastPeriod = None
[docs] def generate(self):
self.cnode = CDistributedSmoothNodeBase()
self.cnode.setClockDelta(globalClockDelta)
self.d_broadcastPosHpr = None
[docs] def disable(self):
del self.cnode
# make sure our task is gone
self.stopPosHprBroadcast()
[docs] def b_clearSmoothing(self):
self.d_clearSmoothing()
self.clearSmoothing()
[docs] def d_clearSmoothing(self):
self.sendUpdate("clearSmoothing", [0])
### posHprBroadcast ###
[docs] def getPosHprBroadcastTaskName(self):
# presumably, we have a doId at this point
return "sendPosHpr-%s" % self.doId
[docs] def setPosHprBroadcastPeriod(self, period):
# call this at any time to change the delay between broadcasts
self.__broadcastPeriod = period
[docs] def getPosHprBroadcastPeriod(self):
# query the current delay between broadcasts
return self.__broadcastPeriod
[docs] def stopPosHprBroadcast(self):
taskMgr.remove(self.getPosHprBroadcastTaskName())
# Delete this callback because it maintains a reference to self
self.d_broadcastPosHpr = None
[docs] def posHprBroadcastStarted(self):
return self.d_broadcastPosHpr is not None
[docs] def wantSmoothPosBroadcastTask(self):
return True
[docs] def startPosHprBroadcast(self, period=.2, stagger=0, type=None):
if self.cnode is None:
self.initializeCnode()
BT = DistributedSmoothNodeBase.BroadcastTypes
if type is None:
type = BT.FULL
# set the broadcast type
self.broadcastType = type
broadcastFuncs = {
BT.FULL: self.cnode.broadcastPosHprFull,
BT.XYH: self.cnode.broadcastPosHprXyh,
BT.XY: self.cnode.broadcastPosHprXy,
}
# this comment is here so it will show up in a grep for 'def d_broadcastPosHpr'
self.d_broadcastPosHpr = broadcastFuncs[self.broadcastType]
# Set stagger to non-zero to randomly delay the initial task execution
# over 'period' seconds, to spread out task processing over time
# when a large number of SmoothNodes are created simultaneously.
taskName = self.getPosHprBroadcastTaskName()
# Set up telemetry optimization variables
self.cnode.initialize(self, self.dclass, self.doId)
self.setPosHprBroadcastPeriod(period)
# Broadcast our initial position
self.b_clearSmoothing()
self.cnode.sendEverything()
# remove any old tasks
taskMgr.remove(taskName)
# spawn the new task
delay = 0.
if stagger:
delay = randFloat(period)
if self.wantSmoothPosBroadcastTask():
taskMgr.doMethodLater(self.__broadcastPeriod + delay,
self._posHprBroadcast, taskName)
def _posHprBroadcast(self, task=DummyTask):
# TODO: we explicitly stagger the initial task timing in
# startPosHprBroadcast; we should at least make an effort to keep
# this task accurately aligned with its period and starting time.
self.d_broadcastPosHpr()
task.setDelay(self.__broadcastPeriod)
return Task.again
[docs] def sendCurrentPosition(self):
# if we're not currently broadcasting, make sure things are set up
if self.d_broadcastPosHpr is None:
self.cnode.initialize(self, self.dclass, self.doId)
self.cnode.sendEverything()