Source code for direct.fsm.ClassicFSM

"""Finite State Machine module: contains the ClassicFSM class.

Note:
    This module and class exist only for backward compatibility with
    existing code.  New code should use the :mod:`.FSM` module instead.
"""

__all__ = ['ClassicFSM']

from direct.directnotify.DirectNotifyGlobal import directNotify
from direct.showbase.DirectObject import DirectObject
from direct.showbase.MessengerGlobal import messenger
import weakref

if __debug__:
    _debugFsms = {}

[docs] def printDebugFsmList(): global _debugFsms for k in sorted(_debugFsms.keys()): print("%s %s" % (k, _debugFsms[k]()))
__builtins__['debugFsmList'] = printDebugFsmList
[docs]class ClassicFSM(DirectObject): """ Finite State Machine class. This module and class exist only for backward compatibility with existing code. New code should use the FSM class instead. """ # create ClassicFSM DirectNotify category notify = directNotify.newCategory("ClassicFSM") # special methods # these are flags that tell the ClassicFSM what to do when an # undefined transition is requested: ALLOW = 0 # print a warning, and do the transition DISALLOW = 1 # silently ignore the request (don't do the transition) DISALLOW_VERBOSE = 2 # print a warning, and don't do the transition ERROR = 3 # print an error message and raise an exception
[docs] def __init__(self, name, states=[], initialStateName=None, finalStateName=None, onUndefTransition=DISALLOW_VERBOSE): """__init__(self, string, State[], string, string, int) ClassicFSM constructor: takes name, list of states, initial state and final state as:: fsm = ClassicFSM.ClassicFSM('stopLight', [State.State('red', enterRed, exitRed, ['green']), State.State('yellow', enterYellow, exitYellow, ['red']), State.State('green', enterGreen, exitGreen, ['yellow'])], 'red', 'red') each state's last argument, a list of allowed state transitions, is optional; if left out (or explicitly specified to be State.State.Any) then any transition from the state is 'defined' and allowed 'onUndefTransition' flag determines behavior when undefined transition is requested; see flag definitions above """ self.setName(name) self.setStates(states) self.setInitialState(initialStateName) self.setFinalState(finalStateName) self.onUndefTransition = onUndefTransition # Flag to see if we are inspecting self.inspecting = 0 # We do not enter the initial state to separate # construction from activation self.__currentState = None # We set this while we are modifying the state. No one else # should recursively attempt to modify the state while we are # doing this. self.__internalStateInFlux = 0 if __debug__: global _debugFsms _debugFsms[name] = weakref.ref(self)
# I know this isn't how __repr__ is supposed to be used, but it # is nice and convenient. def __repr__(self): return self.__str__() def __str__(self): """ Print out something useful about the fsm """ currentState = self.getCurrentState() if currentState: str = ("ClassicFSM " + self.getName() + ' in state "' + currentState.getName() + '"') else: str = ("ClassicFSM " + self.getName() + ' not in any state') return str
[docs] def enterInitialState(self, argList=[]): assert not self.__internalStateInFlux if self.__currentState == self.__initialState: return assert self.__currentState is None self.__internalStateInFlux = 1 self.__enter(self.__initialState, argList) assert not self.__internalStateInFlux
# setters and getters
[docs] def getName(self): return self.__name
[docs] def setName(self, name): self.__name = name
[docs] def getStates(self): return list(self.__states.values())
[docs] def setStates(self, states): """setStates(self, State[])""" # Make a dictionary from stateName -> state self.__states = {} for state in states: self.__states[state.getName()] = state
[docs] def addState(self, state): self.__states[state.getName()] = state
[docs] def getInitialState(self): return self.__initialState
[docs] def setInitialState(self, initialStateName): self.__initialState = self.getStateNamed(initialStateName)
[docs] def getFinalState(self): return self.__finalState
[docs] def setFinalState(self, finalStateName): self.__finalState = self.getStateNamed(finalStateName)
[docs] def requestFinalState(self): self.request(self.getFinalState().getName())
[docs] def getCurrentState(self): return self.__currentState
# lookup funcs
[docs] def getStateNamed(self, stateName): """ Return the state with given name if found, issue warning otherwise """ state = self.__states.get(stateName) if state: return state else: ClassicFSM.notify.warning("[%s]: getStateNamed: %s, no such state" % (self.__name, stateName))
[docs] def hasStateNamed(self, stateName): """ Return True if stateName is a valid state, False otherwise. """ result = False state = self.__states.get(stateName) if state: result = True return result
# basic ClassicFSM functionality def __exitCurrent(self, argList): """ Exit the current state """ assert self.__internalStateInFlux assert ClassicFSM.notify.debug("[%s]: exiting %s" % (self.__name, self.__currentState.getName())) self.__currentState.exit(argList) # Only send the state change event if we are inspecting it # If this event turns out to be generally useful, we can # turn it on all the time, but for now nobody else is using it if self.inspecting: messenger.send(self.getName() + '_' + self.__currentState.getName() + '_exited') self.__currentState = None def __enter(self, aState, argList=[]): """ Enter a given state, if it exists """ assert self.__internalStateInFlux stateName = aState.getName() if stateName in self.__states: assert ClassicFSM.notify.debug("[%s]: entering %s" % (self.__name, stateName)) self.__currentState = aState # Only send the state change event if we are inspecting it # If this event turns out to be generally useful, we can # turn it on all the time, but for now nobody else is using it if self.inspecting: messenger.send(self.getName() + '_' + stateName + '_entered') # Once we begin entering the new state, we're allow to # recursively request a transition to another state. # Indicate this by marking our internal state no longer in # flux. self.__internalStateInFlux = 0 aState.enter(argList) else: # notify.error is going to raise an exception; reset the # flux flag first self.__internalStateInFlux = 0 ClassicFSM.notify.error("[%s]: enter: no such state" % (self.__name)) def __transition(self, aState, enterArgList=[], exitArgList=[]): """ Exit currentState and enter given one """ assert not self.__internalStateInFlux self.__internalStateInFlux = 1 self.__exitCurrent(exitArgList) self.__enter(aState, enterArgList) assert not self.__internalStateInFlux
[docs] def request(self, aStateName, enterArgList=[], exitArgList=[], force=0): """ Attempt transition from currentState to given one. Return true is transition exists to given state, false otherwise. """ # If you trigger this assertion failure, you must have # recursively requested a state transition from within the # exitState() function for the previous state. This is not # supported because we're not fully transitioned into the new # state yet. assert not self.__internalStateInFlux if not self.__currentState: # Make this a warning for now ClassicFSM.notify.warning("[%s]: request: never entered initial state" % (self.__name)) self.__currentState = self.__initialState if isinstance(aStateName, str): aState = self.getStateNamed(aStateName) else: # Allow the caller to pass in a state in itself, not just # the name of a state. aState = aStateName aStateName = aState.getName() if aState is None: ClassicFSM.notify.error("[%s]: request: %s, no such state" % (self.__name, aStateName)) # is the transition defined? if it isn't, should we allow it? transitionDefined = self.__currentState.isTransitionDefined(aStateName) transitionAllowed = transitionDefined if self.onUndefTransition == ClassicFSM.ALLOW: transitionAllowed = 1 if not transitionDefined: # the transition is not defined, but we're going to do it # anyway. print a warning. ClassicFSM.notify.warning( "[%s]: performing undefined transition from %s to %s" % (self.__name, self.__currentState.getName(), aStateName)) if transitionAllowed or force: self.__transition(aState, enterArgList, exitArgList) return 1 # We can implicitly always transition to our final state. elif aStateName == self.__finalState.getName(): if self.__currentState == self.__finalState: # Do not do the transition if we are already in the # final state assert ClassicFSM.notify.debug( "[%s]: already in final state: %s" % (self.__name, aStateName)) return 1 else: # Force a transition to allow for cleanup assert ClassicFSM.notify.debug( "[%s]: implicit transition to final state: %s" % (self.__name, aStateName)) self.__transition(aState, enterArgList, exitArgList) return 1 # are we already in this state? elif aStateName == self.__currentState.getName(): assert ClassicFSM.notify.debug( "[%s]: already in state %s and no self transition" % (self.__name, aStateName)) return 0 else: msg = ("[%s]: no transition exists from %s to %s" % (self.__name, self.__currentState.getName(), aStateName)) if self.onUndefTransition == ClassicFSM.ERROR: ClassicFSM.notify.error(msg) elif self.onUndefTransition == ClassicFSM.DISALLOW_VERBOSE: ClassicFSM.notify.warning(msg) return 0
[docs] def forceTransition(self, aStateName, enterArgList=[], exitArgList=[]): """ force a transition -- for debugging ONLY """ self.request(aStateName, enterArgList, exitArgList, force=1)
[docs] def conditional_request(self, aStateName, enterArgList=[], exitArgList=[]): """ 'if this transition is defined, do it' Attempt transition from currentState to given one, if it exists. Return true if transition exists to given state, false otherwise. It is NOT an error/warning to attempt a cond_request if the transition doesn't exist. This lets people be sloppy about ClassicFSM transitions, letting the same fn be used for different states that may not have the same out transitions. """ assert not self.__internalStateInFlux if not self.__currentState: # Make this a warning for now ClassicFSM.notify.warning("[%s]: request: never entered initial state" % (self.__name)) self.__currentState = self.__initialState if isinstance(aStateName, str): aState = self.getStateNamed(aStateName) else: # Allow the caller to pass in a state in itself, not just # the name of a state. aState = aStateName aStateName = aState.getName() if aState is None: ClassicFSM.notify.error("[%s]: request: %s, no such state" % (self.__name, aStateName)) transitionDefined = ( self.__currentState.isTransitionDefined(aStateName) or aStateName in [self.__currentState.getName(), self.__finalState.getName()] ) if transitionDefined: return self.request(aStateName, enterArgList, exitArgList) else: assert ClassicFSM.notify.debug( "[%s]: condition_request: %s, transition doesnt exist" % (self.__name, aStateName)) return 0
[docs] def view(self): # Don't use a regular import, to prevent ModuleFinder from picking # it up as a dependency when building a .p3d package. import importlib FSMInspector = importlib.import_module('direct.tkpanels.FSMInspector') FSMInspector.FSMInspector(self)
[docs] def isInternalStateInFlux(self): return self.__internalStateInFlux