| """Internal classes used by the gzip, lzma and bz2 modules""" |
| |
| import io |
| |
| |
| BUFFER_SIZE = io.DEFAULT_BUFFER_SIZE # Compressed data read chunk size |
| |
| |
| class BaseStream(io.BufferedIOBase): |
| """Mode-checking helper functions.""" |
| |
| def _check_not_closed(self): |
| if self.closed: |
| raise ValueError("I/O operation on closed file") |
| |
| def _check_can_read(self): |
| if not self.readable(): |
| raise io.UnsupportedOperation("File not open for reading") |
| |
| def _check_can_write(self): |
| if not self.writable(): |
| raise io.UnsupportedOperation("File not open for writing") |
| |
| def _check_can_seek(self): |
| if not self.readable(): |
| raise io.UnsupportedOperation("Seeking is only supported " |
| "on files open for reading") |
| if not self.seekable(): |
| raise io.UnsupportedOperation("The underlying file object " |
| "does not support seeking") |
| |
| |
| class DecompressReader(io.RawIOBase): |
| """Adapts the decompressor API to a RawIOBase reader API""" |
| |
| def readable(self): |
| return True |
| |
| def __init__(self, fp, decomp_factory, trailing_error=(), **decomp_args): |
| self._fp = fp |
| self._eof = False |
| self._pos = 0 # Current offset in decompressed stream |
| |
| # Set to size of decompressed stream once it is known, for SEEK_END |
| self._size = -1 |
| |
| # Save the decompressor factory and arguments. |
| # If the file contains multiple compressed streams, each |
| # stream will need a separate decompressor object. A new decompressor |
| # object is also needed when implementing a backwards seek(). |
| self._decomp_factory = decomp_factory |
| self._decomp_args = decomp_args |
| self._decompressor = self._decomp_factory(**self._decomp_args) |
| |
| # Exception class to catch from decompressor signifying invalid |
| # trailing data to ignore |
| self._trailing_error = trailing_error |
| |
| def close(self): |
| self._decompressor = None |
| return super().close() |
| |
| def seekable(self): |
| return self._fp.seekable() |
| |
| def readinto(self, b): |
| with memoryview(b) as view, view.cast("B") as byte_view: |
| data = self.read(len(byte_view)) |
| byte_view[:len(data)] = data |
| return len(data) |
| |
| def read(self, size=-1): |
| if size < 0: |
| return self.readall() |
| |
| if not size or self._eof: |
| return b"" |
| data = None # Default if EOF is encountered |
| # Depending on the input data, our call to the decompressor may not |
| # return any data. In this case, try again after reading another block. |
| while True: |
| if self._decompressor.eof: |
| rawblock = (self._decompressor.unused_data or |
| self._fp.read(BUFFER_SIZE)) |
| if not rawblock: |
| break |
| # Continue to next stream. |
| self._decompressor = self._decomp_factory( |
| **self._decomp_args) |
| try: |
| data = self._decompressor.decompress(rawblock, size) |
| except self._trailing_error: |
| # Trailing data isn't a valid compressed stream; ignore it. |
| break |
| else: |
| if self._decompressor.needs_input: |
| rawblock = self._fp.read(BUFFER_SIZE) |
| if not rawblock: |
| raise EOFError("Compressed file ended before the " |
| "end-of-stream marker was reached") |
| else: |
| rawblock = b"" |
| data = self._decompressor.decompress(rawblock, size) |
| if data: |
| break |
| if not data: |
| self._eof = True |
| self._size = self._pos |
| return b"" |
| self._pos += len(data) |
| return data |
| |
| # Rewind the file to the beginning of the data stream. |
| def _rewind(self): |
| self._fp.seek(0) |
| self._eof = False |
| self._pos = 0 |
| self._decompressor = self._decomp_factory(**self._decomp_args) |
| |
| def seek(self, offset, whence=io.SEEK_SET): |
| # Recalculate offset as an absolute file position. |
| if whence == io.SEEK_SET: |
| pass |
| elif whence == io.SEEK_CUR: |
| offset = self._pos + offset |
| elif whence == io.SEEK_END: |
| # Seeking relative to EOF - we need to know the file's size. |
| if self._size < 0: |
| while self.read(io.DEFAULT_BUFFER_SIZE): |
| pass |
| offset = self._size + offset |
| else: |
| raise ValueError("Invalid value for whence: {}".format(whence)) |
| |
| # Make it so that offset is the number of bytes to skip forward. |
| if offset < self._pos: |
| self._rewind() |
| else: |
| offset -= self._pos |
| |
| # Read and discard data until we reach the desired position. |
| while offset > 0: |
| data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset)) |
| if not data: |
| break |
| offset -= len(data) |
| |
| return self._pos |
| |
| def tell(self): |
| """Return the current file position.""" |
| return self._pos |