""" This module reimplements Python's file I/O mechanisms using Panda
constructs. This enables Python to interface more easily with Panda's
virtual file system, and it also better supports Panda's
SIMPLE_THREADS model, by avoiding blocking all threads while waiting
for I/O to complete. """
# Names exported by a star-import of this module.  StreamIOWrapper is
# defined in this file but is not part of this list.
__all__ = [
'open', 'listdir', 'walk', 'join',
'isfile', 'isdir', 'exists', 'lexists', 'getmtime', 'getsize',
'execfile',
]
from panda3d import core
import os
import io
import encodings
from posixpath import join
# Module-wide handle to the global VirtualFileSystem object; the functions
# in this module route their file lookups through it.
_vfs = core.VirtualFileSystem.getGlobalPtr()
def open(file, mode='r', buffering=-1, encoding=None, errors=None, newline=None, closefd=True):
    """Emulate the built-in open() on top of Panda's virtual file system.

    Args:
        file: an os-specific path string, a core.Filename (or other object
            accepted by the core.Filename constructor), a core.VirtualFile,
            or an already-open core.Istream / core.Ostream to wrap.
        mode: a combination of the characters ``rwxabt+U``.  ``U`` implies
            reading.
        buffering: only inspected in text mode; 1 turns on line buffering
            and 0 is rejected.
        encoding, errors, newline: forwarded to io.TextIOWrapper in text
            mode; not allowed in binary mode.
        closefd: accepted for signature compatibility; not used here.

    Returns:
        A StreamIOWrapper in binary mode, otherwise an io.TextIOWrapper
        wrapped around one.

    Raises:
        ValueError: the mode string or the combination of arguments is
            invalid.
        FileNotFoundError: a read mode was requested and the file does not
            exist.
        FileExistsError: ``x`` mode was requested and the file exists.
        IsADirectoryError: the path names a directory.
        IOError: the file could not be created or its stream not opened.
    """
    for ch in mode:
        if ch not in 'rwxabt+U':
            raise ValueError("invalid mode: '%s'" % (mode))

    creating = 'x' in mode
    writing = 'w' in mode
    appending = 'a' in mode
    updating = '+' in mode
    binary = 'b' in mode
    universal = 'U' in mode
    reading = universal or 'r' in mode

    if binary and 't' in mode:
        raise ValueError("can't have text and binary mode at once")

    if creating + reading + writing + appending > 1:
        raise ValueError("must have exactly one of create/read/write/append mode")

    if binary:
        if encoding:
            raise ValueError("binary mode doesn't take an encoding argument")
        if errors:
            raise ValueError("binary mode doesn't take an errors argument")
        if newline:
            raise ValueError("binary mode doesn't take a newline argument")
    elif buffering == 0:
        # Reject this before any stream is opened: opening in 'w' mode
        # truncates the file, which must not happen for a call that is
        # going to fail anyway.
        raise ValueError("can't have unbuffered text I/O")

    if isinstance(file, core.Istream) or isinstance(file, core.Ostream):
        # If we were given a stream instead of a filename, assign
        # it directly.
        raw = StreamIOWrapper(file)
        raw.mode = mode

    else:
        if not (creating or reading or writing or appending):
            # Validate before touching the VFS; otherwise a missing file
            # would be created below and only then would the bad mode be
            # reported.
            raise ValueError("Must have exactly one of create/read/write/append mode and at most one plus")

        vfile = None

        if isinstance(file, core.VirtualFile):
            # We can also "open" a VirtualFile object for reading.
            vfile = file
            filename = vfile.getFilename()
        elif isinstance(file, str):
            # If a raw string is given, assume it's an os-specific
            # filename.
            filename = core.Filename.fromOsSpecificW(file)
        else:
            # It's either a Filename object or an os.PathLike.
            # If a Filename is given, make a writable copy anyway.
            filename = core.Filename(file)

        # NOTE(review): presumably this keeps Panda from applying its own
        # text-mode handling, since text decoding is done by TextIOWrapper
        # below -- confirm against the Filename documentation.
        filename.setBinary()

        if not vfile:
            vfile = _vfs.getFile(filename)

        if not vfile:
            if reading:
                raise FileNotFoundError("No such file or directory: '%s'" % (filename))

            vfile = _vfs.createFile(filename)
            if not vfile:
                raise IOError("Failed to create file: '%s'" % (filename))

        elif creating:
            # In 'creating' mode, we have to raise FileExistsError
            # if the file already exists.  Otherwise, it's the same
            # as 'writing' mode.
            raise FileExistsError("File exists: '%s'" % (filename))

        elif vfile.isDirectory():
            raise IsADirectoryError("Is a directory: '%s'" % (filename))

        # Actually open the streams.
        if reading:
            if updating:
                stream = vfile.openReadWriteFile(False)
            else:
                stream = vfile.openReadFile(False)

            if not stream:
                raise IOError("Could not open %s for reading" % (filename))

        elif writing or creating:
            if updating:
                stream = vfile.openReadWriteFile(True)
            else:
                stream = vfile.openWriteFile(False, True)

            if not stream:
                raise IOError("Could not open %s for writing" % (filename))

        else:
            # Only 'appending' can be left: the check at the top of this
            # branch guarantees that one of the four modes is set.
            if updating:
                stream = vfile.openReadAppendFile()
            else:
                stream = vfile.openAppendFile()

            if not stream:
                raise IOError("Could not open %s for appending" % (filename))

        raw = StreamIOWrapper(stream, needsVfsClose=True)
        raw.mode = mode
        raw.name = vfile.getFilename().toOsSpecific()

    # If a binary stream was requested, return the stream we've created.
    if binary:
        return raw

    # buffering == 0 was already rejected above for text mode.
    line_buffering = (buffering == 1)

    # Otherwise, create a TextIOWrapper object to wrap it.
    wrapper = io.TextIOWrapper(raw, encoding, errors, newline, line_buffering)
    wrapper.mode = mode
    return wrapper
class StreamIOWrapper(io.IOBase):
    """ This is a file-like object that wraps around a C++ istream and/or
    ostream object.  It only deals with binary data; to work with text I/O,
    create an io.TextIOWrapper object around this, or use the open()
    function that is also provided with this module. """

    def __init__(self, stream, needsVfsClose=False):
        """Wrap `stream`.

        A reader is attached when the stream is a core.Istream and a
        writer when it is a core.Ostream; a stream can be both.  When
        `needsVfsClose` is true, close() hands the stream back to the
        virtual file system.
        """
        self.__stream = stream
        self.__needsVfsClose = needsVfsClose
        self.__reader = None
        self.__writer = None
        # Records whether the most recent operation was a write; tell()
        # uses it to pick between the put and get positions.
        self.__lastWrite = False

        if isinstance(stream, core.Istream):
            self.__reader = core.StreamReader(stream, False)

        if isinstance(stream, core.Ostream):
            self.__writer = core.StreamWriter(stream, False)
            self.__lastWrite = True
            self.__write = self.__writer.appendData

    def __repr__(self):
        s = "<direct.stdpy.file.StreamIOWrapper"
        if hasattr(self, 'name'):
            s += " name='%s'" % (self.name)
        if hasattr(self, 'mode'):
            s += " mode='%s'" % (self.mode)
        s += ">"
        return s

    def __requireReader(self):
        """Raise unless the stream is currently open for reading."""
        if not self.__reader:
            if not self.__writer:
                # The stream is not even open at all.
                raise ValueError("I/O operation on closed file")
            # The stream is open only in write mode.
            raise IOError("Attempt to read from write-only stream")

    def __requireWriter(self):
        """Raise unless the stream is currently open for writing."""
        if not self.__writer:
            if not self.__reader:
                # The stream is not even open at all.
                raise ValueError("I/O operation on closed file")
            # The stream is open only in read mode.
            raise IOError("Attempt to write to read-only stream")

    def readable(self):
        """Return True if a reader is attached to the stream."""
        return self.__reader is not None

    def writable(self):
        """Return True if a writer is attached to the stream."""
        return self.__writer is not None

    def close(self):
        """Release the stream and mark this object as closed.

        Calling close() more than once is harmless.
        """
        if self.__needsVfsClose:
            if self.__reader and self.__writer:
                _vfs.closeReadWriteFile(self.__stream)
            elif self.__reader:
                _vfs.closeReadFile(self.__stream)
            else:  # self.__writer:
                _vfs.closeWriteFile(self.__stream)

            self.__needsVfsClose = False

        self.__stream = None
        self.__reader = None
        self.__writer = None

        # Let io.IOBase record the closed state; without this the `closed`
        # property (also consulted by io.TextIOWrapper) would keep
        # reporting False.  IOBase.close() calls flush(), which is a no-op
        # at this point because the writer has been dropped.
        super().close()

    def flush(self):
        """Flush pending output; does nothing without a writer."""
        if self.__writer:
            self.__stream.clear()  # clear eof flag
            self.__stream.flush()

    def read(self, size=-1):
        """Read up to `size` bytes, or everything up to end-of-file when
        `size` is None or negative.

        Raises ValueError on a closed stream and IOError on a write-only
        stream.
        """
        self.__requireReader()

        self.__stream.clear()  # clear eof flag
        self.__lastWrite = False
        if size is not None and size >= 0:
            return self.__reader.extractBytes(size)

        # Read to end-of-file.
        result = bytearray()
        while not self.__stream.eof():
            result += self.__reader.extractBytes(4096)
        return bytes(result)

    read1 = read

    def readline(self, size=-1):
        """Read one line from the stream.

        `size` is accepted for interface compatibility but is not used;
        the line is returned as the underlying reader delivers it.
        """
        self.__requireReader()

        self.__stream.clear()  # clear eof flag
        self.__lastWrite = False
        return self.__reader.readline()

    def seek(self, offset, whence = 0):
        """Move the read and/or write position and return the new position.

        On a stream that has already been closed nothing happens and None
        is returned.
        """
        if self.__stream:
            self.__stream.clear()  # clear eof flag
        if self.__reader:
            self.__stream.seekg(offset, whence)
        if self.__writer:
            self.__stream.seekp(offset, whence)

        if self.__reader or self.__writer:
            # The io protocol expects seek() to report where we ended up.
            return self.tell()
        return None

    def tell(self):
        """Return the put position if the last operation was a write,
        otherwise the get position.

        Raises ValueError if the stream is closed.
        """
        if self.__lastWrite:
            if self.__writer:
                return self.__stream.tellp()
        else:
            if self.__reader:
                return self.__stream.tellg()
        raise ValueError("I/O operation on closed file")

    def write(self, b):
        """Write the bytes-like object `b` and return len(b).

        Raises ValueError on a closed stream and IOError on a read-only
        stream.
        """
        self.__requireWriter()

        self.__stream.clear()  # clear eof flag
        self.__write(b)
        self.__lastWrite = True
        return len(b)

    def writelines(self, lines):
        """Write every item of `lines` in order; no separators are added."""
        self.__requireWriter()

        self.__stream.clear()  # clear eof flag
        for line in lines:
            self.__write(line)
        self.__lastWrite = True
def listdir(path):
    """ Implements os.listdir over vfs.

    Returns the base name of every entry the virtual file system reports
    for the os-specific directory `path`; raises OSError when the scan
    yields None. """
    entries = _vfs.scanDirectory(core.Filename.fromOsSpecific(path))
    if entries is None:
        raise OSError("No such file or directory: '%s'" % (path))
    return [entry.getFilename().getBasename() for entry in entries]
def walk(top, topdown = True, onerror = None, followlinks = True):
    """ Implements os.walk over vfs.

    Yields (top, dirnames, filenames) tuples.  With topdown true the tuple
    for a directory is yielded before its subdirectories are visited, and
    because the subdirectories are taken from that same `dirnames` list
    afterwards, a caller may edit it in place to steer the descent.

    Note: we don't support onerror or followlinks; errors are ignored
    and links are always followed. """
    subdirs = []
    plain_files = []

    entries = _vfs.scanDirectory(top)
    if entries:
        for entry in entries:
            bucket = subdirs if entry.isDirectory() else plain_files
            bucket.append(entry.getFilename().getBasename())

    if topdown:
        yield (top, subdirs, plain_files)

    for name in subdirs:
        yield from walk(join(top, name), topdown = topdown)

    if not topdown:
        yield (top, subdirs, plain_files)
def isfile(path):
    """ Returns the virtual file system's isRegularFile() answer for the
    os-specific `path`. """
    vfs_path = core.Filename.fromOsSpecific(path)
    return _vfs.isRegularFile(vfs_path)
def isdir(path):
    """ Returns the virtual file system's isDirectory() answer for the
    os-specific `path`. """
    vfs_path = core.Filename.fromOsSpecific(path)
    return _vfs.isDirectory(vfs_path)
def exists(path):
    """ Returns the virtual file system's exists() answer for the
    os-specific `path`. """
    vfs_path = core.Filename.fromOsSpecific(path)
    return _vfs.exists(vfs_path)
def lexists(path):
    """ Performs the same virtual file system exists() check as exists();
    no separate treatment of symbolic links is done here. """
    vfs_path = core.Filename.fromOsSpecific(path)
    return _vfs.exists(vfs_path)
def getmtime(path):
    """ Returns the timestamp the virtual file system reports for the
    os-specific `path`.

    Raises OSError, naming the path, when the virtual file system has no
    file for it. """
    vfile = _vfs.getFile(core.Filename.fromOsSpecific(path), True)
    if not vfile:
        # Same message format as listdir(), so the failing path is visible
        # to whoever catches or logs the error.
        raise OSError("No such file or directory: '%s'" % (path))
    return vfile.getTimestamp()
def getsize(path):
    """ Returns the file size the virtual file system reports for the
    os-specific `path`.

    Raises OSError, naming the path, when the virtual file system has no
    file for it. """
    vfile = _vfs.getFile(core.Filename.fromOsSpecific(path), True)
    if not vfile:
        # Same message format as listdir(), so the failing path is visible
        # to whoever catches or logs the error.
        raise OSError("No such file or directory: '%s'" % (path))
    return vfile.getFileSize()
def execfile(path, globals=None, locals=None):
    """ Reads the file at the os-specific `path` through the virtual file
    system and executes its contents as Python code.

    `globals` and `locals` are handed to exec() unchanged.  When both are
    left as None, exec() runs the code in the scope of this function, so
    callers will normally want to pass their own namespace.

    Raises OSError, naming the path, when the virtual file system has no
    file for it.

    NOTE: this executes arbitrary code from the file; only use it on
    content you trust. """
    vfile = _vfs.getFile(core.Filename.fromOsSpecific(path), True)
    if not vfile:
        raise OSError("No such file or directory: '%s'" % (path))

    source = vfile.readFile(False)

    # Compile with the real path so that tracebacks and warnings point at
    # the file instead of "<string>".
    code = compile(source, path, 'exec')
    exec(code, globals, locals)