mirror of
https://github.com/python/cpython
synced 2024-11-05 18:12:54 +00:00
f9bedb630e
Faster bz2/lzma/zlib via new output buffering. Also adds .readall() function to _compression.DecompressReader class to take best advantage of this in the consume-all-output at once scenario. Often a 5-20% speedup in common scenarios due to less data copying. Contributed by Ma Lin.
162 lines
5.5 KiB
Python
162 lines
5.5 KiB
Python
"""Internal classes used by the gzip, lzma and bz2 modules"""
|
|
|
|
import io
|
|
import sys
|
|
|
|
# Chunk size, in bytes, used when reading compressed data from the
# underlying file object.
BUFFER_SIZE = io.DEFAULT_BUFFER_SIZE
|
|
|
|
|
|
class BaseStream(io.BufferedIOBase):
    """Helper methods that validate the stream's state and mode.

    Every helper returns None when its check passes and raises otherwise.
    """

    def _check_not_closed(self):
        """Raise ValueError if the stream reports itself as closed."""
        if not self.closed:
            return
        raise ValueError("I/O operation on closed file")

    def _check_can_read(self):
        """Raise io.UnsupportedOperation unless readable() is true."""
        if self.readable():
            return
        raise io.UnsupportedOperation("File not open for reading")

    def _check_can_write(self):
        """Raise io.UnsupportedOperation unless writable() is true."""
        if self.writable():
            return
        raise io.UnsupportedOperation("File not open for writing")

    def _check_can_seek(self):
        """Raise io.UnsupportedOperation unless the stream can seek.

        Seeking requires both readable() and seekable() to be true.
        """
        # readable() is consulted first; seekable() is only asked when the
        # stream is readable.
        if not self.readable():
            message = "Seeking is only supported on files open for reading"
        elif not self.seekable():
            message = "The underlying file object does not support seeking"
        else:
            return
        raise io.UnsupportedOperation(message)
|
|
|
|
|
|
class DecompressReader(io.RawIOBase):
    """Adapts the decompressor API to a RawIOBase reader API.

    Decompressor objects are created by calling
    ``decomp_factory(**decomp_args)``.  They must provide a
    ``decompress(data, max_length)`` method and the ``eof``,
    ``needs_input`` and ``unused_data`` attributes.
    """

    def __init__(self, fp, decomp_factory, trailing_error=(), **decomp_args):
        """Wrap *fp*, a file object that yields compressed data.

        fp -- object providing read(), seek() and seekable().
        decomp_factory -- callable returning a new decompressor object.
        trailing_error -- exception class (or tuple of classes) raised by
            the decompressor for invalid trailing data, which is ignored.
        decomp_args -- keyword arguments passed to decomp_factory.
        """
        self._fp = fp
        self._eof = False
        self._pos = 0  # Current offset in decompressed stream

        # Set to size of decompressed stream once it is known, for SEEK_END
        self._size = -1

        # Save the decompressor factory and arguments.
        # If the file contains multiple compressed streams, each
        # stream will need a separate decompressor object. A new decompressor
        # object is also needed when implementing a backwards seek().
        self._decomp_factory = decomp_factory
        self._decomp_args = decomp_args
        self._decompressor = self._decomp_factory(**self._decomp_args)

        # Exception class to catch from decompressor signifying invalid
        # trailing data to ignore
        self._trailing_error = trailing_error

    def readable(self):
        """Return True: this object always supports reading."""
        return True

    def close(self):
        """Drop the decompressor object and close this reader."""
        self._decompressor = None
        return super().close()

    def seekable(self):
        """Return whether the underlying file object is seekable."""
        return self._fp.seekable()

    def readinto(self, b):
        """Read decompressed bytes into the buffer *b*.

        Returns the number of bytes stored, which is 0 at EOF.
        """
        # Cast to an unsigned-byte view so len() is the size in bytes
        # regardless of the item format of the caller's buffer.
        with memoryview(b) as view, view.cast("B") as byte_view:
            data = self.read(len(byte_view))
            byte_view[:len(data)] = data
        return len(data)

    def read(self, size=-1):
        """Read and return up to *size* bytes of decompressed data.

        If *size* is None or negative, everything up to EOF is returned
        (see readall()).  Returns b"" when *size* is 0 or when the end of
        the data has already been reached.

        Raises EOFError if the underlying file ends before the current
        compressed stream is complete.
        """
        # None means "read everything", as it does for io.RawIOBase.read().
        if size is None or size < 0:
            return self.readall()

        if not size or self._eof:
            return b""
        data = None  # Default if EOF is encountered
        # Depending on the input data, our call to the decompressor may not
        # return any data. In this case, try again after reading another block.
        while True:
            if self._decompressor.eof:
                # Prefer bytes the finished decompressor left over; only
                # read from the file when there are none.
                rawblock = (self._decompressor.unused_data or
                            self._fp.read(BUFFER_SIZE))
                if not rawblock:
                    break
                # Continue to next stream.
                self._decompressor = self._decomp_factory(
                    **self._decomp_args)
                try:
                    data = self._decompressor.decompress(rawblock, size)
                except self._trailing_error:
                    # Trailing data isn't a valid compressed stream; ignore it.
                    break
            else:
                if self._decompressor.needs_input:
                    rawblock = self._fp.read(BUFFER_SIZE)
                    if not rawblock:
                        raise EOFError("Compressed file ended before the "
                                       "end-of-stream marker was reached")
                else:
                    # The decompressor can still produce output without
                    # being fed more input.
                    rawblock = b""
                data = self._decompressor.decompress(rawblock, size)
            if data:
                break
        if not data:
            # End of data: remember the total size for SEEK_END.
            self._eof = True
            self._size = self._pos
            return b""
        self._pos += len(data)
        return data

    def readall(self):
        """Read and return all remaining decompressed data as bytes."""
        chunks = []
        # sys.maxsize means the max length of output buffer is unlimited,
        # so that the whole input buffer can be decompressed within one
        # .decompress() call.
        while data := self.read(sys.maxsize):
            chunks.append(data)

        return b"".join(chunks)

    # Rewind the file to the beginning of the data stream.
    def _rewind(self):
        """Seek the underlying file to offset 0 and reset decoding state."""
        self._fp.seek(0)
        self._eof = False
        self._pos = 0
        self._decompressor = self._decomp_factory(**self._decomp_args)

    def seek(self, offset, whence=io.SEEK_SET):
        """Change the position in the decompressed stream; return it.

        *whence* is io.SEEK_SET (default), io.SEEK_CUR or io.SEEK_END; any
        other value raises ValueError.

        Seeking is emulated by reading and discarding data.  A target
        before the current position first rewinds to the start, and
        io.SEEK_END reads to EOF when the total size is not yet known.
        A target past the end of the data leaves the position at EOF.
        """
        # Recalculate offset as an absolute file position.
        if whence == io.SEEK_SET:
            pass
        elif whence == io.SEEK_CUR:
            offset = self._pos + offset
        elif whence == io.SEEK_END:
            # Seeking relative to EOF - we need to know the file's size.
            if self._size < 0:
                while self.read(io.DEFAULT_BUFFER_SIZE):
                    pass
            offset = self._size + offset
        else:
            raise ValueError("Invalid value for whence: {}".format(whence))

        # Make it so that offset is the number of bytes to skip forward.
        if offset < self._pos:
            self._rewind()
        else:
            offset -= self._pos

        # Read and discard data until we reach the desired position.
        while offset > 0:
            data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
            if not data:
                break
            offset -= len(data)

        return self._pos

    def tell(self):
        """Return the current file position."""
        return self._pos
|