Source code for direct.stdpy.thread

""" This module reimplements Python's native thread module using Panda
threading constructs.  It's designed as a drop-in replacement for the
thread module for code that works with Panda; it is necessary because
in some compilation models, Panda's threading constructs are
incompatible with the OS-provided threads used by Python's thread
module. """

from __future__ import annotations

__all__ = [
    'error', 'LockType',
    'start_new_thread',
    'interrupt_main',
    'exit', 'allocate_lock', 'get_ident',
    'stack_size',
    'force_yield', 'consider_yield',
    'forceYield', 'considerYield',
    'TIMEOUT_MAX'
]

from panda3d import core
import sys

from collections.abc import Callable, Iterable, Mapping
from typing import Any

if sys.platform == "win32":
    TIMEOUT_MAX = float(0xffffffff // 1000)
else:
    TIMEOUT_MAX = float(0x7fffffffffffffff // 1000000000)

# These methods are defined in Panda, and are particularly useful if
# you may be running in Panda's SIMPLE_THREADS compilation mode.
force_yield = core.Thread.force_yield
consider_yield = core.Thread.consider_yield

forceYield = force_yield
considerYield = consider_yield
error = RuntimeError


[docs]class LockType: """ Implements a mutex lock. Instead of directly subclassing PandaModules.Mutex, we reimplement the lock here, to allow us to provide the described Python lock semantics. In particular, this allows a different thread to release the lock than the one that acquired it. """
[docs] def __init__(self) -> None: self.__lock = core.Mutex('PythonLock') self.__cvar = core.ConditionVar(self.__lock) self.__locked = False
[docs] def acquire(self, waitflag: bool = True, timeout: float = -1) -> bool: self.__lock.acquire() try: if self.__locked and not waitflag: return False if timeout >= 0: while self.__locked: self.__cvar.wait(timeout) else: while self.__locked: self.__cvar.wait() self.__locked = True return True finally: self.__lock.release()
[docs] def release(self) -> None: self.__lock.acquire() try: if not self.__locked: raise error('Releasing unheld lock.') self.__locked = False self.__cvar.notify() finally: self.__lock.release()
[docs] def locked(self): return self.__locked
__enter__ = acquire def __exit__(self, t, v, tb): self.release()
# Helper to generate new thread names _counter = 0 def _newname(template: str = "Thread-%d") -> str: global _counter _counter = _counter + 1 return template % _counter _threads: dict[int, tuple[core.Thread, dict[int, dict[str, Any]], Any | None]] = {} _nextThreadId = 0 _threadsLock = core.Mutex('thread._threadsLock')
[docs]def start_new_thread( function: Callable[..., object], args: Iterable[Any], kwargs: Mapping[str, Any] = {}, name: str | None = None, ) -> int: def threadFunc(threadId, function = function, args = args, kwargs = kwargs): try: try: function(*args, **kwargs) except SystemExit: pass finally: _remove_thread_id(threadId) global _nextThreadId _threadsLock.acquire() try: threadId = _nextThreadId _nextThreadId += 1 if name is None: name = 'PythonThread-%s' % (threadId) thread = core.PythonThread(threadFunc, [threadId], name, name) thread.setPythonIndex(threadId) _threads[threadId] = (thread, {}, None) thread.start(core.TPNormal, False) return threadId finally: _threadsLock.release()
def _add_thread(thread: core.Thread, wrapper: Any) -> int: """ Adds the indicated core.Thread object, with the indicated Python wrapper, to the thread list. Returns the new thread ID. """ global _nextThreadId _threadsLock.acquire() try: threadId = _nextThreadId _nextThreadId += 1 thread.setPythonIndex(threadId) _threads[threadId] = (thread, {}, wrapper) return threadId finally: _threadsLock.release() def _get_thread_wrapper(thread: core.Thread, wrapperClass: Callable[[core.Thread, int], Any]) -> Any: """ Returns the thread wrapper for the indicated thread. If there is not one, creates an instance of the indicated wrapperClass instead. """ threadId = thread.getPythonIndex() if threadId == -1: # The thread has never been assigned a threadId. Go assign one. global _nextThreadId _threadsLock.acquire() try: threadId = _nextThreadId _nextThreadId += 1 thread.setPythonIndex(threadId) wrapper = wrapperClass(thread, threadId) _threads[threadId] = (thread, {}, wrapper) return wrapper finally: _threadsLock.release() else: # The thread has been assigned a threadId. Look for the wrapper. _threadsLock.acquire() try: t, locals, wrapper = _threads[threadId] assert t == thread if wrapper is None: wrapper = wrapperClass(thread, threadId) _threads[threadId] = (thread, locals, wrapper) return wrapper finally: _threadsLock.release() def _get_thread_locals(thread, i): """ Returns the locals dictionary for the indicated thread. If there is not one, creates an empty dictionary. """ threadId = thread.getPythonIndex() if threadId == -1: # The thread has never been assigned a threadId. Go assign one. global _nextThreadId _threadsLock.acquire() try: threadId = _nextThreadId _nextThreadId += 1 thread.setPythonIndex(threadId) locals = {} _threads[threadId] = (thread, locals, None) return locals.setdefault(i, {}) finally: _threadsLock.release() else: # The thread has been assigned a threadId. Get the locals. _threadsLock.acquire() try: t, locals, wrapper = _threads[threadId] assert t == thread return locals.setdefault(i, {}) finally: _threadsLock.release() def _remove_thread_id(threadId: int) -> None: """ Removes the thread with the indicated ID from the thread list. """ # On interpreter shutdown, Python may set module globals to None. if _threadsLock is None or _threads is None: return _threadsLock.acquire() try: if threadId in _threads: thread, locals, wrapper = _threads[threadId] assert thread.getPythonIndex() == threadId del _threads[threadId] thread.setPythonIndex(-1) finally: _threadsLock.release()
[docs]def interrupt_main(): # TODO. pass
[docs]def exit(): raise SystemExit
[docs]def allocate_lock() -> LockType: return LockType()
[docs]def get_ident() -> int: return core.Thread.getCurrentThread().this
[docs]def stack_size(size = 0): raise error
class _local(object): """ This class provides local thread storage using Panda's threading system. """ def __del__(self): i = id(self) # Delete this key from all threads. _threadsLock.acquire() try: for thread, locals, wrapper in list(_threads.values()): try: del locals[i] except KeyError: pass finally: _threadsLock.release() def __setattr__(self, key, value): d = _get_thread_locals(core.Thread.getCurrentThread(), id(self)) d[key] = value def __getattribute__(self, key): d = _get_thread_locals(core.Thread.getCurrentThread(), id(self)) if key == '__dict__': return d try: return d[key] except KeyError: return object.__getattribute__(self, key)