Source code for direct.showbase.JobManager

from __future__ import annotations

from collections.abc import Iterator

from panda3d.core import ConfigVariableBool, ConfigVariableDouble, ClockObject
from direct.directnotify.DirectNotifyGlobal import directNotify
from direct.task.TaskManagerGlobal import taskMgr
from direct.showbase.Job import Job
from direct.showbase.PythonUtil import flywheel
from direct.showbase.MessengerGlobal import messenger


[docs]class JobManager: """ Similar to the taskMgr but designed for tasks that are CPU-intensive and/or not time-critical. Jobs run in a fixed timeslice that the JobManager is allotted each frame. """ notify = directNotify.newCategory("JobManager") # there's one task for the JobManager, all jobs run in this task TaskName = 'jobManager'
[docs] def __init__(self, timeslice: float | None = None) -> None: # how long do we run per frame self._timeslice = timeslice # store the jobs in these structures to allow fast lookup by various keys # priority -> jobId -> job self._pri2jobId2job: dict[int, dict[int, Job]] = {} # priority -> chronological list of jobIds self._pri2jobIds: dict[int, list[int]] = {} # jobId -> priority self._jobId2pri: dict[int, int] = {} # how many timeslices to give each job; this is used to efficiently implement # the relative job priorities self._jobId2timeslices: dict[int, int] = {} # how much time did the job use beyond the allotted timeslice, used to balance # out CPU usage self._jobId2overflowTime: dict[int, float] = {} self._useOverflowTime: bool | None = None # this is a generator that we use to give high-priority jobs more timeslices, # it yields jobIds in a sequence that includes high-priority jobIds more often # than low-priority self._jobIdGenerator: Iterator[int] | None = None self._highestPriority: int = Job.Priorities.Normal # type: ignore[attr-defined]
[docs] def destroy(self): taskMgr.remove(JobManager.TaskName) del self._pri2jobId2job
[docs] def add(self, job): pri = job.getPriority() jobId = job._getJobId() # store the job in the main table self._pri2jobId2job.setdefault(pri, {}) self._pri2jobId2job[pri][jobId] = job # and also store a direct mapping from the job's ID to its priority self._jobId2pri[jobId] = pri # add the jobId onto the end of the list of jobIds for this priority self._pri2jobIds.setdefault(pri, []) self._pri2jobIds[pri].append(jobId) # record the job's relative timeslice count self._jobId2timeslices[jobId] = pri # init the overflow time tracking self._jobId2overflowTime[jobId] = 0. # reset the jobId round-robin self._jobIdGenerator = None if len(self._jobId2pri) == 1: taskMgr.add(self._process, JobManager.TaskName) self._highestPriority = pri elif pri > self._highestPriority: self._highestPriority = pri self.notify.debug('added job: %s' % job.getJobName())
[docs] def remove(self, job): jobId = job._getJobId() # look up the job's priority pri = self._jobId2pri.pop(jobId) # TODO: this removal is a linear search self._pri2jobIds[pri].remove(jobId) # remove the job from the main table del self._pri2jobId2job[pri][jobId] # clean up the job's generator, if any job._cleanupGenerator() # remove the job's timeslice count self._jobId2timeslices.pop(jobId) # remove the overflow time self._jobId2overflowTime.pop(jobId) if len(self._pri2jobId2job[pri]) == 0: del self._pri2jobId2job[pri] if pri == self._highestPriority: if len(self._jobId2pri) > 0: # calculate a new highest priority # TODO: this is not very fast priorities = self._getSortedPriorities() self._highestPriority = priorities[-1] else: taskMgr.remove(JobManager.TaskName) self._highestPriority = 0 self.notify.debug('removed job: %s' % job.getJobName())
[docs] def finish(self, job): # run this job, right now, until it finishes assert self.notify.debugCall() jobId = job._getJobId() # look up the job's priority pri = self._jobId2pri[jobId] # grab the job job = self._pri2jobId2job[pri][jobId] gen = job._getGenerator() if __debug__: job._pstats.start() job.resume() while True: try: result = next(gen) except StopIteration: # Job didn't yield Job.Done, it ran off the end and returned # treat it as if it returned Job.Done self.notify.warning('job %s never yielded Job.Done' % job) result = Job.Done if result is Job.Done: job.suspend() self.remove(job) job._setFinished() messenger.send(job.getFinishedEvent()) # job is done. break if __debug__: job._pstats.stop()
# how long should we run per frame?
[docs] @staticmethod def getDefaultTimeslice(): # run for 1/2 millisecond per frame by default # config is in milliseconds, this func returns value in seconds return ConfigVariableDouble('job-manager-timeslice-ms', .5).value / 1000.
[docs] def getTimeslice(self): if self._timeslice: return self._timeslice return self.getDefaultTimeslice()
[docs] def setTimeslice(self, timeslice): self._timeslice = timeslice
def _getSortedPriorities(self): # returns all job priorities in ascending order return sorted(self._pri2jobId2job) def _process(self, task=None): if self._useOverflowTime is None: self._useOverflowTime = ConfigVariableBool('job-use-overflow-time', 1).value if len(self._pri2jobId2job) > 0: clock = ClockObject.getGlobalClock() #assert self.notify.debugCall() # figure out how long we can run endT = clock.getRealTime() + (self.getTimeslice() * .9) while True: if self._jobIdGenerator is None: # round-robin the jobs, giving high-priority jobs more timeslices self._jobIdGenerator = flywheel( list(self._jobId2timeslices.keys()), countFunc = lambda jobId: self._jobId2timeslices[jobId]) try: # grab the next jobId in the sequence jobId = next(self._jobIdGenerator) except StopIteration: self._jobIdGenerator = None continue # OK, we've selected a job to run pri = self._jobId2pri.get(jobId) if pri is None: # this job is no longer present continue # check if there's overflow time that we need to make up for if self._useOverflowTime: overflowTime = self._jobId2overflowTime[jobId] timeLeft = endT - clock.getRealTime() if overflowTime >= timeLeft: self._jobId2overflowTime[jobId] = max(0., overflowTime-timeLeft) # don't run any more jobs this frame, this makes up # for the extra overflow time that was used before break job = self._pri2jobId2job[pri][jobId] gen = job._getGenerator() if __debug__: job._pstats.start() job.resume() while clock.getRealTime() < endT: try: result = next(gen) except StopIteration: # Job didn't yield Job.Done, it ran off the end and returned # treat it as if it returned Job.Done self.notify.warning('job %s never yielded Job.Done' % job) result = Job.Done if result is Job.Sleep: job.suspend() if __debug__: job._pstats.stop() # grab the next job if there's time left break elif result is Job.Done: job.suspend() self.remove(job) job._setFinished() if __debug__: job._pstats.stop() messenger.send(job.getFinishedEvent()) # grab the next job if there's time left break else: # we've run out of time #assert self.notify.debug('timeslice end: %s, %s' % (endT, clock.getRealTime())) job.suspend() overflowTime = clock.getRealTime() - endT if overflowTime > self.getTimeslice(): self._jobId2overflowTime[jobId] += overflowTime if __debug__: job._pstats.stop() break if len(self._pri2jobId2job) == 0: # there's nothing left to do, all the jobs are done! break return task.cont def __repr__(self): s = '=======================================================' s += '\nJobManager: active jobs in descending order of priority' s += '\n=======================================================' pris = self._getSortedPriorities() if len(pris) == 0: s += '\n no jobs running' else: pris.reverse() for pri in pris: jobId2job = self._pri2jobId2job[pri] # run through the jobs at this priority in the order that they will run for jobId in self._pri2jobIds[pri]: job = jobId2job[jobId] s += '\n%5d: %s (jobId %s)' % (pri, job.getJobName(), jobId) s += '\n' return s