blob: f99190031cd1466ad54e14e9068bab58277013ba [file] [log] [blame]
Guido van Rossum7d9ea502003-02-03 20:45:52 +00001import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002from test import support
Christian Heimesd5e2b6f2008-03-19 21:50:51 +00003import binascii
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +00004import random
Antoine Pitrou9e719b62011-02-28 23:48:16 +00005import sys
6from test.support import precisionbigmemtest, _1G, _4G
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +00007
R. David Murraya21e4ca2009-03-31 23:16:50 +00008zlib = support.import_module('zlib')
9
Antoine Pitrou9e719b62011-02-28 23:48:16 +000010try:
11 import mmap
12except ImportError:
13 mmap = None
14
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +000015
Guido van Rossum7d9ea502003-02-03 20:45:52 +000016class ChecksumTestCase(unittest.TestCase):
17 # checksum test cases
18 def test_crc32start(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000019 self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
Georg Brandlab91fde2009-08-13 08:51:18 +000020 self.assertTrue(zlib.crc32(b"abc", 0xffffffff))
Andrew M. Kuchlingfcfc8d52001-08-10 15:50:11 +000021
Guido van Rossum7d9ea502003-02-03 20:45:52 +000022 def test_crc32empty(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000023 self.assertEqual(zlib.crc32(b"", 0), 0)
24 self.assertEqual(zlib.crc32(b"", 1), 1)
25 self.assertEqual(zlib.crc32(b"", 432), 432)
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +000026
Guido van Rossum7d9ea502003-02-03 20:45:52 +000027 def test_adler32start(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000028 self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
Georg Brandlab91fde2009-08-13 08:51:18 +000029 self.assertTrue(zlib.adler32(b"abc", 0xffffffff))
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +000030
Guido van Rossum7d9ea502003-02-03 20:45:52 +000031 def test_adler32empty(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000032 self.assertEqual(zlib.adler32(b"", 0), 0)
33 self.assertEqual(zlib.adler32(b"", 1), 1)
34 self.assertEqual(zlib.adler32(b"", 432), 432)
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +000035
Guido van Rossum7d9ea502003-02-03 20:45:52 +000036 def assertEqual32(self, seen, expected):
37 # 32-bit values masked -- checksums on 32- vs 64- bit machines
38 # This is important if bit 31 (0x08000000L) is set.
Guido van Rossume2a383d2007-01-15 16:59:06 +000039 self.assertEqual(seen & 0x0FFFFFFFF, expected & 0x0FFFFFFFF)
Guido van Rossum7d9ea502003-02-03 20:45:52 +000040
41 def test_penguins(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000042 self.assertEqual32(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
43 self.assertEqual32(zlib.crc32(b"penguin", 1), 0x43b6aa94)
44 self.assertEqual32(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
45 self.assertEqual32(zlib.adler32(b"penguin", 1), 0x0bd602f7)
Guido van Rossum7d9ea502003-02-03 20:45:52 +000046
Guido van Rossum776152b2007-05-22 22:44:07 +000047 self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
48 self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1))
Guido van Rossum7d9ea502003-02-03 20:45:52 +000049
Gregory P. Smithab0d8a12008-03-17 20:24:09 +000050 def test_crc32_adler32_unsigned(self):
Antoine Pitrouc8428d32009-12-14 18:23:30 +000051 foo = b'abcdefghijklmnop'
Gregory P. Smithab0d8a12008-03-17 20:24:09 +000052 # explicitly test signed behavior
Gregory P. Smith27275032008-03-20 06:20:09 +000053 self.assertEqual(zlib.crc32(foo), 2486878355)
Antoine Pitrouc8428d32009-12-14 18:23:30 +000054 self.assertEqual(zlib.crc32(b'spam'), 1138425661)
Gregory P. Smithab0d8a12008-03-17 20:24:09 +000055 self.assertEqual(zlib.adler32(foo+foo), 3573550353)
Antoine Pitrouc8428d32009-12-14 18:23:30 +000056 self.assertEqual(zlib.adler32(b'spam'), 72286642)
Gregory P. Smithab0d8a12008-03-17 20:24:09 +000057
Christian Heimesd5e2b6f2008-03-19 21:50:51 +000058 def test_same_as_binascii_crc32(self):
Martin v. Löwis15b16a32008-12-02 06:00:15 +000059 foo = b'abcdefghijklmnop'
Gregory P. Smith27275032008-03-20 06:20:09 +000060 crc = 2486878355
Christian Heimesd5e2b6f2008-03-19 21:50:51 +000061 self.assertEqual(binascii.crc32(foo), crc)
62 self.assertEqual(zlib.crc32(foo), crc)
Martin v. Löwis15b16a32008-12-02 06:00:15 +000063 self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))
Guido van Rossum7d9ea502003-02-03 20:45:52 +000064
65
Antoine Pitrou9e719b62011-02-28 23:48:16 +000066# Issue #10276 - check that inputs >=4GB are handled correctly.
67class ChecksumBigBufferTestCase(unittest.TestCase):
68
Antoine Pitrou88f416e2011-03-01 00:29:11 +000069 @unittest.skipUnless(mmap, "mmap() is not available.")
70 @unittest.skipUnless(sys.maxsize > _4G, "Can't run on a 32-bit system.")
71 @unittest.skipUnless(support.is_resource_enabled("largefile"),
72 "May use lots of disk space.")
Antoine Pitrou9e719b62011-02-28 23:48:16 +000073 def setUp(self):
74 with open(support.TESTFN, "wb+") as f:
75 f.seek(_4G)
76 f.write(b"asdf")
Victor Stinnera6cd0cf2011-05-02 01:05:37 +020077 f.flush()
Antoine Pitrou9e719b62011-02-28 23:48:16 +000078 self.mapping = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
79
80 def tearDown(self):
81 self.mapping.close()
82 support.unlink(support.TESTFN)
83
Antoine Pitrou9e719b62011-02-28 23:48:16 +000084 def test_big_buffer(self):
85 self.assertEqual(zlib.crc32(self.mapping), 3058686908)
86 self.assertEqual(zlib.adler32(self.mapping), 82837919)
87
Christian Heimesb186d002008-03-18 15:15:01 +000088
Guido van Rossum7d9ea502003-02-03 20:45:52 +000089class ExceptionTestCase(unittest.TestCase):
90 # make sure we generate some expected errors
Guido van Rossum8ce8a782007-11-01 19:42:39 +000091 def test_badlevel(self):
92 # specifying compression level out of range causes an error
93 # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
94 # accepts 0 too)
Antoine Pitrouc8428d32009-12-14 18:23:30 +000095 self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)
96
97 def test_badargs(self):
98 self.assertRaises(TypeError, zlib.adler32)
99 self.assertRaises(TypeError, zlib.crc32)
100 self.assertRaises(TypeError, zlib.compress)
101 self.assertRaises(TypeError, zlib.decompress)
102 for arg in (42, None, '', 'abc', (), []):
103 self.assertRaises(TypeError, zlib.adler32, arg)
104 self.assertRaises(TypeError, zlib.crc32, arg)
105 self.assertRaises(TypeError, zlib.compress, arg)
106 self.assertRaises(TypeError, zlib.decompress, arg)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000107
108 def test_badcompressobj(self):
109 # verify failure on building compress object with bad params
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000110 self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
Guido van Rossum8ce8a782007-11-01 19:42:39 +0000111 # specifying total bits too large causes an error
112 self.assertRaises(ValueError,
113 zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000114
115 def test_baddecompressobj(self):
116 # verify failure on building decompress object with bad params
Antoine Pitroue96bf092010-04-06 17:25:56 +0000117 self.assertRaises(ValueError, zlib.decompressobj, -1)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000118
Christian Heimes5e696852008-04-09 08:37:03 +0000119 def test_decompressobj_badflush(self):
120 # verify failure on calling decompressobj.flush with bad params
121 self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
122 self.assertRaises(ValueError, zlib.decompressobj().flush, -1)
123
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000124
Antoine Pitrou4b3fe142010-05-07 17:08:54 +0000125class BaseCompressTestCase(object):
126 def check_big_compress_buffer(self, size, compress_func):
127 _1M = 1024 * 1024
128 fmt = "%%0%dx" % (2 * _1M)
129 # Generate 10MB worth of random, and expand it by repeating it.
130 # The assumption is that zlib's memory is not big enough to exploit
131 # such spread out redundancy.
Antoine Pitrouadfe27a2010-05-07 18:09:36 +0000132 data = b''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
Antoine Pitrou4b3fe142010-05-07 17:08:54 +0000133 for i in range(10)])
134 data = data * (size // len(data) + 1)
135 try:
136 compress_func(data)
137 finally:
138 # Release memory
139 data = None
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000140
Antoine Pitrou4b3fe142010-05-07 17:08:54 +0000141 def check_big_decompress_buffer(self, size, decompress_func):
142 data = b'x' * size
143 try:
144 compressed = zlib.compress(data, 1)
145 finally:
146 # Release memory
147 data = None
148 data = decompress_func(compressed)
149 # Sanity check
150 try:
151 self.assertEqual(len(data), size)
152 self.assertEqual(len(data.strip(b'x')), 0)
153 finally:
154 data = None
155
156
157class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000158 # Test compression in one go (whole message compression)
159 def test_speech(self):
Neil Schemenauer6412b122004-06-05 19:34:28 +0000160 x = zlib.compress(HAMLET_SCENE)
161 self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000162
163 def test_speech128(self):
Neil Schemenauer6412b122004-06-05 19:34:28 +0000164 # compress more data
165 data = HAMLET_SCENE * 128
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000166 x = zlib.compress(data)
Antoine Pitrouc8428d32009-12-14 18:23:30 +0000167 self.assertEqual(zlib.compress(bytearray(data)), x)
168 for ob in x, bytearray(x):
169 self.assertEqual(zlib.decompress(ob), data)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000170
Antoine Pitrou96f212b2010-05-11 23:49:58 +0000171 def test_incomplete_stream(self):
172 # An useful error message is given
173 x = zlib.compress(HAMLET_SCENE)
174 self.assertRaisesRegexp(zlib.error,
175 "Error -5 while decompressing data: incomplete or truncated stream",
176 zlib.decompress, x[:-1])
177
Antoine Pitrou4b3fe142010-05-07 17:08:54 +0000178 # Memory use of the following functions takes into account overallocation
179
180 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
181 def test_big_compress_buffer(self, size):
182 compress = lambda s: zlib.compress(s, 1)
183 self.check_big_compress_buffer(size, compress)
184
185 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
186 def test_big_decompress_buffer(self, size):
187 self.check_big_decompress_buffer(size, zlib.decompress)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000188
189
Antoine Pitrou4b3fe142010-05-07 17:08:54 +0000190class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000191 # Test compression object
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000192 def test_pair(self):
Neil Schemenauer6412b122004-06-05 19:34:28 +0000193 # straightforward compress/decompress objects
Antoine Pitrouc8428d32009-12-14 18:23:30 +0000194 datasrc = HAMLET_SCENE * 128
195 datazip = zlib.compress(datasrc)
196 # should compress both bytes and bytearray data
197 for data in (datasrc, bytearray(datasrc)):
198 co = zlib.compressobj()
199 x1 = co.compress(data)
200 x2 = co.flush()
201 self.assertRaises(zlib.error, co.flush) # second flush should not work
202 self.assertEqual(x1 + x2, datazip)
203 for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
204 dco = zlib.decompressobj()
205 y1 = dco.decompress(v1 + v2)
206 y2 = dco.flush()
207 self.assertEqual(data, y1 + y2)
208 self.assertTrue(isinstance(dco.unconsumed_tail, bytes))
209 self.assertTrue(isinstance(dco.unused_data, bytes))
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000210
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000211 def test_compressoptions(self):
212 # specify lots of options to compressobj()
213 level = 2
214 method = zlib.DEFLATED
215 wbits = -12
216 memlevel = 9
217 strategy = zlib.Z_FILTERED
218 co = zlib.compressobj(level, method, wbits, memlevel, strategy)
Neil Schemenauer6412b122004-06-05 19:34:28 +0000219 x1 = co.compress(HAMLET_SCENE)
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000220 x2 = co.flush()
221 dco = zlib.decompressobj(wbits)
222 y1 = dco.decompress(x1 + x2)
223 y2 = dco.flush()
Neil Schemenauer6412b122004-06-05 19:34:28 +0000224 self.assertEqual(HAMLET_SCENE, y1 + y2)
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000225
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000226 def test_compressincremental(self):
227 # compress object in steps, decompress object as one-shot
Neil Schemenauer6412b122004-06-05 19:34:28 +0000228 data = HAMLET_SCENE * 128
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000229 co = zlib.compressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000230 bufs = []
231 for i in range(0, len(data), 256):
232 bufs.append(co.compress(data[i:i+256]))
233 bufs.append(co.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000234 combuf = b''.join(bufs)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000235
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000236 dco = zlib.decompressobj()
Guido van Rossum776152b2007-05-22 22:44:07 +0000237 y1 = dco.decompress(b''.join(bufs))
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000238 y2 = dco.flush()
239 self.assertEqual(data, y1 + y2)
240
Neil Schemenauer6412b122004-06-05 19:34:28 +0000241 def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000242 # compress object in steps, decompress object in steps
Neil Schemenauer6412b122004-06-05 19:34:28 +0000243 source = source or HAMLET_SCENE
244 data = source * 128
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000245 co = zlib.compressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000246 bufs = []
Neil Schemenauer6412b122004-06-05 19:34:28 +0000247 for i in range(0, len(data), cx):
248 bufs.append(co.compress(data[i:i+cx]))
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000249 bufs.append(co.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000250 combuf = b''.join(bufs)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000251
Gregory P. Smith693fc462008-09-06 20:13:06 +0000252 decombuf = zlib.decompress(combuf)
253 # Test type of return value
Georg Brandlab91fde2009-08-13 08:51:18 +0000254 self.assertTrue(isinstance(decombuf, bytes))
Gregory P. Smith693fc462008-09-06 20:13:06 +0000255
256 self.assertEqual(data, decombuf)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000257
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000258 dco = zlib.decompressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000259 bufs = []
Neil Schemenauer6412b122004-06-05 19:34:28 +0000260 for i in range(0, len(combuf), dcx):
261 bufs.append(dco.decompress(combuf[i:i+dcx]))
Guido van Rossum776152b2007-05-22 22:44:07 +0000262 self.assertEqual(b'', dco.unconsumed_tail, ########
263 "(A) uct should be b'': not %d long" %
Neil Schemenauer6412b122004-06-05 19:34:28 +0000264 len(dco.unconsumed_tail))
Amaury Forgeot d'Arce43d33a2008-07-02 20:50:16 +0000265 self.assertEqual(b'', dco.unused_data)
Neil Schemenauer6412b122004-06-05 19:34:28 +0000266 if flush:
267 bufs.append(dco.flush())
268 else:
269 while True:
Antoine Pitrouc8428d32009-12-14 18:23:30 +0000270 chunk = dco.decompress(b'')
Neil Schemenauer6412b122004-06-05 19:34:28 +0000271 if chunk:
272 bufs.append(chunk)
273 else:
274 break
Guido van Rossum776152b2007-05-22 22:44:07 +0000275 self.assertEqual(b'', dco.unconsumed_tail, ########
276 "(B) uct should be b'': not %d long" %
Neil Schemenauer6412b122004-06-05 19:34:28 +0000277 len(dco.unconsumed_tail))
Amaury Forgeot d'Arce43d33a2008-07-02 20:50:16 +0000278 self.assertEqual(b'', dco.unused_data)
Guido van Rossum776152b2007-05-22 22:44:07 +0000279 self.assertEqual(data, b''.join(bufs))
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000280 # Failure means: "decompressobj with init options failed"
281
Neil Schemenauer6412b122004-06-05 19:34:28 +0000282 def test_decompincflush(self):
283 self.test_decompinc(flush=True)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000284
Neil Schemenauer6412b122004-06-05 19:34:28 +0000285 def test_decompimax(self, source=None, cx=256, dcx=64):
286 # compress in steps, decompress in length-restricted steps
287 source = source or HAMLET_SCENE
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000288 # Check a decompression object with max_length specified
Neil Schemenauer6412b122004-06-05 19:34:28 +0000289 data = source * 128
290 co = zlib.compressobj()
291 bufs = []
292 for i in range(0, len(data), cx):
293 bufs.append(co.compress(data[i:i+cx]))
294 bufs.append(co.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000295 combuf = b''.join(bufs)
Neil Schemenauer6412b122004-06-05 19:34:28 +0000296 self.assertEqual(data, zlib.decompress(combuf),
297 'compressed data failure')
298
299 dco = zlib.decompressobj()
300 bufs = []
301 cb = combuf
302 while cb:
303 #max_length = 1 + len(cb)//10
304 chunk = dco.decompress(cb, dcx)
Georg Brandlab91fde2009-08-13 08:51:18 +0000305 self.assertFalse(len(chunk) > dcx,
Neil Schemenauer6412b122004-06-05 19:34:28 +0000306 'chunk too big (%d>%d)' % (len(chunk), dcx))
307 bufs.append(chunk)
308 cb = dco.unconsumed_tail
309 bufs.append(dco.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000310 self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
Neil Schemenauer6412b122004-06-05 19:34:28 +0000311
312 def test_decompressmaxlen(self, flush=False):
313 # Check a decompression object with max_length specified
314 data = HAMLET_SCENE * 128
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000315 co = zlib.compressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000316 bufs = []
317 for i in range(0, len(data), 256):
318 bufs.append(co.compress(data[i:i+256]))
319 bufs.append(co.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000320 combuf = b''.join(bufs)
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000321 self.assertEqual(data, zlib.decompress(combuf),
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000322 'compressed data failure')
323
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000324 dco = zlib.decompressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000325 bufs = []
326 cb = combuf
327 while cb:
Guido van Rossumf3594102003-02-27 18:39:18 +0000328 max_length = 1 + len(cb)//10
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000329 chunk = dco.decompress(cb, max_length)
Georg Brandlab91fde2009-08-13 08:51:18 +0000330 self.assertFalse(len(chunk) > max_length,
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000331 'chunk too big (%d>%d)' % (len(chunk),max_length))
332 bufs.append(chunk)
333 cb = dco.unconsumed_tail
Neil Schemenauer6412b122004-06-05 19:34:28 +0000334 if flush:
335 bufs.append(dco.flush())
336 else:
337 while chunk:
Antoine Pitrouc8428d32009-12-14 18:23:30 +0000338 chunk = dco.decompress(b'', max_length)
Georg Brandlab91fde2009-08-13 08:51:18 +0000339 self.assertFalse(len(chunk) > max_length,
Neil Schemenauer6412b122004-06-05 19:34:28 +0000340 'chunk too big (%d>%d)' % (len(chunk),max_length))
341 bufs.append(chunk)
Guido van Rossum776152b2007-05-22 22:44:07 +0000342 self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000343
Neil Schemenauer6412b122004-06-05 19:34:28 +0000344 def test_decompressmaxlenflush(self):
345 self.test_decompressmaxlen(flush=True)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000346
347 def test_maxlenmisc(self):
348 # Misc tests of max_length
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000349 dco = zlib.decompressobj()
Antoine Pitrouc8428d32009-12-14 18:23:30 +0000350 self.assertRaises(ValueError, dco.decompress, b"", -1)
Guido van Rossum776152b2007-05-22 22:44:07 +0000351 self.assertEqual(b'', dco.unconsumed_tail)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000352
Nadeem Vawda7619e882011-05-14 14:05:20 +0200353 def test_clear_unconsumed_tail(self):
354 # Issue #12050: calling decompress() without providing max_length
355 # should clear the unconsumed_tail attribute.
356 cdata = b"x\x9cKLJ\x06\x00\x02M\x01" # "abc"
357 dco = zlib.decompressobj()
358 ddata = dco.decompress(cdata, 1)
359 ddata += dco.decompress(dco.unconsumed_tail)
360 self.assertEqual(dco.unconsumed_tail, b"")
361
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000362 def test_flushes(self):
363 # Test flush() with the various options, using all the
364 # different levels in order to provide more variations.
365 sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
366 sync_opt = [getattr(zlib, opt) for opt in sync_opt
367 if hasattr(zlib, opt)]
Neil Schemenauer6412b122004-06-05 19:34:28 +0000368 data = HAMLET_SCENE * 8
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000369
370 for sync in sync_opt:
371 for level in range(10):
372 obj = zlib.compressobj( level )
373 a = obj.compress( data[:3000] )
374 b = obj.flush( sync )
375 c = obj.compress( data[3000:] )
376 d = obj.flush()
Guido van Rossum776152b2007-05-22 22:44:07 +0000377 self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000378 data, ("Decompress failed: flush "
379 "mode=%i, level=%i") % (sync, level))
380 del obj
381
382 def test_odd_flush(self):
383 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
384 import random
385
386 if hasattr(zlib, 'Z_SYNC_FLUSH'):
387 # Testing on 17K of "random" data
388
389 # Create compressor and decompressor objects
Neil Schemenauer6412b122004-06-05 19:34:28 +0000390 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000391 dco = zlib.decompressobj()
392
393 # Try 17K of data
394 # generate random data stream
395 try:
396 # In 2.3 and later, WichmannHill is the RNG of the bug report
397 gen = random.WichmannHill()
398 except AttributeError:
399 try:
400 # 2.2 called it Random
401 gen = random.Random()
402 except AttributeError:
403 # others might simply have a single RNG
404 gen = random
405 gen.seed(1)
406 data = genblock(1, 17 * 1024, generator=gen)
407
408 # compress, sync-flush, and decompress
409 first = co.compress(data)
410 second = co.flush(zlib.Z_SYNC_FLUSH)
411 expanded = dco.decompress(first + second)
412
413 # if decompressed data is different from the input data, choke.
414 self.assertEqual(expanded, data, "17K random source doesn't match")
415
Andrew M. Kuchling3b585b32004-12-28 20:10:48 +0000416 def test_empty_flush(self):
417 # Test that calling .flush() on unused objects works.
418 # (Bug #1083110 -- calling .flush() on decompress objects
419 # caused a core dump.)
420
421 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
Georg Brandlab91fde2009-08-13 08:51:18 +0000422 self.assertTrue(co.flush()) # Returns a zlib header
Andrew M. Kuchling3b585b32004-12-28 20:10:48 +0000423 dco = zlib.decompressobj()
Guido van Rossum776152b2007-05-22 22:44:07 +0000424 self.assertEqual(dco.flush(), b"") # Returns nothing
Tim Peters5a9fb3c2005-01-07 16:01:32 +0000425
Antoine Pitroubbff8cf2010-05-11 23:38:15 +0000426 def test_decompress_incomplete_stream(self):
427 # This is 'foo', deflated
428 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
429 # For the record
430 self.assertEqual(zlib.decompress(x), b'foo')
431 self.assertRaises(zlib.error, zlib.decompress, x[:-5])
432 # Omitting the stream end works with decompressor objects
433 # (see issue #8672).
434 dco = zlib.decompressobj()
435 y = dco.decompress(x[:-5])
436 y += dco.flush()
437 self.assertEqual(y, b'foo')
438
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000439 if hasattr(zlib.compressobj(), "copy"):
440 def test_compresscopy(self):
441 # Test copying a compression object
442 data0 = HAMLET_SCENE
Guido van Rossum776152b2007-05-22 22:44:07 +0000443 data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000444 c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
445 bufs0 = []
446 bufs0.append(c0.compress(data0))
Thomas Wouters477c8d52006-05-27 19:21:47 +0000447
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000448 c1 = c0.copy()
449 bufs1 = bufs0[:]
Thomas Wouters477c8d52006-05-27 19:21:47 +0000450
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000451 bufs0.append(c0.compress(data0))
452 bufs0.append(c0.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000453 s0 = b''.join(bufs0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000454
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000455 bufs1.append(c1.compress(data1))
456 bufs1.append(c1.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000457 s1 = b''.join(bufs1)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000458
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000459 self.assertEqual(zlib.decompress(s0),data0+data0)
460 self.assertEqual(zlib.decompress(s1),data0+data1)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000461
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000462 def test_badcompresscopy(self):
463 # Test copying a compression object in an inconsistent state
464 c = zlib.compressobj()
465 c.compress(HAMLET_SCENE)
466 c.flush()
467 self.assertRaises(ValueError, c.copy)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000468
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000469 if hasattr(zlib.decompressobj(), "copy"):
470 def test_decompresscopy(self):
471 # Test copying a decompression object
472 data = HAMLET_SCENE
473 comp = zlib.compress(data)
Gregory P. Smith693fc462008-09-06 20:13:06 +0000474 # Test type of return value
Georg Brandlab91fde2009-08-13 08:51:18 +0000475 self.assertTrue(isinstance(comp, bytes))
Thomas Wouters477c8d52006-05-27 19:21:47 +0000476
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000477 d0 = zlib.decompressobj()
478 bufs0 = []
479 bufs0.append(d0.decompress(comp[:32]))
Thomas Wouters477c8d52006-05-27 19:21:47 +0000480
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000481 d1 = d0.copy()
482 bufs1 = bufs0[:]
Thomas Wouters477c8d52006-05-27 19:21:47 +0000483
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000484 bufs0.append(d0.decompress(comp[32:]))
Guido van Rossum776152b2007-05-22 22:44:07 +0000485 s0 = b''.join(bufs0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000486
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000487 bufs1.append(d1.decompress(comp[32:]))
Guido van Rossum776152b2007-05-22 22:44:07 +0000488 s1 = b''.join(bufs1)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000489
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000490 self.assertEqual(s0,s1)
491 self.assertEqual(s0,data)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000492
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000493 def test_baddecompresscopy(self):
494 # Test copying a compression object in an inconsistent state
495 data = zlib.compress(HAMLET_SCENE)
496 d = zlib.decompressobj()
497 d.decompress(data)
498 d.flush()
499 self.assertRaises(ValueError, d.copy)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000500
Antoine Pitrou4b3fe142010-05-07 17:08:54 +0000501 # Memory use of the following functions takes into account overallocation
502
503 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
504 def test_big_compress_buffer(self, size):
505 c = zlib.compressobj(1)
506 compress = lambda s: c.compress(s) + c.flush()
507 self.check_big_compress_buffer(size, compress)
508
509 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
510 def test_big_decompress_buffer(self, size):
511 d = zlib.decompressobj()
512 decompress = lambda s: d.decompress(s) + d.flush()
513 self.check_big_decompress_buffer(size, decompress)
514
515
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000516def genblock(seed, length, step=1024, generator=random):
517 """length-byte stream of random data from a seed (in step-byte blocks)."""
518 if seed is not None:
519 generator.seed(seed)
520 randint = generator.randint
521 if length < step or step < 2:
522 step = length
Guido van Rossum776152b2007-05-22 22:44:07 +0000523 blocks = bytes()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000524 for i in range(0, length, step):
Guido van Rossum776152b2007-05-22 22:44:07 +0000525 blocks += bytes(randint(0, 255) for x in range(step))
526 return blocks
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000527
528
529
530def choose_lines(source, number, seed=None, generator=random):
531 """Return a list of number lines randomly chosen from the source"""
532 if seed is not None:
533 generator.seed(seed)
534 sources = source.split('\n')
535 return [generator.choice(sources) for n in range(number)]
536
537
538
Guido van Rossum776152b2007-05-22 22:44:07 +0000539HAMLET_SCENE = b"""
Fred Drake004d5e62000-10-23 17:22:08 +0000540LAERTES
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000541
542 O, fear me not.
543 I stay too long: but here my father comes.
544
545 Enter POLONIUS
546
547 A double blessing is a double grace,
548 Occasion smiles upon a second leave.
549
Fred Drake004d5e62000-10-23 17:22:08 +0000550LORD POLONIUS
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000551
552 Yet here, Laertes! aboard, aboard, for shame!
553 The wind sits in the shoulder of your sail,
554 And you are stay'd for. There; my blessing with thee!
555 And these few precepts in thy memory
556 See thou character. Give thy thoughts no tongue,
557 Nor any unproportioned thought his act.
558 Be thou familiar, but by no means vulgar.
559 Those friends thou hast, and their adoption tried,
560 Grapple them to thy soul with hoops of steel;
561 But do not dull thy palm with entertainment
562 Of each new-hatch'd, unfledged comrade. Beware
563 Of entrance to a quarrel, but being in,
564 Bear't that the opposed may beware of thee.
565 Give every man thy ear, but few thy voice;
566 Take each man's censure, but reserve thy judgment.
567 Costly thy habit as thy purse can buy,
568 But not express'd in fancy; rich, not gaudy;
569 For the apparel oft proclaims the man,
570 And they in France of the best rank and station
571 Are of a most select and generous chief in that.
572 Neither a borrower nor a lender be;
573 For loan oft loses both itself and friend,
574 And borrowing dulls the edge of husbandry.
575 This above all: to thine ownself be true,
576 And it must follow, as the night the day,
577 Thou canst not then be false to any man.
578 Farewell: my blessing season this in thee!
579
Fred Drake004d5e62000-10-23 17:22:08 +0000580LAERTES
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000581
582 Most humbly do I take my leave, my lord.
583
Fred Drake004d5e62000-10-23 17:22:08 +0000584LORD POLONIUS
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000585
586 The time invites you; go; your servants tend.
587
Fred Drake004d5e62000-10-23 17:22:08 +0000588LAERTES
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000589
590 Farewell, Ophelia; and remember well
591 What I have said to you.
592
Fred Drake004d5e62000-10-23 17:22:08 +0000593OPHELIA
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000594
595 'Tis in my memory lock'd,
596 And you yourself shall keep the key of it.
597
Fred Drake004d5e62000-10-23 17:22:08 +0000598LAERTES
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000599
600 Farewell.
601"""
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000602
603
604def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000605 support.run_unittest(
Walter Dörwald21d3a322003-05-01 17:45:56 +0000606 ChecksumTestCase,
Antoine Pitrou9e719b62011-02-28 23:48:16 +0000607 ChecksumBigBufferTestCase,
Walter Dörwald21d3a322003-05-01 17:45:56 +0000608 ExceptionTestCase,
609 CompressTestCase,
610 CompressObjectTestCase
611 )
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000612
613if __name__ == "__main__":
Guido van Rossum776152b2007-05-22 22:44:07 +0000614 unittest.main() # XXX
615 ###test_main()