mirror of
https://github.com/python/cpython
synced 2024-10-08 21:09:49 +00:00
c1f779cb01
svn+ssh://pythondev@svn.python.org/python/branches/p3yk ........ r56127 | georg.brandl | 2007-06-30 09:32:49 +0200 (Sat, 30 Jun 2007) | 2 lines Fix a place where floor division would be in order. ........ r56135 | guido.van.rossum | 2007-07-01 06:13:54 +0200 (Sun, 01 Jul 2007) | 28 lines Make map() and filter() identical to itertools.imap() and .ifilter(), respectively. I fixed two bootstrap issues, due to the dynamic import of itertools: 1. Starting python requires that map() and filter() are not used until site.py has added build/lib.<arch> to sys.path. 2. Building python requires that setup.py and distutils and everything they use is free of map() and filter() calls. Beyond this, I only fixed the tests in test_builtin.py. Others, please help fixing the remaining tests that are now broken! The fixes are usually simple: a. map(None, X) -> list(X) b. map(F, X) -> list(map(F, X)) c. map(lambda x: F(x), X) -> [F(x) for x in X] d. filter(F, X) -> list(filter(F, X)) e. filter(lambda x: P(x), X) -> [x for x in X if P(x)] Someone, please also contribute a fixer for 2to3 to do this. It can leave map()/filter() calls alone that are already inside a list() or sorted() call or for-loop. Only in rare cases have I seen code that depends on map() of lists of different lengths going to the end of the longest, or on filter() of a string or tuple returning an object of the same type; these will need more thought to fix. ........ r56136 | guido.van.rossum | 2007-07-01 06:22:01 +0200 (Sun, 01 Jul 2007) | 3 lines Make it so that test_decimal fails instead of hangs, to help automated test runners. ........ r56139 | georg.brandl | 2007-07-01 18:20:58 +0200 (Sun, 01 Jul 2007) | 2 lines Fix a few test cases after the map->imap change. ........ r56142 | neal.norwitz | 2007-07-02 06:38:12 +0200 (Mon, 02 Jul 2007) | 1 line Get a bunch more tests passing after converting map/filter to return iterators. ........ 
r56147 | guido.van.rossum | 2007-07-02 15:32:02 +0200 (Mon, 02 Jul 2007) | 4 lines Fix the remaining failing unit tests (at least on OSX). Also tweaked urllib2 so it doesn't raise socket.gaierror when all network interfaces are turned off. ........
1197 lines
45 KiB
Python
1197 lines
45 KiB
Python
"""
|
|
Read and write ZIP files.
|
|
|
|
XXX references to utf-8 need further investigation.
|
|
"""
|
|
import struct, os, time, sys
|
|
import binascii, io
|
|
|
|
try:
|
|
import zlib # We may need its compression method
|
|
except ImportError:
|
|
zlib = None
|
|
|
|
__all__ = ["BadZipfile", "error", "ZIP_STORED", "ZIP_DEFLATED", "is_zipfile",
|
|
"ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile" ]
|
|
|
|
class BadZipfile(Exception):
    """Raised when a file cannot be parsed as a ZIP archive."""
|
|
|
|
|
|
class LargeZipFile(Exception):
    """Signal that writing the archive needs ZIP64, which is disabled.

    Raised while writing when a member or the archive itself would
    require the ZIP64 extensions but they have not been allowed.
    """
|
|
|
|
error = BadZipfile # The exception raised by this module
|
|
|
|
# Largest size/offset representable without the ZIP64 extensions.
ZIP64_LIMIT = (1 << 31) - 1

# constants for Zip file compression methods
ZIP_STORED = 0
ZIP_DEFLATED = 8
# Other ZIP compression methods not supported

# Here are some struct module formats for reading headers.
#
# The CRC-32 fields of the central directory and local file header are
# unsigned ("L"): binascii.crc32() returns an unsigned value on Python 3,
# and a signed "l" field cannot pack CRCs >= 2**31.
structEndArchive = "<4s4H2lH"     # 8 items, end of archive, 22 bytes
stringEndArchive = b"PK\005\006"   # magic number for end of archive record
structCentralDir = "<4s4B4HLLL5HLl"  # 19 items, central directory, 46 bytes
stringCentralDir = b"PK\001\002"   # magic number for central directory
structFileHeader = "<4s2B4HLLL2H"  # 12 items, file header record, 30 bytes
stringFileHeader = b"PK\003\004"   # magic number for file header
structEndArchive64Locator = "<4slql"  # 4 items, locate Zip64 header, 20 bytes
stringEndArchive64Locator = b"PK\x06\x07"  # magic token for locator header
structEndArchive64 = "<4sqhhllqqqq"  # 10 items, end of archive (Zip64), 56 bytes
stringEndArchive64 = b"PK\x06\x06"  # magic token for Zip64 header


# indexes of entries in the central directory structure
_CD_SIGNATURE = 0
_CD_CREATE_VERSION = 1
_CD_CREATE_SYSTEM = 2
_CD_EXTRACT_VERSION = 3
_CD_EXTRACT_SYSTEM = 4                  # is this meaningful?
_CD_FLAG_BITS = 5
_CD_COMPRESS_TYPE = 6
_CD_TIME = 7
_CD_DATE = 8
_CD_CRC = 9
_CD_COMPRESSED_SIZE = 10
_CD_UNCOMPRESSED_SIZE = 11
_CD_FILENAME_LENGTH = 12
_CD_EXTRA_FIELD_LENGTH = 13
_CD_COMMENT_LENGTH = 14
_CD_DISK_NUMBER_START = 15
_CD_INTERNAL_FILE_ATTRIBUTES = 16
_CD_EXTERNAL_FILE_ATTRIBUTES = 17
_CD_LOCAL_HEADER_OFFSET = 18

# indexes of entries in the local file header structure
_FH_SIGNATURE = 0
_FH_EXTRACT_VERSION = 1
_FH_EXTRACT_SYSTEM = 2                  # is this meaningful?
_FH_GENERAL_PURPOSE_FLAG_BITS = 3
_FH_COMPRESSION_METHOD = 4
_FH_LAST_MOD_TIME = 5
_FH_LAST_MOD_DATE = 6
_FH_CRC = 7
_FH_COMPRESSED_SIZE = 8
_FH_UNCOMPRESSED_SIZE = 9
_FH_FILENAME_LENGTH = 10
_FH_EXTRA_FIELD_LENGTH = 11
|
|
|
|
def is_zipfile(filename):
    """Quickly see if file is a ZIP file by checking the magic number.

    Returns True when an "End of Central Directory" record can be located
    in *filename*, False otherwise.  I/O errors (missing file, unreadable
    file, ...) are treated as "not a ZIP file".
    """
    try:
        # The with-block guarantees the handle is released even when
        # _EndRecData() fails part-way through.
        with io.open(filename, "rb") as fpin:
            endrec = _EndRecData(fpin)
    except IOError:
        return False
    return bool(endrec)     # a record was found: file has correct magic number
|
|
|
|
def _EndRecData64(fpin, offset, endrec):
    """
    Read the ZIP64 end-of-archive records and use that to update endrec

    *offset* is the (negative) position of the classic end-of-archive
    record relative to the end of the file.  *endrec* is returned unchanged
    when no ZIP64 locator / ZIP64 end record is present or when the file
    is too short to contain one.  Raises BadZipfile for multi-disk
    archives.
    """
    locatorSize = struct.calcsize(structEndArchive64Locator)
    try:
        fpin.seek(offset - locatorSize, 2)
    except IOError:
        # Seeking before the start of the file: there is no locator.
        return endrec
    data = fpin.read(locatorSize)
    if len(data) != locatorSize:
        # Short read; unpacking would fail, so treat as "no ZIP64 data".
        return endrec
    sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data)
    if sig != stringEndArchive64Locator:
        return endrec

    if diskno != 0 or disks != 1:
        raise BadZipfile("zipfiles that span multiple disks are not supported")

    # Assume no 'zip64 extensible data'
    endArchiveSize = struct.calcsize(structEndArchive64)
    try:
        fpin.seek(offset - locatorSize - endArchiveSize, 2)
    except IOError:
        return endrec
    data = fpin.read(endArchiveSize)
    if len(data) != endArchiveSize:
        return endrec
    (sig, sz, create_version, read_version, disk_num, disk_dir,
     dircount, dircount2, dirsize, diroffset) = \
        struct.unpack(structEndArchive64, data)
    if sig != stringEndArchive64:
        return endrec

    # Update the original endrec using data from the ZIP64 record
    endrec[1] = disk_num
    endrec[2] = disk_dir
    endrec[3] = dircount
    endrec[4] = dircount2
    endrec[5] = dirsize
    endrec[6] = diroffset
    return endrec
|
|
|
|
|
|
def _EndRecData(fpin):
    """Return data from the "End of Central Directory" record, or None.

    The data is a list of the eight fields of the ZIP "End of central dir"
    record, followed by the archive comment (bytes) and finally the file
    seek offset of this record.  None is returned when no valid record
    can be found."""
    # Determine the file size first so that files shorter than one record
    # are rejected instead of failing inside seek().
    fpin.seek(0, 2)
    filesize = fpin.tell()
    if filesize < 22:
        return None

    fpin.seek(-22, 2)               # Assume no archive comment.
    data = fpin.read()
    # The data read from a binary file is bytes, so compare against bytes.
    if data[0:4] == stringEndArchive and data[-2:] == b"\000\000":
        endrec = struct.unpack(structEndArchive, data)
        endrec = list(endrec)
        endrec.append(b"")              # Append the archive comment
        endrec.append(filesize - 22)    # Append the record start offset
        if endrec[-4] == -1 or endrec[-4] == 0xffffffff:
            return _EndRecData64(fpin, -22, endrec)
        return endrec
    # Search the last END_BLOCK bytes of the file for the record signature.
    # The comment is appended to the ZIP file and has a 16 bit length.
    # So the comment may be up to 64K long.  We limit the search for the
    # signature to a few Kbytes at the end of the file for efficiency.
    # also, the signature must not appear in the comment.
    END_BLOCK = min(filesize, 1024 * 4)
    fpin.seek(filesize - END_BLOCK, 0)
    data = fpin.read()
    start = data.rfind(stringEndArchive)
    if start >= 0:     # Correct signature string was found
        record = data[start:start+22]
        if len(record) != 22:
            # Signature sits too close to EOF to hold a full record.
            return None
        endrec = struct.unpack(structEndArchive, record)
        endrec = list(endrec)
        comment = data[start+22:]
        if endrec[7] == len(comment):     # Comment length checks out
            # Append the archive comment and start offset
            endrec.append(comment)
            endrec.append(filesize - END_BLOCK + start)
            if endrec[-4] == -1 or endrec[-4] == 0xffffffff:
                return _EndRecData64(fpin, - END_BLOCK + start, endrec)
            return endrec
    return None     # Error, return None
|
|
|
|
|
|
class ZipInfo (object):
    """Class with attributes describing each file in the ZIP archive."""

    __slots__ = (
        'orig_filename',
        'filename',
        'date_time',
        'compress_type',
        'comment',
        'extra',
        'create_system',
        'create_version',
        'extract_version',
        'reserved',
        'flag_bits',
        'volume',
        'internal_attr',
        'external_attr',
        'header_offset',
        'CRC',
        'compress_size',
        'file_size',
    )

    def __init__(self, filename="NoName", date_time=(1980,1,1,0,0,0)):
        """Create an entry named *filename* with modification *date_time*.

        *date_time* is a (year, month, day, hour, min, sec) sequence.
        """
        self.orig_filename = filename   # Original file name in archive

        # Terminate the file name at the first null byte.  Null bytes in file
        # names are used as tricks by viruses in archives.
        null_byte = filename.find(chr(0))
        if null_byte >= 0:
            filename = filename[0:null_byte]
        # This is used to ensure paths in generated ZIP files always use
        # forward slashes as the directory separator, as required by the
        # ZIP format specification.
        if os.sep != "/" and os.sep in filename:
            filename = filename.replace(os.sep, "/")

        self.filename = filename        # Normalized file name
        self.date_time = date_time      # year, month, day, hour, min, sec
        # Standard values:
        self.compress_type = ZIP_STORED # Type of compression for the file
        self.comment = b""              # Comment for each file
        self.extra = b""                # ZIP extra data
        if sys.platform == 'win32':
            self.create_system = 0          # System which created ZIP archive
        else:
            # Assume everything else is unix-y
            self.create_system = 3          # System which created ZIP archive
        self.create_version = 20        # Version which created ZIP archive
        self.extract_version = 20       # Version needed to extract archive
        self.reserved = 0               # Must be zero
        self.flag_bits = 0              # ZIP flag bits
        self.volume = 0                 # Volume number of file header
        self.internal_attr = 0          # Internal attributes
        self.external_attr = 0          # External file attributes
        # Other attributes are set by class ZipFile:
        # header_offset         Byte offset to the file header
        # CRC                   CRC-32 of the uncompressed file
        # compress_size         Size of the compressed file
        # file_size             Size of the uncompressed file

    def FileHeader(self):
        """Return the per-file header as bytes.

        When either size exceeds ZIP64_LIMIT a ZIP64 extra record is
        appended and the version fields of this object are raised to 45.
        """
        dt = self.date_time
        dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
        dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
        if self.flag_bits & 0x08:
            # Set these to zero because we write them after the file data
            CRC = compress_size = file_size = 0
        else:
            CRC = self.CRC
            compress_size = self.compress_size
            file_size = self.file_size

        extra = self.extra

        if file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT:
            # File is larger than what fits into a 4 byte integer,
            # fall back to the ZIP64 extension
            fmt = '<hhqq'
            extra = extra + struct.pack(fmt,
                    1, struct.calcsize(fmt)-4, file_size, compress_size)
            file_size = 0xffffffff # -1
            compress_size = 0xffffffff # -1
            self.extract_version = max(45, self.extract_version)
            self.create_version = max(45, self.create_version)

        # The header stores the length of the name *as written*, i.e. the
        # number of encoded bytes, not the number of characters.
        encoded_name = self.filename.encode("utf-8")
        header = struct.pack(structFileHeader, stringFileHeader,
                 self.extract_version, self.reserved, self.flag_bits,
                 self.compress_type, dostime, dosdate, CRC,
                 compress_size, file_size,
                 len(encoded_name), len(extra))
        return header + encoded_name + extra

    def _decodeExtra(self):
        """Decode the extra field, applying any ZIP64 size/offset values.

        Raises RuntimeError when the ZIP64 record has an unexpected length.
        """
        extra = self.extra
        unpack = struct.unpack
        # Each record starts with a 4-byte (type, length) header; anything
        # shorter than that cannot be a record and is ignored.
        while len(extra) >= 4:
            # Both fields are unsigned.  Reading the length as signed could
            # yield a negative value and keep the loop from advancing.
            tp, ln = unpack('<HH', extra[:4])
            if tp == 1:
                if ln >= 24:
                    counts = unpack('<qqq', extra[4:28])
                elif ln == 16:
                    counts = unpack('<qq', extra[4:20])
                elif ln == 8:
                    counts = unpack('<q', extra[4:12])
                elif ln == 0:
                    counts = ()
                else:
                    raise RuntimeError("Corrupt extra field %s"%(ln,))

                idx = 0

                # ZIP64 extension (large files and/or large archives)
                if self.file_size == -1 or self.file_size == 0xFFFFFFFF:
                    self.file_size = counts[idx]
                    idx += 1

                if self.compress_size == -1 or self.compress_size == 0xFFFFFFFF:
                    self.compress_size = counts[idx]
                    idx += 1

                if self.header_offset == -1 or self.header_offset == 0xffffffff:
                    self.header_offset = counts[idx]
                    idx += 1

            extra = extra[ln+4:]
|
|
|
|
|
|
class _ZipDecrypter:
    """Class to handle decryption of files stored within a ZIP archive.

    ZIP supports a password-based form of encryption. Even though known
    plaintext attacks have been found against it, it is still useful
    for low-level security.

    Usage:
        zd = _ZipDecrypter(mypwd)
        plain_byte = zd(cypher_byte)
        plain_text = bytes(map(zd, cypher_text))
    """

    def _GenerateCRCTable():
        """Generate a CRC-32 table.

        ZIP encryption uses the CRC32 one-byte primitive for scrambling some
        internal keys. We noticed that a direct implementation is faster than
        relying on binascii.crc32().
        """
        poly = 0xedb88320
        table = [0] * 256
        for i in range(256):
            crc = i
            for j in range(8):
                if crc & 1:
                    crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
                else:
                    crc = ((crc >> 1) & 0x7FFFFFFF)
            table[i] = crc
        return table
    # Built once, at class creation time, and shared by all instances.
    crctable = _GenerateCRCTable()

    def _crc32(self, ch, crc):
        """Compute the CRC32 primitive on one byte."""
        return ((crc >> 8) & 0xffffff) ^ self.crctable[(crc ^ ch) & 0xff]

    def __init__(self, pwd):
        """Initialise the three keys and mix in every byte of *pwd*.

        *pwd* must yield integers when iterated (e.g. a bytes object).
        """
        self.key0 = 305419896
        self.key1 = 591751049
        self.key2 = 878082192
        for p in pwd:
            self._UpdateKeys(p)

    def _UpdateKeys(self, c):
        """Advance the key state with the plain byte value *c*."""
        self.key0 = self._crc32(c, self.key0)
        self.key1 = (self.key1 + (self.key0 & 255)) & 4294967295
        self.key1 = (self.key1 * 134775813 + 1) & 4294967295
        self.key2 = self._crc32((self.key1 >> 24) & 255, self.key2)

    def __call__(self, c):
        """Decrypt a single byte value and return the plain byte value.

        Raises TypeError when *c* is not an int.
        """
        # Explicit check rather than assert: asserts vanish under -O.
        if not isinstance(c, int):
            raise TypeError("expected an int byte value, got %s"
                            % type(c).__name__)
        k = self.key2 | 2
        c = c ^ (((k * (k^1)) >> 8) & 255)
        self._UpdateKeys(c)
        return c
|
|
|
|
class ZipExtFile:
    """File-like object for reading an archive member.
       Is returned by ZipFile.open().
    """

    def __init__(self, fileobj, zipinfo, decrypt=None):
        """Wrap *fileobj*, positioned at the start of the member's data.

        *zipinfo* supplies compress_type, compress_size and filename;
        *decrypt*, when given, is called on each raw byte value read.
        """
        self.fileobj = fileobj
        self.decrypter = decrypt
        self.bytes_read = 0
        self.rawbuffer = b''
        self.readbuffer = b''
        self.linebuffer = b''
        # NOTE: nothing in this class ever sets eof to True, so the
        # decompressor flush in read() is currently never reached.
        self.eof = False
        self.univ_newlines = False
        self.nlSeps = (b"\n", )
        self.lastdiscard = b''

        self.compress_type = zipinfo.compress_type
        self.compress_size = zipinfo.compress_size

        self.closed  = False
        self.mode    = "r"
        self.name = zipinfo.filename

        # read from compressed files in 64k blocks
        self.compreadsize = 64*1024
        if self.compress_type == ZIP_DEFLATED:
            self.dc = zlib.decompressobj(-15)

    def set_univ_newlines(self, univ_newlines):
        """Enable or disable universal-newline handling for readline()."""
        self.univ_newlines = univ_newlines

        # pick line separator char(s) based on universal newlines flag
        self.nlSeps = (b"\n", )
        if self.univ_newlines:
            self.nlSeps = (b"\r\n", b"\r", b"\n")

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next line, raising StopIteration at end of data."""
        nextline = self.readline()
        if not nextline:
            raise StopIteration()

        return nextline

    def close(self):
        """Mark the object as closed (the underlying file is untouched)."""
        self.closed = True

    def _checkfornewline(self):
        """Return (index, length) of a separator in linebuffer, or (-1, -1)."""
        nl, nllen = -1, -1
        if self.linebuffer:
            # ugly check for cases where half of an \r\n pair was
            # read on the last pass, and the \r was discarded.  In this
            # case we just throw away the \n at the start of the buffer.
            # (Slice, don't index: indexing bytes yields an int.)
            if (self.lastdiscard, self.linebuffer[0:1]) == (b'\r', b'\n'):
                self.linebuffer = self.linebuffer[1:]

            for sep in self.nlSeps:
                nl = self.linebuffer.find(sep)
                if nl >= 0:
                    nllen = len(sep)
                    return nl, nllen

        return nl, nllen

    def readline(self, size = -1):
        """Read a line with approx. size. If size is negative,
           read a whole line.
        """
        if size < 0:
            size = sys.maxsize
        elif size == 0:
            return b''

        # check for a newline already in buffer
        nl, nllen = self._checkfornewline()

        if nl >= 0:
            # the next line was already in the buffer
            nl = min(nl, size)
        else:
            # no line break in buffer - try to read more
            size -= len(self.linebuffer)
            while nl < 0 and size > 0:
                buf = self.read(min(size, 100))
                if not buf:
                    break
                self.linebuffer += buf
                size -= len(buf)

                # check for a newline in buffer
                nl, nllen = self._checkfornewline()

            # we either ran out of bytes in the file, or
            # met the specified size limit without finding a newline,
            # so return current buffer
            if nl < 0:
                s = self.linebuffer
                self.linebuffer = b''
                return s

        buf = self.linebuffer[:nl]
        self.lastdiscard = self.linebuffer[nl:nl + nllen]
        self.linebuffer = self.linebuffer[nl + nllen:]

        # line is always returned with \n as newline char (except possibly
        # for a final incomplete line in the file, which is handled above).
        return buf + b"\n"

    def readlines(self, sizehint = -1):
        """Return a list with all (following) lines. The sizehint parameter
        is ignored in this implementation.
        """
        result = []
        while True:
            line = self.readline()
            if not line: break
            result.append(line)
        return result

    def read(self, size = None):
        """Return up to *size* bytes of member data.

        With size None or negative, return everything currently available.
        """
        # act like file() obj and return empty bytes if size is 0
        if size == 0:
            return b''
        # A negative size means "no limit"; normalise it so the final
        # slice below cannot chop bytes off the end of the buffer.
        if size is not None and size < 0:
            size = None

        # determine read size
        bytesToRead = self.compress_size - self.bytes_read

        # adjust read size for encrypted files since the first 12 bytes
        # are for the encryption/password information
        if self.decrypter is not None:
            bytesToRead -= 12

        if size is not None:
            if self.compress_type == ZIP_STORED:
                lr = len(self.readbuffer)
                bytesToRead = min(bytesToRead, size - lr)
            elif self.compress_type == ZIP_DEFLATED:
                if len(self.readbuffer) > size:
                    # the user has requested fewer bytes than we've already
                    # pulled through the decompressor; don't read any more
                    bytesToRead = 0
                else:
                    # user will use up the buffer, so read some more
                    lr = len(self.rawbuffer)
                    bytesToRead = min(bytesToRead, self.compreadsize - lr)

        # avoid reading past end of file contents
        if bytesToRead + self.bytes_read > self.compress_size:
            bytesToRead = self.compress_size - self.bytes_read

        # try to read from file (if necessary)
        if bytesToRead > 0:
            data = self.fileobj.read(bytesToRead)
            self.bytes_read += len(data)
            self.rawbuffer += data

            # handle contents of raw buffer
            if self.rawbuffer:
                newdata = self.rawbuffer
                self.rawbuffer = b''

                # decrypt new data if we were given an object to handle that
                if newdata and self.decrypter is not None:
                    newdata = bytes(map(self.decrypter, newdata))

                # decompress newly read data if necessary
                if newdata and self.compress_type == ZIP_DEFLATED:
                    newdata = self.dc.decompress(newdata)
                    self.rawbuffer = self.dc.unconsumed_tail
                    if self.eof and len(self.rawbuffer) == 0:
                        # we're out of raw bytes (both from the file and
                        # the local buffer); flush just to make sure the
                        # decompressor is done
                        newdata += self.dc.flush()
                        # prevent decompressor from being used again
                        self.dc = None

                self.readbuffer += newdata

        # return what the user asked for
        if size is None or len(self.readbuffer) <= size:
            data = self.readbuffer
            self.readbuffer = b''
        else:
            data = self.readbuffer[:size]
            self.readbuffer = self.readbuffer[size:]

        return data
|
|
|
|
|
|
class ZipFile:
|
|
""" Class with methods to open, read, write, close, list zip files.
|
|
|
|
z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=False)
|
|
|
|
file: Either the path to the file, or a file-like object.
|
|
If it is a path, the file will be opened and closed by ZipFile.
|
|
mode: The mode can be either read "r", write "w" or append "a".
|
|
compression: ZIP_STORED (no compression) or ZIP_DEFLATED (requires zlib).
|
|
allowZip64: if True ZipFile will create files with ZIP64 extensions when
|
|
needed, otherwise it will raise an exception when this would
|
|
be necessary.
|
|
|
|
"""
|
|
|
|
fp = None # Set here since __del__ checks it
|
|
|
|
def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False):
    """Open the ZIP file with mode read "r", write "w" or append "a".

    *file* is a path (str) or a file-like object.  A 'b' in *mode* is
    ignored.  Raises RuntimeError for an unsupported compression method
    or mode, and BadZipfile when reading a file that is not an archive.
    """
    self._allowZip64 = allowZip64
    self._didModify = False
    if compression == ZIP_DEFLATED:
        if not zlib:
            raise RuntimeError(
                  "Compression requires the (missing) zlib module")
    elif compression != ZIP_STORED:
        raise RuntimeError("That compression method is not supported")
    self.debug = 0  # Level of printing: 0 through 3
    self.NameToInfo = {}    # Find file info given name
    self.filelist = []      # List of ZipInfo instances for archive
    self.compression = compression  # Method of compression
    # [:1] instead of [0] so an empty mode is reported as a bad mode
    # rather than as an IndexError.
    self.mode = key = mode.replace('b', '')[:1]
    self.pwd = None

    modeDict = {'r' : 'rb', 'w': 'wb', 'a' : 'r+b'}
    if key not in modeDict:
        # Reject before touching the file system.
        raise RuntimeError('Mode must be "r", "w" or "a"')

    # Check if we were passed a file-like object
    if isinstance(file, str):
        # No, it's a filename
        self._filePassed = 0
        self.filename = file
        try:
            # Look up by the normalised key so that e.g. "rb" works.
            self.fp = io.open(file, modeDict[key])
        except IOError:
            if key == 'a':
                # File does not exist yet: create it instead.
                key = 'w'
                self.fp = io.open(file, modeDict[key])
            else:
                raise
    else:
        self._filePassed = 1
        self.fp = file
        self.filename = getattr(file, 'name', None)

    if key == 'r':
        self._GetContents()
    elif key == 'a':
        try:                        # See if file is a zip file
            self._RealGetContents()
            # seek to start of directory and overwrite
            self.fp.seek(self.start_dir, 0)
        except BadZipfile:          # file is not a zip file, just append
            self.fp.seek(0, 2)
    # key == 'w': nothing to read, the archive starts empty
|
|
|
|
def _GetContents(self):
|
|
"""Read the directory, making sure we close the file if the format
|
|
is bad."""
|
|
try:
|
|
self._RealGetContents()
|
|
except BadZipfile:
|
|
if not self._filePassed:
|
|
self.fp.close()
|
|
self.fp = None
|
|
raise
|
|
|
|
def _RealGetContents(self):
    """Read in the table of contents for the ZIP file.

    Fills self.filelist / self.NameToInfo and sets self.comment and
    self.start_dir.  Raises BadZipfile when the end record is missing or
    a central directory entry is malformed or truncated.
    """
    fp = self.fp
    endrec = _EndRecData(fp)
    if not endrec:
        raise BadZipfile("File is not a zip file")
    if self.debug > 1:
        print(endrec)
    size_cd = endrec[5]             # bytes in central directory
    offset_cd = endrec[6]   # offset of central directory
    self.comment = endrec[8]        # archive comment
    # endrec[9] is the offset of the "End of Central Dir" record
    if endrec[9] > ZIP64_LIMIT:
        # Skip the ZIP64 end record (56 bytes) and its locator (20 bytes)
        x = endrec[9] - size_cd - 56 - 20
    else:
        x = endrec[9] - size_cd
    # "concat" is zero, unless zip was concatenated to another file
    concat = x - offset_cd
    if self.debug > 2:
        print("given, inferred, offset", offset_cd, x, concat)
    # self.start_dir:  Position of start of central directory
    self.start_dir = offset_cd + concat
    fp.seek(self.start_dir, 0)
    data = fp.read(size_cd)
    fp = io.BytesIO(data)
    total = 0
    while total < size_cd:
        centdir = fp.read(46)
        total = total + 46
        if centdir[0:4] != stringCentralDir:
            raise BadZipfile("Bad magic number for central directory")
        if len(centdir) != 46:
            # Right signature but not enough bytes to unpack.
            raise BadZipfile("Truncated central directory")
        centdir = struct.unpack(structCentralDir, centdir)
        if self.debug > 2:
            print(centdir)
        filename = fp.read(centdir[_CD_FILENAME_LENGTH])
        # Create ZipInfo instance to store file information.  Names are
        # written as UTF-8 by ZipInfo.FileHeader(), so decode the same way
        # (str() on bytes would produce the "b'...'" repr instead).
        x = ZipInfo(filename.decode("utf-8"))
        x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH])
        x.comment = fp.read(centdir[_CD_COMMENT_LENGTH])
        total = (total + centdir[_CD_FILENAME_LENGTH]
                 + centdir[_CD_EXTRA_FIELD_LENGTH]
                 + centdir[_CD_COMMENT_LENGTH])
        x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET]
        (x.create_version, x.create_system, x.extract_version, x.reserved,
            x.flag_bits, x.compress_type, t, d,
            x.CRC, x.compress_size, x.file_size) = centdir[1:12]
        x.volume, x.internal_attr, x.external_attr = centdir[15:18]
        # Convert date/time code to (year, month, day, hour, min, sec)
        x.date_time = ( (d>>9)+1980, (d>>5)&0xF, d&0x1F,
                                 t>>11, (t>>5)&0x3F, (t&0x1F) * 2 )

        x._decodeExtra()
        x.header_offset = x.header_offset + concat
        self.filelist.append(x)
        self.NameToInfo[x.filename] = x
        if self.debug > 2:
            print("total", total)
|
|
|
|
|
|
def namelist(self):
|
|
"""Return a list of file names in the archive."""
|
|
l = []
|
|
for data in self.filelist:
|
|
l.append(data.filename)
|
|
return l
|
|
|
|
def infolist(self):
|
|
"""Return a list of class ZipInfo instances for files in the
|
|
archive."""
|
|
return self.filelist
|
|
|
|
def printdir(self, file=None):
|
|
"""Print a table of contents for the zip file."""
|
|
print("%-46s %19s %12s" % ("File Name", "Modified ", "Size"),
|
|
file=file)
|
|
for zinfo in self.filelist:
|
|
date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time
|
|
print("%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size),
|
|
file=file)
|
|
|
|
def testzip(self):
|
|
"""Read all the files and check the CRC."""
|
|
for zinfo in self.filelist:
|
|
try:
|
|
self.read(zinfo.filename) # Check CRC-32
|
|
except BadZipfile:
|
|
return zinfo.filename
|
|
|
|
|
|
def getinfo(self, name):
|
|
"""Return the instance of ZipInfo given 'name'."""
|
|
return self.NameToInfo[name]
|
|
|
|
def setpassword(self, pwd):
|
|
"""Set default password for encrypted files."""
|
|
assert isinstance(pwd, bytes)
|
|
self.pwd = pwd
|
|
|
|
def read(self, name, pwd=None):
|
|
"""Return file bytes (as a string) for name."""
|
|
return self.open(name, "r", pwd).read()
|
|
|
|
def open(self, name, mode="r", pwd=None):
    """Return file-like object for 'name'.

    *mode* is "r", "U" or "rU" (the latter two enable universal
    newlines).  *pwd* overrides the default password for encrypted
    members.  Raises RuntimeError for a bad mode, a closed archive or a
    missing/incorrect password, KeyError for an unknown name and
    BadZipfile when the local header is inconsistent.
    """
    if mode not in ("r", "U", "rU"):
        raise RuntimeError('open() requires mode "r", "U", or "rU"')
    if not self.fp:
        raise RuntimeError(
              "Attempt to read ZIP archive that was already closed")

    # Get info object for name (before opening anything, so that an
    # unknown name cannot leave a file handle behind)
    zinfo = self.getinfo(name)

    # Only open a new file for instances where we were not
    # given a file object in the constructor
    if self._filePassed:
        zef_file = self.fp
    else:
        zef_file = io.open(self.filename, 'rb')

    try:
        zef_file.seek(zinfo.header_offset, 0)

        # Skip the file header:
        fheader = zef_file.read(30)
        if fheader[0:4] != stringFileHeader:
            raise BadZipfile("Bad magic number for file header")

        fheader = struct.unpack(structFileHeader, fheader)
        fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
        if fheader[_FH_EXTRA_FIELD_LENGTH]:
            zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])

        if fname != zinfo.orig_filename.encode("utf-8"):
            raise BadZipfile(
                  'File name in directory %r and header %r differ.' % (
                      zinfo.orig_filename, fname))

        # check for encrypted flag & handle password
        is_encrypted = zinfo.flag_bits & 0x1
        zd = None
        if is_encrypted:
            if not pwd:
                pwd = self.pwd
            if not pwd:
                raise RuntimeError("File %s is encrypted, "
                      "password required for extraction" % name)

            zd = _ZipDecrypter(pwd)
            # The first 12 bytes in the cypher stream is an encryption header
            #  used to strengthen the algorithm. The first 11 bytes are
            #  completely random, while the 12th contains the MSB of the CRC,
            #  and is used to check the correctness of the password.
            enc_header = zef_file.read(12)
            h = list(map(zd, enc_header[0:12]))
            if h[11] != ((zinfo.CRC>>24) & 255):
                raise RuntimeError("Bad password for file %s" % name)

        # build and return a ZipExtFile
        if zd is None:
            zef = ZipExtFile(zef_file, zinfo)
        else:
            zef = ZipExtFile(zef_file, zinfo, zd)
    except Exception:
        # Do not leak the handle we opened ourselves; a caller-supplied
        # file object stays open.
        if not self._filePassed:
            zef_file.close()
        raise

    # set universal newlines on ZipExtFile if necessary
    if "U" in mode:
        zef.set_univ_newlines(True)
    return zef
|
|
|
|
def _writecheck(self, zinfo):
|
|
"""Check for errors before writing a file to the archive."""
|
|
if zinfo.filename in self.NameToInfo:
|
|
if self.debug: # Warning for duplicate names
|
|
print("Duplicate name:", zinfo.filename)
|
|
if self.mode not in ("w", "a"):
|
|
raise RuntimeError, 'write() requires mode "w" or "a"'
|
|
if not self.fp:
|
|
raise RuntimeError, \
|
|
"Attempt to write ZIP archive that was already closed"
|
|
if zinfo.compress_type == ZIP_DEFLATED and not zlib:
|
|
raise RuntimeError, \
|
|
"Compression requires the (missing) zlib module"
|
|
if zinfo.compress_type not in (ZIP_STORED, ZIP_DEFLATED):
|
|
raise RuntimeError, \
|
|
"That compression method is not supported"
|
|
if zinfo.file_size > ZIP64_LIMIT:
|
|
if not self._allowZip64:
|
|
raise LargeZipFile("Filesize would require ZIP64 extensions")
|
|
if zinfo.header_offset > ZIP64_LIMIT:
|
|
if not self._allowZip64:
|
|
raise LargeZipFile("Zipfile size would require ZIP64 extensions")
|
|
|
|
def write(self, filename, arcname=None, compress_type=None):
    """Put the bytes from filename into the archive under the name
    arcname.

    *arcname* defaults to *filename*; any drive letter and leading path
    separators are removed.  *compress_type* defaults to the archive's
    compression method.  Errors from _writecheck() propagate.
    """
    st = os.stat(filename)
    mtime = time.localtime(st.st_mtime)
    date_time = mtime[0:6]
    # Create ZipInfo instance to store file information
    if arcname is None:
        arcname = filename
    arcname = os.path.normpath(os.path.splitdrive(arcname)[1])
    # Guard against an arcname that consists of separators only.
    while arcname and arcname[0] in (os.sep, os.altsep):
        arcname = arcname[1:]
    zinfo = ZipInfo(arcname, date_time)
    zinfo.external_attr = (st[0] & 0xFFFF) << 16      # Unix attributes
    if compress_type is None:
        zinfo.compress_type = self.compression
    else:
        zinfo.compress_type = compress_type

    zinfo.file_size = st.st_size
    zinfo.flag_bits = 0x00
    zinfo.header_offset = self.fp.tell()    # Start of header bytes

    self._writecheck(zinfo)
    self._didModify = True
    # The with-block closes the source file even if writing fails.
    with io.open(filename, "rb") as fp:
        # Must overwrite CRC and sizes with correct data later
        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        zinfo.file_size = file_size = 0
        self.fp.write(zinfo.FileHeader())
        if zinfo.compress_type == ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                 zlib.DEFLATED, -15)
        else:
            cmpr = None
        while 1:
            buf = fp.read(1024 * 8)
            if not buf:
                break
            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC)
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)
            self.fp.write(buf)
    if cmpr:
        buf = cmpr.flush()
        compress_size = compress_size + len(buf)
        self.fp.write(buf)
        zinfo.compress_size = compress_size
    else:
        zinfo.compress_size = file_size
    zinfo.CRC = CRC
    zinfo.file_size = file_size
    # Seek backwards and write CRC and file sizes
    position = self.fp.tell()       # Preserve current position in file
    self.fp.seek(zinfo.header_offset + 14, 0)
    # CRC is packed unsigned: binascii.crc32() may return values >= 2**31,
    # which a signed field cannot hold.  The byte layout is unchanged.
    self.fp.write(struct.pack("<LLL", zinfo.CRC & 0xffffffff,
          zinfo.compress_size, zinfo.file_size))
    self.fp.seek(position, 0)
    self.filelist.append(zinfo)
    self.NameToInfo[zinfo.filename] = zinfo
|
|
|
|
def writestr(self, zinfo_or_arcname, bytes):
    """Store an in-memory payload in the archive as one member.

    'bytes' holds the member's contents.  'zinfo_or_arcname' is either
    a ready-made ZipInfo instance, or the member's name inside the
    archive; in the latter case a ZipInfo is built for it, stamped with
    the current local time and the archive's default compression."""
    if isinstance(zinfo_or_arcname, ZipInfo):
        zinfo = zinfo_or_arcname
    else:
        now = time.localtime(time.time())
        zinfo = ZipInfo(filename=zinfo_or_arcname, date_time=now)
        zinfo.compress_type = self.compression

    # Size and offset are filled in before the entry is handed to
    # _writecheck().
    zinfo.file_size = len(bytes)            # Uncompressed size
    zinfo.header_offset = self.fp.tell()    # Start of header bytes
    self._writecheck(zinfo)
    self._didModify = True

    # The checksum always covers the uncompressed contents.
    zinfo.CRC = binascii.crc32(bytes)       # CRC-32 checksum
    payload = bytes
    if zinfo.compress_type == ZIP_DEFLATED:
        # wbits of -15 produces a raw deflate stream (no zlib wrapper).
        deflater = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                                    zlib.DEFLATED, -15)
        payload = deflater.compress(bytes) + deflater.flush()
        zinfo.compress_size = len(payload)  # Compressed size
    else:
        zinfo.compress_size = zinfo.file_size

    zinfo.header_offset = self.fp.tell()    # Start of header bytes
    out = self.fp
    out.write(zinfo.FileHeader())
    out.write(payload)
    out.flush()
    if zinfo.flag_bits & 0x08:
        # Bit 3 set: CRC and file sizes follow the file data
        trailer = struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
                              zinfo.file_size)
        out.write(trailer)

    # Register the member so it ends up in the central directory.
    self.filelist.append(zinfo)
    self.NameToInfo[zinfo.filename] = zinfo
|
|
|
|
def __del__(self):
    """Call the "close()" method in case the user forgot.

    The call is skipped when the instance has no 'fp' attribute at all,
    because close() starts by reading self.fp and would otherwise raise
    AttributeError from inside the finalizer.
    """
    # NOTE(review): presumably 'fp' is missing only when __init__ raised
    # before assigning it -- confirm against the constructor.
    if hasattr(self, "fp"):
        self.close()
|
|
|
|
def close(self):
    """Close the file, and for mode "w" and "a" write the ending
    records.

    When the archive was opened in mode "w" or "a" and has been
    modified, one central directory record per entry in self.filelist
    is written, followed by the end-of-archive record.  If the central
    directory starts beyond ZIP64_LIMIT, the ZIP64 end-of-archive record
    and its locator are written first.

    The underlying file object is closed unless self._filePassed is set
    (presumably meaning the caller supplied the file object), and
    self.fp is reset to None even when writing the ending records
    fails.  Calling close() on an already closed archive does nothing.
    """
    if self.fp is None:
        return

    try:
        if self.mode in ("w", "a") and self._didModify: # write ending records
            count = len(self.filelist)
            pos1 = self.fp.tell()
            for zinfo in self.filelist:         # write central directory
                dt = zinfo.date_time
                # DOS-style date and time words; years count from 1980
                # and seconds are stored halved.
                dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
                dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)

                # Values too large for the fixed-width fields are moved
                # into 'extra' and replaced by all-ones markers.
                extra = []
                if zinfo.file_size > ZIP64_LIMIT \
                        or zinfo.compress_size > ZIP64_LIMIT:
                    extra.append(zinfo.file_size)
                    extra.append(zinfo.compress_size)
                    file_size = 0xffffffff #-1
                    compress_size = 0xffffffff #-1
                else:
                    file_size = zinfo.file_size
                    compress_size = zinfo.compress_size

                if zinfo.header_offset > ZIP64_LIMIT:
                    extra.append(zinfo.header_offset)
                    header_offset = -1  # struct "l" format:  32 one bits
                else:
                    header_offset = zinfo.header_offset

                extra_data = zinfo.extra
                if extra:
                    # Prepend a ZIP64 field (header id 1, then one
                    # 8-byte value per collected item) to the extra's
                    extra_data = struct.pack(
                        '<hh' + 'q'*len(extra),
                        1, 8*len(extra), *extra) + extra_data

                    extract_version = max(45, zinfo.extract_version)
                    create_version = max(45, zinfo.create_version)
                else:
                    extract_version = zinfo.extract_version
                    create_version = zinfo.create_version

                # Encode once so the recorded name length is the number
                # of bytes actually written, not the number of
                # characters (they differ for non-ASCII names).
                filename_bytes = zinfo.filename.encode("utf-8")

                centdir = struct.pack(structCentralDir,
                    stringCentralDir, create_version,
                    zinfo.create_system, extract_version, zinfo.reserved,
                    zinfo.flag_bits, zinfo.compress_type, dostime, dosdate,
                    zinfo.CRC, compress_size, file_size,
                    len(filename_bytes), len(extra_data), len(zinfo.comment),
                    0, zinfo.internal_attr, zinfo.external_attr,
                    header_offset)
                self.fp.write(centdir)
                self.fp.write(filename_bytes)
                self.fp.write(extra_data)
                self.fp.write(zinfo.comment)

            pos2 = self.fp.tell()
            # Write end-of-zip-archive record
            if pos1 > ZIP64_LIMIT:
                # Need to write the ZIP64 end-of-archive records
                zip64endrec = struct.pack(
                    structEndArchive64, stringEndArchive64,
                    44, 45, 45, 0, 0, count, count, pos2 - pos1, pos1)
                self.fp.write(zip64endrec)

                # The locator points back at the ZIP64 record, which
                # starts where the central directory ended (pos2).
                zip64locrec = struct.pack(
                    structEndArchive64Locator,
                    stringEndArchive64Locator, 0, pos2, 1)
                self.fp.write(zip64locrec)

                # The classic record carries -1 as the directory offset;
                # the real offset lives in the ZIP64 record above.
                endrec = struct.pack(structEndArchive, stringEndArchive,
                                     0, 0, count, count, pos2 - pos1, -1, 0)
                self.fp.write(endrec)

            else:
                endrec = struct.pack(structEndArchive, stringEndArchive,
                                     0, 0, count, count, pos2 - pos1, pos1, 0)
                self.fp.write(endrec)
            self.fp.flush()
    finally:
        # Always release the file object, so a failed close() neither
        # leaks it nor lets a later close() write the records again.
        fp = self.fp
        self.fp = None
        if not self._filePassed:
            fp.close()
|
|
|
|
|
|
class PyZipFile(ZipFile):
    """Class to create ZIP archives with Python library files and packages."""

    def writepy(self, pathname, basename=""):
        """Add all files from "pathname" to the ZIP archive.

        If pathname is a package directory, search the directory and
        all package subdirectories recursively for all *.py and enter
        the modules into the archive.  If pathname is a plain
        directory, listdir *.py and enter all modules.  Else, pathname
        must be a Python *.py file and the module will be put into the
        archive.  Added modules are always module.pyo or module.pyc.
        This method will compile the module.py into module.pyc if
        necessary.

        'basename' is the archive directory prefix the entries are
        placed under; package names are appended to it while recursing.
        Raises RuntimeError if pathname is neither a directory nor a
        name ending in ".py".
        """
        name = os.path.basename(pathname)
        if os.path.isdir(pathname):
            initname = os.path.join(pathname, "__init__.py")
            if os.path.isfile(initname):
                # This is a package directory, add it
                if basename:
                    basename = "%s/%s" % (basename, name)
                else:
                    basename = name
                if self.debug:
                    print("Adding package in", pathname, "as", basename)
                fname, arcname = self._get_codename(initname[0:-3], basename)
                if self.debug:
                    print("Adding", arcname)
                self.write(fname, arcname)
                dirlist = os.listdir(pathname)
                # __init__ has just been added; do not add it twice
                dirlist.remove("__init__.py")
                # Add all *.py files and package subdirectories
                for filename in dirlist:
                    path = os.path.join(pathname, filename)
                    ext = os.path.splitext(filename)[1]
                    if os.path.isdir(path):
                        if os.path.isfile(os.path.join(path, "__init__.py")):
                            # This is a package directory, add it
                            self.writepy(path, basename)  # Recursive call
                    elif ext == ".py":
                        fname, arcname = self._get_codename(path[0:-3],
                                                            basename)
                        if self.debug:
                            print("Adding", arcname)
                        self.write(fname, arcname)
            else:
                # This is NOT a package directory, add its files at top level
                if self.debug:
                    print("Adding files from directory", pathname)
                for filename in os.listdir(pathname):
                    path = os.path.join(pathname, filename)
                    ext = os.path.splitext(filename)[1]
                    if ext == ".py":
                        fname, arcname = self._get_codename(path[0:-3],
                                                            basename)
                        if self.debug:
                            print("Adding", arcname)
                        self.write(fname, arcname)
        else:
            if not pathname.endswith(".py"):
                raise RuntimeError(
                    'Files added with writepy() must end with ".py"')
            fname, arcname = self._get_codename(pathname[0:-3], basename)
            if self.debug:
                print("Adding file", arcname)
            self.write(fname, arcname)

    def _get_codename(self, pathname, basename):
        """Return (filename, archivename) for the path.

        Given a module name path, return the correct file path and
        archive name, compiling if necessary.  For example, given
        /python/lib/string, return (/python/lib/string.pyc, string.pyc).

        An existing .pyo that is at least as new as the .py source is
        preferred.  Otherwise the .pyc is used, and is (re)compiled
        first when it is missing or older than the source.  If 'basename'
        is non-empty, the archive name is prefixed with it and a "/".
        """
        file_py  = pathname + ".py"
        file_pyc = pathname + ".pyc"
        file_pyo = pathname + ".pyo"
        if os.path.isfile(file_pyo) and \
                os.stat(file_pyo).st_mtime >= os.stat(file_py).st_mtime:
            fname = file_pyo    # Use .pyo file
        elif not os.path.isfile(file_pyc) or \
                os.stat(file_pyc).st_mtime < os.stat(file_py).st_mtime:
            import py_compile
            if self.debug:
                print("Compiling", file_py)
            try:
                py_compile.compile(file_py, file_pyc, None, True)
            except py_compile.PyCompileError as err:
                # A compile failure is only reported; the .pyc path is
                # still returned to the caller.
                print(err.msg)
            fname = file_pyc
        else:
            fname = file_pyc
        archivename = os.path.split(fname)[1]
        if basename:
            archivename = "%s/%s" % (basename, archivename)
        return (fname, archivename)
|
|
|
|
|
|
def main(args = None):
    """Command-line interface: list, test, extract or create a zipfile.

    'args' is the argument list without the program name; when it is
    None, sys.argv[1:] is used.  The first argument selects the action
    (-l, -t, -e or -c).  On an unknown action or a wrong number of
    arguments the usage text is printed and sys.exit(1) is called.
    """
    import textwrap
    USAGE=textwrap.dedent("""\
        Usage:
            zipfile.py -l zipfile.zip        # Show listing of a zipfile
            zipfile.py -t zipfile.zip        # Test if a zipfile is valid
            zipfile.py -e zipfile.zip target # Extract zipfile into target dir
            zipfile.py -c zipfile.zip src ... # Create zipfile from sources
        """)
    if args is None:
        args = sys.argv[1:]

    if not args or args[0] not in ('-l', '-c', '-e', '-t'):
        print(USAGE)
        sys.exit(1)

    if args[0] == '-l':
        if len(args) != 2:
            print(USAGE)
            sys.exit(1)
        zf = ZipFile(args[1], 'r')
        try:
            zf.printdir()
        finally:
            zf.close()

    elif args[0] == '-t':
        if len(args) != 2:
            print(USAGE)
            sys.exit(1)
        zf = ZipFile(args[1], 'r')
        try:
            # testzip() is documented to return the name of the first
            # corrupt member, or None when every member checks out.
            badfile = zf.testzip()
        finally:
            zf.close()
        if badfile:
            print("The following enclosed file is corrupted:", badfile)
        print("Done testing")

    elif args[0] == '-e':
        if len(args) != 3:
            print(USAGE)
            sys.exit(1)

        zf = ZipFile(args[1], 'r')
        try:
            out = args[2]
            for path in zf.namelist():
                # NOTE(review): member names come from the archive and are
                # joined to the target unchecked, so absolute names or ".."
                # components can write outside 'out'.  Only extract
                # archives you trust.
                if path.startswith('./'):
                    tgt = os.path.join(out, path[2:])
                else:
                    tgt = os.path.join(out, path)

                tgtdir = os.path.dirname(tgt)
                # An empty dirname means "current directory"; makedirs('')
                # would fail.
                if tgtdir and not os.path.exists(tgtdir):
                    os.makedirs(tgtdir)
                with io.open(tgt, 'wb') as fp:
                    fp.write(zf.read(path))
        finally:
            zf.close()

    elif args[0] == '-c':
        if len(args) < 3:
            print(USAGE)
            sys.exit(1)

        def addToZip(zf, path, zippath):
            # Add a file, or recurse into a directory tree; anything
            # else is ignored.
            if os.path.isfile(path):
                zf.write(path, zippath, ZIP_DEFLATED)
            elif os.path.isdir(path):
                for nm in os.listdir(path):
                    addToZip(zf,
                             os.path.join(path, nm), os.path.join(zippath, nm))
            # else: ignore

        zf = ZipFile(args[1], 'w', allowZip64=True)
        try:
            for src in args[2:]:
                addToZip(zf, src, os.path.basename(src))
        finally:
            zf.close()
|
|
|
|
# Run the command-line interface when this file is executed as a script;
# main() then reads its arguments from sys.argv[1:].
if __name__ == "__main__":
    main()
|