# Source code for direct.showbase.VFSImporter

"""The VFS importer allows importing Python modules from Panda3D's virtual
file system, through Python's standard import mechanism.

Calling the :func:`register()` function to register the import hooks should be
sufficient to enable this functionality.
"""

from __future__ import annotations

__all__ = ['register']

from panda3d.core import Filename, VirtualFile, VirtualFileSystem, VirtualFileMountSystem
from panda3d.core import OFileStream, copy_stream
import sys
import marshal
import _imp
import atexit
from importlib.abc import Loader, SourceLoader
from importlib.util import MAGIC_NUMBER, decode_source
from importlib.machinery import ModuleSpec, EXTENSION_SUFFIXES, BYTECODE_SUFFIXES
from types import ModuleType
from typing import Any

# Handle to Panda3D's global virtual file system; every file lookup performed
# by the finder, loaders and path hook in this module goes through it.
vfs = VirtualFileSystem.get_global_ptr()


def _make_spec(fullname: str, loader: VFSLoader, *, is_package: bool) -> ModuleSpec:
    """Build the :class:`ModuleSpec` for a module backed by a VFS file.

    The spec's origin is the OS-specific path of the loader's virtual file.
    For a package, the directory holding that file is also appended to the
    spec's submodule search locations.
    """
    vfile_path = loader._vfile.get_filename()
    spec = ModuleSpec(
        fullname,
        loader,
        origin=vfile_path.to_os_specific(),
        is_package=is_package,
    )
    spec.has_location = True
    if not is_package:
        return spec

    # ModuleSpec only creates the search-location list for packages.
    search_locations = spec.submodule_search_locations
    assert search_locations is not None
    package_dir = Filename(vfile_path.get_dirname())
    search_locations.append(package_dir.to_os_specific())
    return spec


class VFSFinder:
    """
    Python importer that loads .py and .pyc/.pyo files from Panda's Virtual
    File System, which allows loading Python source files from mounted .mf
    files (among other places).
    """

    def __init__(self, path: str) -> None:
        # OS-specific path of the directory this finder searches.
        self.path = path

    def find_spec(self, fullname: str, target: ModuleType | None = None) -> ModuleSpec | None:
        """Locate *fullname* below this finder's directory.

        Candidates are probed in a fixed order: plain source module,
        bytecode-only module, extension module, regular package (source
        ``__init__`` first, then bytecode) and finally a namespace package.
        Returns the spec of the first hit, or None if nothing matches.
        *target* is accepted for the finder protocol but is not used.
        """
        tail = fullname.rpartition('.')[2]
        filename = Filename(Filename.from_os_specific(self.path), tail)

        # Single-file modules: source wins over bytecode, which wins over a
        # C/C++ extension module.
        module_kinds = (
            (VFSSourceLoader, ('.py',)),
            (VFSCompiledLoader, BYTECODE_SUFFIXES),
            (VFSExtensionLoader, EXTENSION_SUFFIXES),
        )
        for loader_cls, suffixes in module_kinds:
            for suffix in suffixes:
                vfile = vfs.get_file(filename + suffix, True)
                if vfile:
                    return _make_spec(fullname, loader_cls(fullname, vfile), is_package=False)

        # Regular packages: a directory holding __init__.py, or failing
        # that, a compiled __init__.
        package_kinds = (
            (VFSSourceLoader, ('.py',)),
            (VFSCompiledLoader, BYTECODE_SUFFIXES),
        )
        for loader_cls, suffixes in package_kinds:
            for suffix in suffixes:
                vfile = vfs.get_file(Filename(filename, '__init__' + suffix), True)
                if vfile:
                    return _make_spec(fullname, loader_cls(fullname, vfile), is_package=True)

        # Namespace package: a bare directory without any __init__ file.
        if vfs.is_directory(filename):
            spec = ModuleSpec(fullname, VFSNamespaceLoader(), is_package=True)
            assert spec.submodule_search_locations is not None
            spec.submodule_search_locations.append(filename.to_os_specific())
            return spec

        return None
class VFSLoader(Loader):
    """Common base for loaders that import a module from a VFS file.

    Instances remember the module name they were created for and the
    virtual file backing it.  :meth:`exec_module` relies on a ``get_code()``
    method, which this class does not define itself.
    """

    def __init__(self, fullname: str, vfile: VirtualFile) -> None:
        self.name = fullname
        self._vfile = vfile

    def is_package(self, fullname):
        """Return True if the backing file is a package's ``__init__`` file.

        *fullname* may be None, in which case the name this loader was
        created for is used.  Raises ImportError if a different module name
        is passed.
        """
        if fullname is None:
            # None is let through by the name check, so treat it as this
            # loader's own module instead of failing on None.rpartition().
            fullname = self.name
        elif self.name != fullname:
            raise ImportError

        basename = self._vfile.get_filename().get_basename()
        filename_base = basename.rsplit('.', 1)[0]
        tail_name = fullname.rpartition('.')[2]
        # A module that is itself named "__init__" is not a package.
        return filename_base == '__init__' and tail_name != '__init__'

    def create_module(self, spec: ModuleSpec) -> ModuleType | None:
        """Use default semantics for module creation."""

    def exec_module(self, module: ModuleType) -> None:
        """Execute the module."""
        code = self.get_code(module.__name__)  # type: ignore[attr-defined]
        exec(code, module.__dict__)

    def get_filename(self, fullname: str) -> str:
        """Return the OS-specific path of the backing file.

        Raises ImportError if *fullname* is given and is not the name this
        loader was created for.
        """
        if fullname is not None and self.name != fullname:
            raise ImportError
        return self._vfile.get_filename().to_os_specific()

    @staticmethod
    def get_data(path: str) -> bytes:
        """Return the contents of the VFS file at the OS-specific *path*.

        Raises OSError if the VFS has no such file.
        """
        vfile = vfs.get_file(Filename.from_os_specific(path))
        if vfile:
            return vfile.read_file(True)
        else:
            raise OSError

    @staticmethod
    def path_stats(path: str) -> dict[str, Any]:
        """Return a dict with the ``'mtime'`` and ``'size'`` of *path*.

        Raises OSError if the VFS has no such file.
        """
        vfile = vfs.get_file(Filename.from_os_specific(path))
        if vfile:
            return {'mtime': vfile.get_timestamp(), 'size': vfile.get_file_size()}
        else:
            raise OSError

    @staticmethod
    def path_mtime(path):
        """Return the timestamp of the VFS file at *path*.

        Raises OSError if the VFS has no such file.
        """
        vfile = vfs.get_file(Filename.from_os_specific(path))
        if vfile:
            return vfile.get_timestamp()
        else:
            raise OSError
class VFSSourceLoader(VFSLoader, SourceLoader):  # type: ignore[misc]
    """Loader for Python source files read from the VFS."""

    def get_source(self, fullname):
        """Return the decoded source text of the backing file.

        Raises ImportError if *fullname* is given and is not the name this
        loader was created for.
        """
        wrong_module = fullname is not None and self.name != fullname
        if wrong_module:
            raise ImportError

        raw_source = self._vfile.read_file(True)
        return decode_source(raw_source)
class VFSCompiledLoader(VFSLoader):
    """Loader for compiled bytecode files read from the VFS."""

    def get_code(self, fullname):
        """Return the code object unmarshalled from the backing file.

        Raises ImportError if *fullname* is given and is not the name this
        loader was created for, or if the file does not begin with this
        interpreter's bytecode magic number.
        """
        if fullname is not None and self.name != fullname:
            raise ImportError

        bytecode_file = self._vfile
        contents = bytecode_file.read_file(True)
        if contents[:4] == MAGIC_NUMBER:
            # The marshalled code object follows the first 16 bytes, of
            # which the magic number occupies the leading four.
            return marshal.loads(contents[16:])

        raise ImportError("Bad magic number in %s" % (bytecode_file))

    def get_source(self, fullname):
        """Return None: no source text is available for bytecode files."""
        return None
class VFSExtensionLoader(VFSLoader):
    """Loader for C/C++ extension modules located through the VFS."""

    def create_module(self, spec):
        """Create the extension module described by *spec*.

        An extension module can only be imported if it exists on disk.  A
        file on the real-filesystem mount, or a virtual file that shadows a
        real file of the same name, is loaded directly.  A truly virtual
        file is first copied to a temporary file, which is scheduled for
        deletion when the process ends.

        Raises IOError if the temporary copy cannot be written.
        """
        vfile = self._vfile
        filename = vfile.get_filename()

        is_real_file = isinstance(vfile.get_mount(), VirtualFileMountSystem)
        if not is_real_file and not filename.exists():
            # It's a virtual file with no real-world existence.  Dump it
            # to disk under a temporary name with the same extension.
            ext = filename.get_extension()
            tmp_filename = Filename.temporary(
                '',
                filename.get_basename_wo_extension(),
                '.' + ext,
                type=Filename.T_dso,
            )
            tmp_filename.set_extension(ext)
            tmp_filename.set_binary()

            sin = vfile.open_read_file(True)
            try:
                sout = OFileStream()
                copied = tmp_filename.open_write(sout) and copy_stream(sin, sout)
                if not copied:
                    raise IOError
            finally:
                vfile.close_read_file(sin)
                # Drop the output stream before the file is loaded below.
                del sout

            # Delete when the process ends.
            atexit.register(tmp_filename.unlink)

            # Make a dummy spec to pass to create_dynamic with the path to
            # our temporary file.
            spec = ModuleSpec(
                spec.name,
                spec.loader,
                origin=tmp_filename.to_os_specific(),
                is_package=False,
            )

        module = _imp.create_dynamic(spec)
        # Report the original VFS location rather than the temporary copy.
        module.__file__ = filename.to_os_specific()
        return module

    def exec_module(self, module):
        """Run the initialisation of the already-created extension module."""
        _imp.exec_dynamic(module)

    def is_package(self, fullname):
        """Return False: an extension module is never treated as a package."""
        return False

    def get_code(self, fullname):
        """Return None: extension modules have no code object."""
        return None

    def get_source(self, fullname):
        """Return None: extension modules have no source text."""
        return None
class VFSNamespaceLoader(Loader):
    """Loader for namespace packages: directories without an ``__init__``.

    There is nothing to execute for such a package, so every method
    reports an empty module.
    """

    def create_module(self, spec: ModuleSpec) -> ModuleType | None:
        """Use default semantics for module creation."""

    def exec_module(self, module: ModuleType) -> None:
        """Do nothing; a namespace package has no code to run."""

    def is_package(self, fullname):
        """Return True: a namespace is always a package."""
        return True

    def get_source(self, fullname):
        """Return the (empty) source text of the namespace package."""
        return ''

    def get_code(self, fullname):
        """Return a code object compiled from empty source."""
        empty_source = ''
        return compile(empty_source, '<string>', 'exec', dont_inherit=True)
def _path_hook(entry: str) -> VFSFinder:
    """Path hook that hands VFS-only directories to a :class:`VFSFinder`.

    Raises ImportError for any path entry that is not a directory in the
    VFS, and for directories that live on the real-filesystem mount.
    """
    vfile = vfs.get_file(Filename.from_os_specific(entry), False)
    if not vfile or not vfile.is_directory():
        raise ImportError
    if isinstance(vfile.get_mount(), VirtualFileMountSystem):
        # A directory on the real file system is declined by this hook.
        raise ImportError
    return VFSFinder(entry)


# Set once register() has installed the path hook.
_registered = False
def register() -> None:
    """
    Register the VFSFinder on the path_hooks, if it has not already been
    registered, so that future Python import statements will vector through
    here (and therefore will take advantage of Panda's virtual file system).

    Calling this more than once has no further effect.
    """
    global _registered
    if _registered:
        return

    _registered = True
    sys.path_hooks.insert(0, _path_hook)

    # Blow away the importer cache, so we'll come back through the
    # VFSFinder for every folder in the future, even those folders that
    # previously were loaded directly.  The dict is emptied in place rather
    # than rebound, so that any existing reference to it is flushed too.
    sys.path_importer_cache.clear()