import unittest
from test import support
import binascii
import pickle
import random
import sys
from test.support import bigmemtest, _1G, _4G
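# _1G and _4G are byte-count constants (roughly 1 GiB and 4 GiB) used by the
# @bigmemtest decorators below.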

zlib = support.import_module('zlib')

requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')
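# Compress.copy() and Decompress.copy() may be absent if the module was built
# against a zlib without copy support, so the copy() tests are skipped then.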
17
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +000018
Nadeem Vawda64d25dd2011-09-12 00:04:13 +020019class VersionTestCase(unittest.TestCase):
20
21 def test_library_version(self):
Nadeem Vawda131c7072012-01-25 23:16:50 +020022 # Test that the major version of the actual library in use matches the
23 # major version that we were compiled against. We can't guarantee that
24 # the minor versions will match (even on the machine on which the module
25 # was compiled), and the API is stable between minor versions, so
Nadeem Vawdad770fe42012-01-28 17:32:47 +020026 # testing only the major versions avoids spurious failures.
Nadeem Vawda131c7072012-01-25 23:16:50 +020027 self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])
Nadeem Vawda64d25dd2011-09-12 00:04:13 +020028
29
Guido van Rossum7d9ea502003-02-03 20:45:52 +000030class ChecksumTestCase(unittest.TestCase):
31 # checksum test cases
32 def test_crc32start(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000033 self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000034 self.assertTrue(zlib.crc32(b"abc", 0xffffffff))
Andrew M. Kuchlingfcfc8d52001-08-10 15:50:11 +000035
Guido van Rossum7d9ea502003-02-03 20:45:52 +000036 def test_crc32empty(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000037 self.assertEqual(zlib.crc32(b"", 0), 0)
38 self.assertEqual(zlib.crc32(b"", 1), 1)
39 self.assertEqual(zlib.crc32(b"", 432), 432)
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +000040
Guido van Rossum7d9ea502003-02-03 20:45:52 +000041 def test_adler32start(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000042 self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000043 self.assertTrue(zlib.adler32(b"abc", 0xffffffff))
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +000044
Guido van Rossum7d9ea502003-02-03 20:45:52 +000045 def test_adler32empty(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000046 self.assertEqual(zlib.adler32(b"", 0), 0)
47 self.assertEqual(zlib.adler32(b"", 1), 1)
48 self.assertEqual(zlib.adler32(b"", 432), 432)
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +000049
Guido van Rossum7d9ea502003-02-03 20:45:52 +000050 def test_penguins(self):
Martin Panterb82032f2015-12-11 05:19:29 +000051 self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
52 self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
53 self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
54 self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)
Guido van Rossum7d9ea502003-02-03 20:45:52 +000055
Guido van Rossum776152b2007-05-22 22:44:07 +000056 self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
57 self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1))
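        # The second argument is a running checksum, so calls can be chained;
        # e.g. zlib.crc32(b"guin", zlib.crc32(b"pen")) == zlib.crc32(b"penguin").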

    def test_crc32_adler32_unsigned(self):
        foo = b'abcdefghijklmnop'
        # explicitly test unsigned behavior: in Python 3 both functions
        # always return unsigned 32-bit values
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))


# Issue #10276 - check that inputs >=4 GiB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying a compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION, and apparently zlib accepts
        # 0 too)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        # Generate 10 MiB worth of random data, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little')
                         for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush)  # second flush should not work
            self.assertEqual(x1 + x2, datazip)
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(b''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail, ########
                             "(A) uct should be b'': not %d long" %
                             len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail, ########
                         "(B) uct should be b'': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                             'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                             'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                                 'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
                    'Z_PARTIAL_FLUSH']

        ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.'))
        # Z_BLOCK has a known failure prior to 1.2.5.3
        if ver >= (1, 2, 5, 3):
            sync_opt.append('Z_BLOCK')

        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                try:
                    obj = zlib.compressobj(level)
                    a = obj.compress(data[:3000])
                    b = obj.flush(sync)
                    c = obj.compress(data[3000:])
                    d = obj.flush()
                except:
                    print("Error for flush mode={}, level={}"
                          .format(sync, level))
                    raise
                self.assertEqual(zlib.decompress(b''.join([a, b, c, d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"")  # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # Issue #27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        bufs0 = []
        bufs0.append(c0.compress(data0))

        c1 = c0.copy()
        bufs1 = bufs0[:]

        bufs0.append(c0.compress(data0))
        bufs0.append(c0.flush())
        s0 = b''.join(bufs0)

        bufs1.append(c1.compress(data1))
        bufs1.append(c1.flush())
        s1 = b''.join(bufs1)

        self.assertEqual(zlib.decompress(s0), data0 + data0)
        self.assertEqual(zlib.decompress(s1), data0 + data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        d0 = zlib.decompressobj()
        bufs0 = []
        bufs0.append(d0.decompress(comp[:32]))

        d1 = d0.copy()
        bufs1 = bufs0[:]

        bufs0.append(d0.decompress(comp[32:]))
        s0 = b''.join(bufs0)

        bufs1.append(d1.decompress(comp[32:]))
        s1 = b''.join(bufs1)

        self.assertEqual(s0, s1)
        self.assertEqual(s0, data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 only supported since zlib v1.2.3.5
        # Register "1.2.3" as "1.2.3.0"
        # or "1.2.0-linux", "1.2.0.f", "1.2.0.f-linux"
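        # e.g. "1.2.11" -> (1, 2, 11, 0); "1.2.0.f-linux" -> (1, 2, 0, 0)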
        v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
        if len(v) < 4:
            v.append('0')
        elif not v[-1].isnumeric():
            v[-1] = '0'

        v = tuple(map(int, v))
        supports_wbits_0 = v >= (1, 2, 3, 5)

        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = bytes()
    for i in range(0, length, step):
        blocks += bytes(randint(0, 255) for x in range(step))
    return blocks
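# For example, test_odd_flush above calls genblock(1, 17 * 1024, generator=gen)
# to get 17 KiB of reproducible pseudo-random bytes.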



def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]



HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


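# Helper whose __int__() result stands in for an integer argument; used by
# test_custom_bufsize, test_maxlen_custom and test_flush_custom_length above.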
class CustomInt:
    def __int__(self):
        return 100


if __name__ == "__main__":
    unittest.main()