Source code for direct.showbase.VFSImporter

"""The VFS importer allows importing Python modules from Panda3D's virtual
file system, through Python's standard import mechanism.

Calling the :func:`register()` function to register the import hooks should be
sufficient to enable this functionality.
"""

__all__ = ['register', 'sharedPackages',
           'reloadSharedPackage', 'reloadSharedPackages']

from panda3d.core import Filename, VirtualFileSystem, VirtualFileMountSystem, OFileStream, copyStream
from direct.stdpy.file import open
import sys
import marshal
import imp
import types

#: The sharedPackages dictionary lists all of the "shared packages",
#: special Python packages that automatically span multiple directories
#: via magic in the VFSImporter.  You can make a package "shared"
#: simply by adding its name into this dictionary (and then calling
#: reloadSharedPackages() if it's already been imported).
#:
#: When a package name is in this dictionary at import time, *all*
#: instances of the package are located along sys.path, and merged into
#: a single Python module with a __path__ setting that represents the
#: union.  Thus, you can have a direct.showbase.foo in your own
#: application, and loading it won't shadow the system
#: direct.showbase.ShowBase which is in a different directory on disk.
sharedPackages = {}

vfs = VirtualFileSystem.getGlobalPtr()

compiledExtensions = [ 'pyc', 'pyo' ]
if not __debug__:
    # In optimized mode, we prefer loading .pyo files over .pyc files.
    # We implement that by reversing the extension names.
    compiledExtensions = [ 'pyo', 'pyc' ]


[docs]class VFSImporter: """ This class serves as a Python importer to support loading Python .py and .pyc/.pyo files from Panda's Virtual File System, which allows loading Python source files from mounted .mf files (among other places). """
[docs] def __init__(self, path): if isinstance(path, Filename): self.dir_path = Filename(path) else: self.dir_path = Filename.fromOsSpecific(path)
[docs] def find_module(self, fullname, path = None): if path is None: dir_path = self.dir_path else: dir_path = path #print >>sys.stderr, "find_module(%s), dir_path = %s" % (fullname, dir_path) basename = fullname.split('.')[-1] path = Filename(dir_path, basename) # First, look for Python files. filename = Filename(path) filename.setExtension('py') vfile = vfs.getFile(filename, True) if vfile: return VFSLoader(dir_path, vfile, filename, desc=('.py', 'U' if sys.version_info < (3, 4) else 'r', imp.PY_SOURCE)) # If there's no .py file, but there's a .pyc file, load that # anyway. for ext in compiledExtensions: filename = Filename(path) filename.setExtension(ext) vfile = vfs.getFile(filename, True) if vfile: return VFSLoader(dir_path, vfile, filename, desc=('.'+ext, 'rb', imp.PY_COMPILED)) # Look for a C/C++ extension module. for desc in imp.get_suffixes(): if desc[2] != imp.C_EXTENSION: continue filename = Filename(path + desc[0]) vfile = vfs.getFile(filename, True) if vfile: return VFSLoader(dir_path, vfile, filename, desc=desc) # Finally, consider a package, i.e. a directory containing # __init__.py. filename = Filename(path, '__init__.py') vfile = vfs.getFile(filename, True) if vfile: return VFSLoader(dir_path, vfile, filename, packagePath=path, desc=('.py', 'U' if sys.version_info < (3, 4) else 'r', imp.PY_SOURCE)) for ext in compiledExtensions: filename = Filename(path, '__init__.' + ext) vfile = vfs.getFile(filename, True) if vfile: return VFSLoader(dir_path, vfile, filename, packagePath=path, desc=('.'+ext, 'rb', imp.PY_COMPILED)) #print >>sys.stderr, "not found." return None
[docs]class VFSLoader: """ The second part of VFSImporter, this is created for a particular .py file or directory. """
[docs] def __init__(self, dir_path, vfile, filename, desc, packagePath=None): self.dir_path = dir_path self.timestamp = None if vfile: self.timestamp = vfile.getTimestamp() self.filename = filename self.desc = desc self.packagePath = packagePath
[docs] def load_module(self, fullname, loadingShared = False): #print >>sys.stderr, "load_module(%s), dir_path = %s, filename = %s" % (fullname, self.dir_path, self.filename) if self.desc[2] == imp.PY_FROZEN: return self._import_frozen_module(fullname) if self.desc[2] == imp.C_EXTENSION: return self._import_extension_module(fullname) # Check if this is a child of a shared package. if not loadingShared and self.packagePath and '.' in fullname: parentname = fullname.rsplit('.', 1)[0] if parentname in sharedPackages: # It is. That means it's a shared package too. parent = sys.modules[parentname] path = getattr(parent, '__path__', None) importer = VFSSharedImporter() sharedPackages[fullname] = True loader = importer.find_module(fullname, path = path) assert loader return loader.load_module(fullname) code = self._read_code() if not code: raise ImportError('No Python code in %s' % (fullname)) mod = sys.modules.setdefault(fullname, imp.new_module(fullname)) mod.__file__ = self.filename.toOsSpecific() mod.__loader__ = self if self.packagePath: mod.__path__ = [self.packagePath.toOsSpecific()] #print >> sys.stderr, "loaded %s, path = %s" % (fullname, mod.__path__) exec(code, mod.__dict__) return sys.modules[fullname]
[docs] def getdata(self, path): path = Filename(self.dir_path, Filename.fromOsSpecific(path)) vfile = vfs.getFile(path) if not vfile: raise IOError("Could not find '%s'" % (path)) return vfile.readFile(True)
[docs] def is_package(self, fullname): return bool(self.packagePath)
[docs] def get_code(self, fullname): return self._read_code()
[docs] def get_source(self, fullname): return self._read_source()
[docs] def get_filename(self, fullname): return self.filename.toOsSpecific()
def _read_source(self): """ Returns the Python source for this file, if it is available, or None if it is not. May raise IOError. """ if self.desc[2] == imp.PY_COMPILED or \ self.desc[2] == imp.C_EXTENSION: return None filename = Filename(self.filename) filename.setExtension('py') filename.setText() if sys.version_info >= (3, 0): # Use the tokenize module to detect the encoding. import tokenize fh = open(self.filename, 'rb') encoding, lines = tokenize.detect_encoding(fh.readline) return (b''.join(lines) + fh.read()).decode(encoding) else: return open(self.filename, self.desc[1]).read() def _import_extension_module(self, fullname): """ Loads the binary shared object as a Python module, and returns it. """ vfile = vfs.getFile(self.filename, False) # We can only import an extension module if it already exists on # disk. This means if it's a truly virtual file that has no # on-disk equivalent, we have to write it to a temporary file # first. if hasattr(vfile, 'getMount') and \ isinstance(vfile.getMount(), VirtualFileMountSystem): # It's a real file. filename = self.filename elif self.filename.exists(): # It's a virtual file, but it's shadowing a real file in # the same directory. Assume they're the same, and load # the real one. filename = self.filename else: # It's a virtual file with no real-world existence. Dump # it to disk. TODO: clean up this filename. filename = Filename.temporary('', self.filename.getBasenameWoExtension(), '.' + self.filename.getExtension(), type = Filename.TDso) filename.setExtension(self.filename.getExtension()) filename.setBinary() sin = vfile.openReadFile(True) sout = OFileStream() if not filename.openWrite(sout): raise IOError if not copyStream(sin, sout): raise IOError vfile.closeReadFile(sin) del sout module = imp.load_module(fullname, None, filename.toOsSpecific(), self.desc) module.__file__ = self.filename.toOsSpecific() return module def _import_frozen_module(self, fullname): """ Imports the frozen module without messing around with searching any more. """ #print >>sys.stderr, "importing frozen %s" % (fullname) module = imp.load_module(fullname, None, fullname, ('', '', imp.PY_FROZEN)) # Workaround for bug in Python 2. if getattr(module, '__path__', None) == fullname: module.__path__ = [] return module def _read_code(self): """ Returns the Python compiled code object for this file, if it is available, or None if it is not. May raise IOError, ValueError, SyntaxError, or a number of other errors generated by the low-level system. """ if self.desc[2] == imp.PY_COMPILED: # It's a pyc file; just read it directly. pycVfile = vfs.getFile(self.filename, False) if pycVfile: return self._loadPyc(pycVfile, None) raise IOError('Could not read %s' % (self.filename)) elif self.desc[2] == imp.C_EXTENSION: return None # It's a .py file (or an __init__.py file; same thing). Read # the .pyc file if it is available and current; otherwise read # the .py file and compile it. t_pyc = None for ext in compiledExtensions: pycFilename = Filename(self.filename) pycFilename.setExtension(ext) pycVfile = vfs.getFile(pycFilename, False) if pycVfile: t_pyc = pycVfile.getTimestamp() break code = None if t_pyc and t_pyc >= self.timestamp: try: code = self._loadPyc(pycVfile, self.timestamp) except ValueError: code = None if not code: source = self._read_source() filename = Filename(self.filename) filename.setExtension('py') code = self._compile(filename, source) return code def _loadPyc(self, vfile, timestamp): """ Reads and returns the marshal data from a .pyc file. Raises ValueError if there is a problem. """ code = None data = vfile.readFile(True) if data[:4] != imp.get_magic(): raise ValueError("Bad magic number in %s" % (vfile)) if sys.version_info >= (3, 0): t = int.from_bytes(data[4:8], 'little') data = data[12:] else: t = ord(data[4]) + (ord(data[5]) << 8) + \ (ord(data[6]) << 16) + (ord(data[7]) << 24) data = data[8:] if not timestamp or t == timestamp: return marshal.loads(data) else: raise ValueError("Timestamp wrong on %s" % (vfile)) def _compile(self, filename, source): """ Compiles the Python source code to a code object and attempts to write it to an appropriate .pyc file. May raise SyntaxError or other errors generated by the compiler. """ if source and source[-1] != '\n': source = source + '\n' code = compile(source, filename.toOsSpecific(), 'exec') # try to cache the compiled code pycFilename = Filename(filename) pycFilename.setExtension(compiledExtensions[0]) try: f = open(pycFilename.toOsSpecific(), 'wb') except IOError: pass else: f.write(imp.get_magic()) if sys.version_info >= (3, 0): f.write((self.timestamp & 0xffffffff).to_bytes(4, 'little')) f.write(b'\0\0\0\0') else: f.write(chr(self.timestamp & 0xff) + chr((self.timestamp >> 8) & 0xff) + chr((self.timestamp >> 16) & 0xff) + chr((self.timestamp >> 24) & 0xff)) f.write(marshal.dumps(code)) f.close() return code
[docs]class VFSSharedImporter: """ This is a special importer that is added onto the meta_path list, so that it is called before sys.path is traversed. It uses special logic to load one of the "shared" packages, by searching the entire sys.path for all instances of this shared package, and merging them. """
[docs] def __init__(self): pass
[docs] def find_module(self, fullname, path = None, reload = False): #print >>sys.stderr, "shared find_module(%s), path = %s" % (fullname, path) if fullname not in sharedPackages: # Not a shared package; fall back to normal import. return None if path is None: path = sys.path excludePaths = [] if reload: # If reload is true, we are simply reloading the module, # looking for new paths to add. mod = sys.modules[fullname] excludePaths = getattr(mod, '_vfs_shared_path', None) if excludePaths is None: # If there isn't a _vfs_shared_path symbol already, # the module must have been loaded through # conventional means. Try to guess which path it was # found on. d = self.getLoadedDirname(mod) excludePaths = [d] loaders = [] for dir in path: if dir in excludePaths: continue importer = sys.path_importer_cache.get(dir, None) if importer is None: try: importer = VFSImporter(dir) except ImportError: continue sys.path_importer_cache[dir] = importer try: loader = importer.find_module(fullname) if not loader: continue except ImportError: continue loaders.append(loader) if not loaders: return None return VFSSharedLoader(loaders, reload = reload)
[docs] def getLoadedDirname(self, mod): """ Returns the directory name that the indicated conventionally-loaded module must have been loaded from. """ if not getattr(mod, '__file__', None): return None fullname = mod.__name__ dirname = Filename.fromOsSpecific(mod.__file__).getDirname() parentname = None basename = fullname if '.' in fullname: parentname, basename = fullname.rsplit('.', 1) path = None if parentname: parent = sys.modules[parentname] path = parent.__path__ if path is None: path = sys.path for dir in path: pdir = str(Filename.fromOsSpecific(dir)) if pdir + '/' + basename == dirname: # We found it! return dir # Couldn't figure it out. return None
[docs]class VFSSharedLoader: """ The second part of VFSSharedImporter, this imports a list of packages and combines them. """
[docs] def __init__(self, loaders, reload): self.loaders = loaders self.reload = reload
[docs] def load_module(self, fullname): #print >>sys.stderr, "shared load_module(%s), loaders = %s" % (fullname, map(lambda l: l.dir_path, self.loaders)) mod = None message = None path = [] vfs_shared_path = [] if self.reload: mod = sys.modules[fullname] path = mod.__path__ or [] if path == fullname: # Work around Python bug setting __path__ of frozen modules. path = [] vfs_shared_path = getattr(mod, '_vfs_shared_path', []) for loader in self.loaders: try: mod = loader.load_module(fullname, loadingShared = True) except ImportError: etype, evalue, etraceback = sys.exc_info() print("%s on %s: %s" % (etype.__name__, fullname, evalue)) if not message: message = '%s: %s' % (fullname, evalue) continue for dir in getattr(mod, '__path__', []): if dir not in path: path.append(dir) if mod is None: # If all of them failed to load, raise ImportError. raise ImportError(message) # If at least one of them loaded successfully, return the # union of loaded modules. mod.__path__ = path mod.__package__ = fullname # Also set this special symbol, which records that this is a # shared package, and also lists the paths we have already # loaded. mod._vfs_shared_path = vfs_shared_path + [l.dir_path for l in self.loaders] return mod
_registered = False
[docs]def register(): """ Register the VFSImporter on the path_hooks, if it has not already been registered, so that future Python import statements will vector through here (and therefore will take advantage of Panda's virtual file system). """ global _registered if not _registered: _registered = True sys.path_hooks.insert(0, VFSImporter) sys.meta_path.insert(0, VFSSharedImporter()) # Blow away the importer cache, so we'll come back through the # VFSImporter for every folder in the future, even those # folders that previously were loaded directly. sys.path_importer_cache = {}
[docs]def reloadSharedPackage(mod): """ Reloads the specific module as a shared package, adding any new directories that might have appeared on the search path. """ fullname = mod.__name__ path = None if '.' in fullname: parentname = fullname.rsplit('.', 1)[0] parent = sys.modules[parentname] path = parent.__path__ importer = VFSSharedImporter() loader = importer.find_module(fullname, path = path, reload = True) if loader: loader.load_module(fullname) # Also force any child packages to become shared packages, if # they aren't already. for basename, child in list(mod.__dict__.items()): if isinstance(child, types.ModuleType): childname = child.__name__ if childname == fullname + '.' + basename and \ hasattr(child, '__path__') and \ childname not in sharedPackages: sharedPackages[childname] = True reloadSharedPackage(child)
[docs]def reloadSharedPackages(): """ Walks through the sharedPackages list, and forces a reload of any modules on that list that have already been loaded. This allows new directories to be added to the search path. """ #print >> sys.stderr, "reloadSharedPackages, path = %s, sharedPackages = %s" % (sys.path, sharedPackages.keys()) # Sort the list, just to make sure parent packages are reloaded # before child packages are. for fullname in sorted(sharedPackages.keys()): mod = sys.modules.get(fullname, None) if not mod: continue reloadSharedPackage(mod)