import unittest
from test import test_support as support
from test.test_support import TESTFN, run_unittest, import_module, unlink, requires
import binascii
import pickle
import random
from test.test_support import precisionbigmemtest, _1G, _4G
import sys

try:
    import mmap
except ImportError:
    mmap = None

zlib = import_module('zlib')

requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')


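# Note: in Python 2.x, zlib.crc32() and zlib.adler32() return signed
# integers (see issue #1202 and test_abcdefghijklmnop below); masking the
# result with 0xffffffff, as assertEqual32() and ChecksumBigBufferTestCase
# do, recovers the conventional unsigned 32-bit checksum value.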
class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
        self.assertTrue(zlib.crc32("abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
        self.assertTrue(zlib.adler32("abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64- bit machines
        # This is important if bit 31 (0x80000000L) is set.
        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)

    def test_penguins(self):
        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"), zlib.adler32("penguin", 1))

    def test_abcdefghijklmnop(self):
        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
        foo = 'abcdefghijklmnop'
        # explicitly test signed behavior
        self.assertEqual(zlib.crc32(foo), -1808088941)
        self.assertEqual(zlib.crc32('spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), -721416943)
        self.assertEqual(zlib.adler32('spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = 'abcdefghijklmnop'
        self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
        self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))

    def test_negative_crc_iv_input(self):
        # The range of valid input values for the crc state should be
        # -2**31 through 2**32-1 to allow inputs artificially constrained
        # to a signed 32-bit integer.
        self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL))
        self.assertEqual(zlib.crc32('spam', -3141593),
                         zlib.crc32('spam', 0xffd01027L))
        self.assertEqual(zlib.crc32('spam', -(2**31)),
                         zlib.crc32('spam', (2**31)))


# Issue #10276 - check that inputs >=4GB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @precisionbigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data) & 0xFFFFFFFF, 1044521549)
        self.assertEqual(zlib.adler32(data) & 0xFFFFFFFF, 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegexp(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegexp(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegexp(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        fmt = "%%0%dx" % (2 * _1M)
        # Generate 10MB worth of random, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
                        for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = 'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip('x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegexp(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @precisionbigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


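# Compress and Decompress objects support incremental (streaming) operation:
# decompress(data, max_length) keeps any input it could not yet process in
# unconsumed_tail, while bytes that follow the end of the compressed stream
# accumulate in unused_data.  The tests below exercise both attributes.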
class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        DEFAULTALLOC = 16 * 1024
        self.assertGreater(len(data), DEFAULTALLOC)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = "x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, "")

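    # Z_SYNC_FLUSH flushes pending output to a byte boundary so that all
    # input fed to the compressor so far can be decompressed; Z_FULL_FLUSH
    # additionally resets the compression state.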
    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "") # Returns nothing

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), 'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, 'foo')

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = 'abcdefghijklmnopqrstuvwxyz'
        input2 = 'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @precisionbigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = HAMLET_SCENE.swapcase()
        c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        bufs0 = []
        bufs0.append(c0.compress(data0))

        c1 = c0.copy()
        bufs1 = bufs0[:]

        bufs0.append(c0.compress(data0))
        bufs0.append(c0.flush())
        s0 = ''.join(bufs0)

        bufs1.append(c1.compress(data1))
        bufs1.append(c1.flush())
        s1 = ''.join(bufs1)

        self.assertEqual(zlib.decompress(s0),data0+data0)
        self.assertEqual(zlib.decompress(s1),data0+data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)

        d0 = zlib.decompressobj()
        bufs0 = []
        bufs0.append(d0.decompress(comp[:32]))

        d1 = d0.copy()
        bufs1 = bufs0[:]

        bufs0.append(d0.decompress(comp[32:]))
        s0 = ''.join(bufs0)

        bufs1.append(d1.decompress(comp[32:]))
        s1 = ''.join(bufs1)

        self.assertEqual(s0,s1)
        self.assertEqual(s0,data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

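    # wbits selects the container format as well as the window size:
    # +9..+15 give a zlib wrapper with a 2**wbits window, -9..-15 give a
    # raw deflate stream, wbits+16 selects the gzip wrapper, and wbits+32
    # (decompression only) auto-detects zlib or gzip input.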
    def test_wbits(self):
        co = zlib.compressobj(1, zlib.DEFLATED, 15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegexp(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(14)
        with self.assertRaisesRegexp(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(1, zlib.DEFLATED, 9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(1, zlib.DEFLATED, -15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(1, zlib.DEFLATED, -9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(1, zlib.DEFLATED, 16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0,255))
                               for x in range(step)]))
    return ''.join(blocks)[:length]



def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]



HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


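# Object whose __int__() result stands in for an integer argument; used to
# check that bufsize/max_length/length parameters accept anything that can
# be converted to an int (see test_custom_bufsize and friends above).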
class CustomInt:
    def __int__(self):
        return 100


def test_main():
    run_unittest(
        ChecksumTestCase,
        ChecksumBigBufferTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase
    )

if __name__ == "__main__":
    test_main()