| # Copyright 2006 Google, Inc. All Rights Reserved. |
| # Licensed to PSF under a Contributor Agreement. |
| |
| """New I/O library. |
| |
| See PEP XXX; for now: http://docs.google.com/Doc?id=dfksfvqd_1cn5g5m |
| """ |
| |
| __author__ = "Guido van Rossum <guido@python.org>" |
| |
| __all__ = ["open", "RawIOBase", "FileIO", "SocketIO", "BytesIO"] |
| |
| import os |
| |
def open(filename, mode="r", buffering=None, *, encoding=None):
    """Replacement for the built-in open function, with encoding parameter.

    Args:
        filename: Path of the file to open; must be a str.
        mode: Exactly one of "r", "w" or "a", optionally combined with "+"
            and with at most one of "b" / "t".  No character may repeat.
        buffering: Buffer size.  None selects the 8 KiB default; 0 means
            unbuffered, which is only allowed in binary mode.
        encoding: Name of the text encoding; rejected in binary mode.

    Returns:
        The raw FileIO object for unbuffered binary mode, a BufferedRandom /
        BufferedWriter / BufferedReader for buffered binary mode, or a
        TextIOWrapper around that buffer for text mode.

    Raises:
        TypeError: If an argument has the wrong type.
        ValueError: If mode, buffering or encoding are invalid or conflict.
    """
    # Explicit raises instead of asserts: asserts vanish under "python -O".
    if not isinstance(filename, str):
        raise TypeError("invalid file: %r" % (filename,))
    if not isinstance(mode, str):
        raise TypeError("invalid mode: %r" % (mode,))
    if buffering is not None and not isinstance(buffering, int):
        raise TypeError("invalid buffering: %r" % (buffering,))
    if encoding is not None and not isinstance(encoding, str):
        raise TypeError("invalid encoding: %r" % (encoding,))

    modes = set(mode)
    # Unknown characters, or any character given more than once.
    if modes - set("arwb+t") or len(mode) > len(modes):
        raise ValueError("invalid mode: %r" % mode)
    reading = "r" in modes
    # "a" is tracked separately from "w"; counting it as both would make
    # every append mode trip the "at once" check below.
    writing = "w" in modes
    appending = "a" in modes
    updating = "+" in modes
    binary = "b" in modes
    text = "t" in modes or not binary
    if text and binary:
        raise ValueError("can't have text and binary mode at once")
    if reading + writing + appending > 1:
        raise ValueError("can't have read/write/append mode at once")
    if not (reading or writing or appending):
        raise ValueError("must have exactly one of read/write/append mode")
    if binary and encoding is not None:
        raise ValueError("binary mode doesn't take an encoding")

    # Validate buffering *before* opening the file so that a bad argument
    # cannot leave an open descriptor behind.
    if buffering is None:
        buffering = 8 * 1024  # International standard buffer size
    if buffering < 0:
        raise ValueError("invalid buffering size")
    if buffering == 0 and not binary:
        raise ValueError("can't have unbuffered text I/O")

    raw_mode = (("r" if reading else "") +
                ("w" if writing else "") +
                ("a" if appending else "") +
                ("+" if updating else ""))
    raw = FileIO(filename, raw_mode)
    if buffering == 0:
        return raw
    if updating:
        buffer = BufferedRandom(raw, buffering)
    elif writing or appending:
        buffer = BufferedWriter(raw, buffering)
    else:
        assert reading
        buffer = BufferedReader(raw, buffering)
    if binary:
        return buffer
    assert text
    # NOTE(review): encoding is validated above but is not forwarded to
    # TextIOWrapper; its signature is not visible here -- confirm and wire up.
    textio = TextIOWrapper(buffer)  # Universal newlines default to on
    return textio
| |
| |
class RawIOBase:

    """Base class for raw binary I/O.

    Every operation except read(), close() and the context-manager
    protocol is a stub: the I/O methods raise IOError and the capability
    queries return False.  Subclasses override what they support.
    """

    def read(self, n):
        """Read up to n bytes through readinto() and return them as bytes.

        n may be any object supporting __index__().  The result is cut down
        to the number of bytes readinto() reports, so a short read does not
        return trailing padding.
        """
        # A mutable buffer is required: readinto() fills it in place.
        buf = bytearray(n.__index__())
        count = self.readinto(buf)
        # Tolerate implementations that return None (for example ones that
        # resize the buffer themselves instead of reporting a count).
        if count is not None:
            del buf[count:]
        return bytes(buf)

    def readinto(self, b):
        """Read into the mutable buffer b; unsupported in the base class."""
        raise IOError(".readinto() not supported")

    def write(self, b):
        """Write the bytes b; unsupported in the base class."""
        raise IOError(".write() not supported")

    def seek(self, pos, whence=0):
        """Move the stream position; unsupported in the base class."""
        raise IOError(".seek() not supported")

    def tell(self):
        """Return the stream position; unsupported in the base class."""
        raise IOError(".tell() not supported")

    def truncate(self, pos=None):
        """Resize the stream; unsupported in the base class."""
        raise IOError(".truncate() not supported")

    def close(self):
        """Release underlying resources; a no-op in the base class."""
        pass

    def seekable(self):
        """Return whether seek()/tell() are supported (False here)."""
        return False

    def readable(self):
        """Return whether reading is supported (False here)."""
        return False

    def writable(self):
        """Return whether writing is supported (False here)."""
        return False

    def __enter__(self):
        """Enter a with-block; the stream itself is the context value."""
        return self

    def __exit__(self, *args):
        """Close the stream on leaving a with-block; never suppresses errors."""
        self.close()

    def fileno(self):
        """Return the OS file descriptor; unsupported in the base class."""
        raise IOError(".fileno() not supported")
| |
| |
class FileIO(RawIOBase):

    """Raw I/O implementation for OS files.

    Wraps a file descriptor obtained from os.open() and maps the raw I/O
    methods onto the corresponding os.* calls.
    """

    # os.open() flags requested by each supported mode string.
    _MODE_FLAGS = {
        "r": os.O_RDONLY,
        "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        "a": os.O_WRONLY | os.O_CREAT | os.O_APPEND,
        "r+": os.O_RDWR,
        "w+": os.O_RDWR | os.O_CREAT | os.O_TRUNC,
        "a+": os.O_RDWR | os.O_CREAT | os.O_APPEND,
    }

    def __init__(self, filename, mode):
        """Open filename with one of "r", "w", "a", "r+", "w+", "a+".

        Raises:
            ValueError: If mode is not one of the supported strings.
            OSError: If os.open() fails.
        """
        # Set first so close() is safe even when os.open() below raises.
        self._fd = -1
        self._seekable = None  # computed lazily by seekable()
        self._mode = mode
        flags = self._MODE_FLAGS.get(mode)
        if flags is None:
            # An explicit raise: an assert would be stripped under -O and
            # leave flags unusable.
            raise ValueError("unsupported mode %r (for now)" % (mode,))
        # O_BINARY only exists on platforms that distinguish text/binary.
        flags |= getattr(os, "O_BINARY", 0)
        # 0o666 (before umask): do not create files with execute bits set.
        self._fd = os.open(filename, flags, 0o666)

    def read(self, n):
        """Read and return up to n bytes straight from the descriptor.

        Defined here so a short read returns only the bytes actually read,
        without going through a pre-sized intermediate buffer.
        """
        return os.read(self._fd, n.__index__())

    def readinto(self, b):
        """Read up to len(b) bytes into b and return the number read.

        Only the leading part of b is overwritten; b is never resized, so
        fixed-size buffers such as memoryview work too.
        """
        # XXX We really should have os.readinto()
        data = os.read(self._fd, len(b))
        count = len(data)
        b[:count] = data
        return count

    def write(self, b):
        """Write b and return the number of bytes os.write() accepted."""
        return os.write(self._fd, b)

    def seek(self, pos, whence=0):
        """Move the file position and return the new absolute position."""
        return os.lseek(self._fd, pos, whence)

    def tell(self):
        """Return the current file position."""
        return os.lseek(self._fd, 0, os.SEEK_CUR)

    def truncate(self, pos=None):
        """Resize the file to pos bytes (default: the current position)."""
        if pos is None:
            pos = self.tell()
        os.ftruncate(self._fd, pos)

    def close(self):
        """Close the descriptor; calling close() again is a no-op."""
        # Forget the descriptor before closing so a repeated close() can
        # never hit a number the OS has since handed to another file.
        fd, self._fd = self._fd, -1
        if fd >= 0:
            os.close(fd)

    def readable(self):
        """Return True when the mode allows reading ("r" or "+")."""
        return "r" in self._mode or "+" in self._mode

    def writable(self):
        """Return True when the mode allows writing ("w", "a" or "+")."""
        return "w" in self._mode or "+" in self._mode or "a" in self._mode

    def seekable(self):
        """Return whether the descriptor supports lseek(); cached."""
        if self._seekable is None:
            try:
                os.lseek(self._fd, 0, os.SEEK_CUR)
            except OSError:
                self._seekable = False
            else:
                self._seekable = True
        return self._seekable

    # __enter__/__exit__ are inherited from RawIOBase, whose versions
    # (return self / call close()) are exactly what is needed here.

    def fileno(self):
        """Return the underlying descriptor (-1 once closed)."""
        return self._fd
| |
| |
class SocketIO(RawIOBase):

    """Raw I/O implementation for stream sockets.

    Adapts an object providing recv_into(), send() and close() to the raw
    I/O interface.  Sockets are never seekable.
    """

    def __init__(self, sock, mode):
        """Wrap sock for reading ("r"), writing ("w") or both ("rw").

        Raises:
            ValueError: If mode is not "r", "w" or "rw".
        """
        # Explicit raise instead of an assert, which "python -O" strips.
        if mode not in ("r", "w", "rw"):
            raise ValueError("invalid mode: %r" % (mode,))
        self._sock = sock
        self._mode = mode
        self._readable = "r" in mode
        self._writable = "w" in mode
        self._seekable = False

    def readinto(self, b):
        """Receive into b and return whatever sock.recv_into() returns."""
        return self._sock.recv_into(b)

    def write(self, b):
        """Send b and return whatever sock.send() returns."""
        return self._sock.send(b)

    def close(self):
        """Close the wrapped socket."""
        self._sock.close()

    def readable(self):
        """Return True when the mode contains "r"."""
        return "r" in self._mode

    def writable(self):
        """Return True when the mode contains "w"."""
        return "w" in self._mode

    # XXX(nnorwitz)??? def fileno(self): return self._sock.fileno()
| |
| |
class BytesIO(RawIOBase):

    """Raw I/O implementation for bytes, like StringIO.

    The contents live in an in-memory bytearray, with a separate position
    used by read(), write(), seek() and truncate().
    """

    def __init__(self, inital_bytes=None):
        """Create the stream, optionally pre-filled with inital_bytes.

        The parameter keeps its historical spelling so keyword callers
        continue to work.
        """
        # bytearray, not bytes: write() and truncate() edit it in place.
        self._buffer = bytearray()
        self._pos = 0
        if inital_bytes is not None:
            self._buffer += inital_bytes

    def getvalue(self):
        """Return a bytes copy of the entire buffer contents."""
        return bytes(self._buffer)

    def read(self, n):
        """Read up to n bytes from the current position.

        Returns b"" at or past the end of the buffer, leaving the position
        untouched in that case.

        Raises:
            ValueError: If n is negative.
        """
        if n < 0:
            raise ValueError("negative read size: %r" % (n,))
        size = len(self._buffer)
        if self._pos >= size:
            # Do not drag a position beyond EOF back to the end.
            return b""
        newpos = min(size, self._pos + n)
        data = bytes(self._buffer[self._pos:newpos])
        self._pos = newpos
        return data

    def readinto(self, b):
        """Read up to len(b) bytes into b and return the number read.

        Only the leading part of b is overwritten; b is never resized.
        """
        data = self.read(len(b))
        count = len(data)
        b[:count] = data
        return count

    def write(self, b):
        """Write b at the current position and return len(b).

        Existing bytes are overwritten and the buffer grows as needed.  If
        the position lies beyond the end, the gap is zero-filled first.
        """
        n = len(b)
        if n == 0:
            return 0
        gap = self._pos - len(self._buffer)
        if gap > 0:
            self._buffer += bytearray(gap)
        newpos = self._pos + n
        self._buffer[self._pos:newpos] = b
        self._pos = newpos
        return n

    def seek(self, pos, whence=0):
        """Set the position and return it.

        whence 0 is relative to the start, 1 to the current position and
        2 to the end of the buffer; results below zero are clamped to 0.

        Raises:
            IOError: If whence is not 0, 1 or 2.
        """
        if whence == 0:
            self._pos = max(0, pos)
        elif whence == 1:
            self._pos = max(0, self._pos + pos)
        elif whence == 2:
            self._pos = max(0, len(self._buffer) + pos)
        else:
            raise IOError("invalid whence value")
        return self._pos

    def tell(self):
        """Return the current position."""
        return self._pos

    def truncate(self, pos=None):
        """Drop everything from pos onward (default: current position).

        An explicit pos also becomes the new position; negative values are
        clamped to 0 so they cannot slice from the end of the buffer.
        """
        if pos is None:
            pos = self._pos
        else:
            pos = max(0, pos)
            self._pos = pos
        del self._buffer[pos:]

    def readable(self):
        """Always True."""
        return True

    def writable(self):
        """Always True."""
        return True

    def seekable(self):
        """Always True."""
        return True