blob: f828b4c737a5fcf7f14633076a601ab0b533e382 [file] [log] [blame]
Guido van Rossum7d9ea502003-02-03 20:45:52 +00001import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002from test import support
Christian Heimesd5e2b6f2008-03-19 21:50:51 +00003import binascii
Zackery Spytzd2cbfff2018-06-27 12:04:51 -06004import copy
Serhiy Storchakad7a44152015-11-12 11:23:04 +02005import pickle
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +00006import random
Antoine Pitrouf3d22752011-02-21 18:09:00 +00007import sys
Antoine Pitrou94190bb2011-10-04 10:22:36 +02008from test.support import bigmemtest, _1G, _4G
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +00009
# zlib is loaded through the test-support helper rather than a plain import
# (presumably so the whole module is skipped when zlib is unavailable --
# confirm against test.support).
zlib = support.import_module('zlib')

# Skip decorators for tests that need the optional copy() method on
# compression / decompression objects.
requires_Compress_copy = unittest.skipUnless(
    hasattr(zlib.compressobj(), "copy"), 'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
    hasattr(zlib.decompressobj(), "copy"), 'requires Decompress.copy()')
18
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +000019
class VersionTestCase(unittest.TestCase):
    """Compare the zlib version used at runtime with the compile-time one."""

    def test_library_version(self):
        # Only the first character (the major version) of each version
        # string is compared.  Minor versions are not guaranteed to match,
        # even on the build machine, and the API is stable across them, so
        # checking more would produce spurious failures.
        runtime_major = zlib.ZLIB_RUNTIME_VERSION[0]
        compiled_major = zlib.ZLIB_VERSION[0]
        self.assertEqual(runtime_major, compiled_major)
Nadeem Vawda64d25dd2011-09-12 00:04:13 +020029
30
class ChecksumTestCase(unittest.TestCase):
    """Checks of zlib.crc32() and zlib.adler32() on small inputs."""

    def test_crc32start(self):
        # Omitting the start value must equal passing 0 explicitly.
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        seeded = zlib.crc32(b"abc", 0xffffffff)
        self.assertTrue(seeded)

    def test_crc32empty(self):
        # With empty input the start value comes back unchanged.
        for start in (0, 1, 432):
            self.assertEqual(zlib.crc32(b"", start), start)

    def test_adler32start(self):
        # Omitting the start value must equal passing 1 explicitly.
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        seeded = zlib.adler32(b"abc", 0xffffffff)
        self.assertTrue(seeded)

    def test_adler32empty(self):
        # With empty input the start value comes back unchanged.
        for start in (0, 1, 432):
            self.assertEqual(zlib.adler32(b"", start), start)

    def test_penguins(self):
        # Known checksums of b"penguin" for explicit start values.
        known = (
            (zlib.crc32, 0, 0x0e5c1a120),
            (zlib.crc32, 1, 0x43b6aa94),
            (zlib.adler32, 0, 0x0bcf02f6),
            (zlib.adler32, 1, 0x0bd602f7),
        )
        for checksum, start, expected in known:
            self.assertEqual(checksum(b"penguin", start), expected)

        # The default start values are 0 for crc32 and 1 for adler32.
        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"),
                         zlib.adler32(b"penguin", 1))

    def test_crc32_adler32_unsigned(self):
        # The expected values are plain non-negative integers, some of
        # them above 2**31, i.e. the checksums are reported unsigned.
        foo = b'abcdefghijklmnop'
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo + foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        # binascii.crc32() and zlib.crc32() must agree with each other.
        foo = b'abcdefghijklmnop'
        expected = 2486878355
        self.assertEqual(binascii.crc32(foo), expected)
        self.assertEqual(zlib.crc32(foo), expected)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))
Guido van Rossum7d9ea502003-02-03 20:45:52 +000074
75
# Issue #10276 - check that inputs >=4 GiB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):
    """Checksum a buffer slightly larger than 4 GiB."""

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        # The buffer is built from the module constant, not from `size`:
        # four bytes repeated (_1G + 1) times.
        payload = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(payload), 1044521549)
        self.assertEqual(zlib.adler32(payload), 2256789997)
Antoine Pitrouf3d22752011-02-21 18:09:00 +000084
Christian Heimesb186d002008-03-18 15:15:01 +000085
class ExceptionTestCase(unittest.TestCase):
    """Make sure invalid arguments produce the expected exceptions."""

    def test_badlevel(self):
        # A compression level outside the valid range is an error
        # (-1 is Z_DEFAULT_COMPRESSION and zlib apparently accepts 0 too).
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        functions = (zlib.adler32, zlib.crc32, zlib.compress, zlib.decompress)
        # Calling without any argument is a TypeError ...
        for func in functions:
            self.assertRaises(TypeError, func)
        # ... and so is passing something that is not bytes-like.
        for arg in (42, None, '', 'abc', (), []):
            for func in functions:
                self.assertRaises(TypeError, func, arg)

    def test_badcompressobj(self):
        # Building a compress object with bad parameters must fail.
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # Total bits larger than the maximum is an error as well.
        oversized_wbits = zlib.MAX_WBITS + 1
        self.assertRaises(ValueError,
                          zlib.compressobj, 1, zlib.DEFLATED, oversized_wbits)

    def test_baddecompressobj(self):
        # Building a decompress object with bad parameters must fail.
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # decompressobj.flush() rejects a zero or negative length.
        for length in (0, -1):
            self.assertRaises(ValueError, zlib.decompressobj().flush, length)

    @support.cpython_only
    def test_overflow(self):
        # Every size parameter must reject values above sys.maxsize.
        too_big = sys.maxsize + 1
        attempts = (
            lambda: zlib.decompress(b'', 15, too_big),
            lambda: zlib.decompressobj().decompress(b'', too_big),
            lambda: zlib.decompressobj().flush(too_big),
        )
        for attempt in attempts:
            with self.assertRaisesRegex(OverflowError, 'int too large'):
                attempt()
129
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000130
class BaseCompressTestCase(object):
    """Mixin with helpers shared by the big-buffer (de)compression tests."""

    def check_big_compress_buffer(self, size, compress_func):
        """Feed compress_func a buffer of more than `size` bytes.

        The buffer is 10 MiB of random bytes repeated until it is longer
        than `size`.  The result of compress_func is not inspected.
        """
        mib = 1024 * 1024
        # Generate 10 MiB worth of random, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = b''.join(
            random.getrandbits(8 * mib).to_bytes(mib, 'little')
            for _ in range(10))
        repeats = size // len(data) + 1
        data = data * repeats
        try:
            compress_func(data)
        finally:
            # Drop the reference so the large buffer can be freed.
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        """Compress `size` bytes of b'x' and check decompress_func restores them."""
        plain = b'x' * size
        try:
            compressed = zlib.compress(plain, 1)
        finally:
            # Drop the reference so the large buffer can be freed.
            plain = None
        restored = decompress_func(compressed)
        # Sanity check: right length, and nothing but b'x' bytes.
        try:
            self.assertEqual(len(restored), size)
            self.assertEqual(len(restored.strip(b'x')), 0)
        finally:
            restored = None
160
161
class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    """Whole-message tests of zlib.compress() and zlib.decompress()."""

    def test_speech(self):
        packed = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(packed), HAMLET_SCENE)

    def test_keywords(self):
        packed = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(packed), HAMLET_SCENE)
        # Passing the data itself by keyword is rejected.
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        restored = zlib.decompress(packed,
                                   wbits=zlib.MAX_WBITS,
                                   bufsize=zlib.DEF_BUF_SIZE)
        self.assertEqual(restored, HAMLET_SCENE)

    def test_speech128(self):
        # Compress more data, from both bytes and bytearray inputs.
        data = HAMLET_SCENE * 128
        packed = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), packed)
        for blob in (packed, bytearray(packed)):
            self.assertEqual(zlib.decompress(blob), data)

    def test_incomplete_stream(self):
        # A truncated stream must give a useful error message.
        packed = zlib.compress(HAMLET_SCENE)
        with self.assertRaisesRegex(
                zlib.error,
                "Error -5 while decompressing data: incomplete or truncated stream"):
            zlib.decompress(packed[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        def fast_compress(buf):
            return zlib.compress(buf, 1)
        self.check_big_compress_buffer(size, fast_compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        packed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(packed, 15, size), data)

    def test_custom_bufsize(self):
        # bufsize is given as a CustomInt instance instead of a plain int.
        data = HAMLET_SCENE * 10
        packed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(packed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        # Level 0 round trip of a buffer larger than 4 GiB.
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            # Drop both references so the buffers can be freed.
            comp = data = None
225
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000226
Antoine Pitrou89562712010-05-07 17:04:02 +0000227class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000228 # Test compression object
def test_pair(self):
    """Straightforward round trip through compress/decompress objects."""
    datasrc = HAMLET_SCENE * 128
    datazip = zlib.compress(datasrc)
    # Both bytes and bytearray input must compress to the same stream.
    for data in (datasrc, bytearray(datasrc)):
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        # A second flush on a finished compressor must not work.
        self.assertRaises(zlib.error, co.flush)
        self.assertEqual(x1 + x2, datazip)
    # Decompress the last result, given as bytes and as bytearray.
    variants = ((x1, x2), (bytearray(x1), bytearray(x2)))
    for head, tail in variants:
        dco = zlib.decompressobj()
        body = dco.decompress(head + tail)
        rest = dco.flush()
        self.assertEqual(data, body + rest)
        self.assertIsInstance(dco.unconsumed_tail, bytes)
        self.assertIsInstance(dco.unused_data, bytes)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000247
def test_keywords(self):
    """Constructor options may be keywords; the data argument may not."""
    wbits = -12
    options = dict(level=2,
                   method=zlib.DEFLATED,
                   wbits=wbits,
                   memLevel=9,
                   strategy=zlib.Z_FILTERED,
                   zdict=b"")
    co = zlib.compressobj(**options)
    do = zlib.decompressobj(wbits=wbits, zdict=b"")
    # Passing the payload by keyword is rejected on both objects.
    with self.assertRaises(TypeError):
        co.compress(data=HAMLET_SCENE)
    with self.assertRaises(TypeError):
        do.decompress(data=zlib.compress(HAMLET_SCENE))
    packed = co.compress(HAMLET_SCENE) + co.flush()
    unpacked = do.decompress(packed, max_length=len(HAMLET_SCENE)) + do.flush()
    self.assertEqual(HAMLET_SCENE, unpacked)
268
def test_compressoptions(self):
    """Pass every compressobj() option positionally and round-trip."""
    wbits = -12
    # Positional order: level, method, wbits, memLevel, strategy.
    co = zlib.compressobj(2, zlib.DEFLATED, wbits, 9, zlib.Z_FILTERED)
    packed = co.compress(HAMLET_SCENE)
    packed_tail = co.flush()
    dco = zlib.decompressobj(wbits)
    unpacked = dco.decompress(packed + packed_tail)
    unpacked_tail = dco.flush()
    self.assertEqual(HAMLET_SCENE, unpacked + unpacked_tail)
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000283
def test_compressincremental(self):
    """Compress in 256-byte steps, then decompress in one shot."""
    data = HAMLET_SCENE * 128
    co = zlib.compressobj()
    bufs = [co.compress(data[i:i+256]) for i in range(0, len(data), 256)]
    bufs.append(co.flush())
    combuf = b''.join(bufs)

    dco = zlib.decompressobj()
    # Reuse the joined stream instead of joining the pieces a second time.
    y1 = dco.decompress(combuf)
    y2 = dco.flush()
    self.assertEqual(data, y1 + y2)
298
Neil Schemenauer6412b122004-06-05 19:34:28 +0000299 def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000300 # compress object in steps, decompress object in steps
Neil Schemenauer6412b122004-06-05 19:34:28 +0000301 source = source or HAMLET_SCENE
302 data = source * 128
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000303 co = zlib.compressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000304 bufs = []
Neil Schemenauer6412b122004-06-05 19:34:28 +0000305 for i in range(0, len(data), cx):
306 bufs.append(co.compress(data[i:i+cx]))
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000307 bufs.append(co.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000308 combuf = b''.join(bufs)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000309
Gregory P. Smith693fc462008-09-06 20:13:06 +0000310 decombuf = zlib.decompress(combuf)
311 # Test type of return value
Ezio Melottie9615932010-01-24 19:26:24 +0000312 self.assertIsInstance(decombuf, bytes)
Gregory P. Smith693fc462008-09-06 20:13:06 +0000313
314 self.assertEqual(data, decombuf)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000315
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000316 dco = zlib.decompressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000317 bufs = []
Neil Schemenauer6412b122004-06-05 19:34:28 +0000318 for i in range(0, len(combuf), dcx):
319 bufs.append(dco.decompress(combuf[i:i+dcx]))
Guido van Rossum776152b2007-05-22 22:44:07 +0000320 self.assertEqual(b'', dco.unconsumed_tail, ########
321 "(A) uct should be b'': not %d long" %
Neil Schemenauer6412b122004-06-05 19:34:28 +0000322 len(dco.unconsumed_tail))
Amaury Forgeot d'Arce43d33a2008-07-02 20:50:16 +0000323 self.assertEqual(b'', dco.unused_data)
Neil Schemenauer6412b122004-06-05 19:34:28 +0000324 if flush:
325 bufs.append(dco.flush())
326 else:
327 while True:
Antoine Pitrou77b338b2009-12-14 18:00:06 +0000328 chunk = dco.decompress(b'')
Neil Schemenauer6412b122004-06-05 19:34:28 +0000329 if chunk:
330 bufs.append(chunk)
331 else:
332 break
Guido van Rossum776152b2007-05-22 22:44:07 +0000333 self.assertEqual(b'', dco.unconsumed_tail, ########
334 "(B) uct should be b'': not %d long" %
Neil Schemenauer6412b122004-06-05 19:34:28 +0000335 len(dco.unconsumed_tail))
Amaury Forgeot d'Arce43d33a2008-07-02 20:50:16 +0000336 self.assertEqual(b'', dco.unused_data)
Guido van Rossum776152b2007-05-22 22:44:07 +0000337 self.assertEqual(data, b''.join(bufs))
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000338 # Failure means: "decompressobj with init options failed"
339
Neil Schemenauer6412b122004-06-05 19:34:28 +0000340 def test_decompincflush(self):
341 self.test_decompinc(flush=True)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000342
Neil Schemenauer6412b122004-06-05 19:34:28 +0000343 def test_decompimax(self, source=None, cx=256, dcx=64):
344 # compress in steps, decompress in length-restricted steps
345 source = source or HAMLET_SCENE
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000346 # Check a decompression object with max_length specified
Neil Schemenauer6412b122004-06-05 19:34:28 +0000347 data = source * 128
348 co = zlib.compressobj()
349 bufs = []
350 for i in range(0, len(data), cx):
351 bufs.append(co.compress(data[i:i+cx]))
352 bufs.append(co.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000353 combuf = b''.join(bufs)
Neil Schemenauer6412b122004-06-05 19:34:28 +0000354 self.assertEqual(data, zlib.decompress(combuf),
355 'compressed data failure')
356
357 dco = zlib.decompressobj()
358 bufs = []
359 cb = combuf
360 while cb:
361 #max_length = 1 + len(cb)//10
362 chunk = dco.decompress(cb, dcx)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000363 self.assertFalse(len(chunk) > dcx,
Neil Schemenauer6412b122004-06-05 19:34:28 +0000364 'chunk too big (%d>%d)' % (len(chunk), dcx))
365 bufs.append(chunk)
366 cb = dco.unconsumed_tail
367 bufs.append(dco.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000368 self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
Neil Schemenauer6412b122004-06-05 19:34:28 +0000369
def test_decompressmaxlen(self, flush=False):
    """Decompress with a max_length that shrinks with the remaining input.

    With `flush` true the decompressor is finished with flush(); otherwise
    it is drained with empty-input decompress() calls using the last
    max_length.
    """
    data = HAMLET_SCENE * 128
    co = zlib.compressobj()
    pieces = [co.compress(data[i:i+256]) for i in range(0, len(data), 256)]
    pieces.append(co.flush())
    combuf = b''.join(pieces)
    self.assertEqual(data, zlib.decompress(combuf),
                     'compressed data failure')

    dco = zlib.decompressobj()
    bufs = []
    pending = combuf
    while pending:
        max_length = 1 + len(pending)//10
        chunk = dco.decompress(pending, max_length)
        self.assertFalse(len(chunk) > max_length,
                         'chunk too big (%d>%d)' % (len(chunk),max_length))
        bufs.append(chunk)
        pending = dco.unconsumed_tail
    if flush:
        bufs.append(dco.flush())
    else:
        # Keep pulling output until an empty chunk comes back.
        while chunk:
            chunk = dco.decompress(b'', max_length)
            self.assertFalse(len(chunk) > max_length,
                             'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
    self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000401
Neil Schemenauer6412b122004-06-05 19:34:28 +0000402 def test_decompressmaxlenflush(self):
403 self.test_decompressmaxlen(flush=True)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000404
405 def test_maxlenmisc(self):
406 # Misc tests of max_length
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000407 dco = zlib.decompressobj()
Antoine Pitrou77b338b2009-12-14 18:00:06 +0000408 self.assertRaises(ValueError, dco.decompress, b"", -1)
Guido van Rossum776152b2007-05-22 22:44:07 +0000409 self.assertEqual(b'', dco.unconsumed_tail)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000410
def test_maxlen_large(self):
    """max_length values up to sys.maxsize are accepted."""
    # Sizes up to sys.maxsize should be accepted, although zlib is
    # internally limited to expressing sizes with unsigned int
    data = HAMLET_SCENE * 10
    self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
    packed = zlib.compress(data, 1)
    decompressor = zlib.decompressobj()
    unpacked = decompressor.decompress(packed, sys.maxsize)
    self.assertEqual(unpacked, data)
419
def test_maxlen_custom(self):
    """max_length given as a CustomInt limits the output to 100 bytes."""
    data = HAMLET_SCENE * 10
    packed = zlib.compress(data, 1)
    decompressor = zlib.decompressobj()
    limited = decompressor.decompress(packed, CustomInt())
    self.assertEqual(limited, data[:100])
425
Nadeem Vawda7619e882011-05-14 14:05:20 +0200426 def test_clear_unconsumed_tail(self):
427 # Issue #12050: calling decompress() without providing max_length
428 # should clear the unconsumed_tail attribute.
429 cdata = b"x\x9cKLJ\x06\x00\x02M\x01" # "abc"
430 dco = zlib.decompressobj()
431 ddata = dco.decompress(cdata, 1)
432 ddata += dco.decompress(dco.unconsumed_tail)
433 self.assertEqual(dco.unconsumed_tail, b"")
434
def test_flushes(self):
    """Exercise Compress.flush() with every flush mode at every level.

    For each available flush mode and each level 0-9, the data is
    compressed in two parts with a flush of that mode in between, and the
    joined output must decompress back to the original data.
    """
    sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
                'Z_PARTIAL_FLUSH']

    # Vendor builds may report versions such as "1.2.11.zlib-ng" or
    # "1.2.8-linux".  Keep only the leading numeric components so that
    # int() cannot fail on a non-numeric suffix.
    numeric_parts = []
    base_version = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0]
    for part in base_version.split('.'):
        if not part.isdigit():
            break
        numeric_parts.append(int(part))
    ver = tuple(numeric_parts)
    # Z_BLOCK has a known failure prior to 1.2.5.3
    if ver >= (1, 2, 5, 3):
        sync_opt.append('Z_BLOCK')

    # Resolve the names to constants, skipping any this build lacks.
    sync_opt = [getattr(zlib, opt) for opt in sync_opt
                if hasattr(zlib, opt)]
    data = HAMLET_SCENE * 8

    for sync in sync_opt:
        for level in range(10):
            try:
                obj = zlib.compressobj(level)
                a = obj.compress(data[:3000])
                b = obj.flush(sync)
                c = obj.compress(data[3000:])
                d = obj.flush()
            except Exception:
                # Report which combination failed, then let it propagate.
                print("Error for flush mode={}, level={}"
                      .format(sync, level))
                raise
            self.assertEqual(zlib.decompress(b''.join([a, b, c, d])),
                             data, ("Decompress failed: flush "
                                    "mode=%i, level=%i") % (sync, level))
            del obj
466
@unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                     'requires zlib.Z_SYNC_FLUSH')
def test_odd_flush(self):
    """Sync-flush 17K of pseudo-random data and decompress it again.

    Regression test for odd flushing bugs noted in 2.0, and hopefully
    fixed in 2.1.
    """
    # Create compressor and decompressor objects
    co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
    dco = zlib.decompressobj()

    # random.WichmannHill (the RNG of the original bug report) does not
    # exist in Python 3, so the old fallback chain always ended up at
    # random.Random(); create that generator directly.
    gen = random.Random()
    gen.seed(1)
    data = genblock(1, 17 * 1024, generator=gen)

    # compress, sync-flush, and decompress
    first = co.compress(data)
    second = co.flush(zlib.Z_SYNC_FLUSH)
    expanded = dco.decompress(first + second)

    # if decompressed data is different from the input data, choke.
    self.assertEqual(expanded, data, "17K random source doesn't match")
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000500
Andrew M. Kuchling3b585b32004-12-28 20:10:48 +0000501 def test_empty_flush(self):
502 # Test that calling .flush() on unused objects works.
503 # (Bug #1083110 -- calling .flush() on decompress objects
504 # caused a core dump.)
505
506 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000507 self.assertTrue(co.flush()) # Returns a zlib header
Andrew M. Kuchling3b585b32004-12-28 20:10:48 +0000508 dco = zlib.decompressobj()
Guido van Rossum776152b2007-05-22 22:44:07 +0000509 self.assertEqual(dco.flush(), b"") # Returns nothing
Tim Peters5a9fb3c2005-01-07 16:01:32 +0000510
def test_dictionary(self):
    """Data compressed with a zdict needs the same zdict to decompress."""
    text = HAMLET_SCENE
    # Build a simulated dictionary out of the words in HAMLET.
    words = text.split()
    random.shuffle(words)
    zdict = b''.join(words)
    # Use it to compress HAMLET.
    compressor = zlib.compressobj(zdict=zdict)
    packed = compressor.compress(text) + compressor.flush()
    # Verify that it will decompress with the dictionary.
    with_dict = zlib.decompressobj(zdict=zdict)
    self.assertEqual(with_dict.decompress(packed) + with_dict.flush(), text)
    # Verify that it fails when not given the dictionary.
    without_dict = zlib.decompressobj()
    self.assertRaises(zlib.error, without_dict.decompress, packed)
526
def test_dictionary_streaming(self):
    """Sync-flushed pieces from one zdict compressor decompress in order."""
    # This simulates the reuse of a compressor object for compressing
    # several separate data streams.
    co = zlib.compressobj(zdict=HAMLET_SCENE)
    do = zlib.decompressobj(zdict=HAMLET_SCENE)
    piece = HAMLET_SCENE[1000:1500]
    segments = (piece, piece[100:], piece[:-100])
    # Compress every segment first, ending each with a sync flush ...
    packed = [co.compress(segment) + co.flush(zlib.Z_SYNC_FLUSH)
              for segment in segments]
    # ... then check that each one decompresses on its own.
    for blob, segment in zip(packed, segments):
        self.assertEqual(do.decompress(blob), segment)
539
Antoine Pitrouc09c92f2010-05-11 23:36:40 +0000540 def test_decompress_incomplete_stream(self):
541 # This is 'foo', deflated
542 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
543 # For the record
544 self.assertEqual(zlib.decompress(x), b'foo')
545 self.assertRaises(zlib.error, zlib.decompress, x[:-5])
546 # Omitting the stream end works with decompressor objects
547 # (see issue #8672).
548 dco = zlib.decompressobj()
549 y = dco.decompress(x[:-5])
550 y += dco.flush()
551 self.assertEqual(y, b'foo')
552
Nadeem Vawda1c385462011-08-13 15:22:40 +0200553 def test_decompress_eof(self):
554 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo'
555 dco = zlib.decompressobj()
556 self.assertFalse(dco.eof)
557 dco.decompress(x[:-5])
558 self.assertFalse(dco.eof)
559 dco.decompress(x[-5:])
560 self.assertTrue(dco.eof)
561 dco.flush()
562 self.assertTrue(dco.eof)
563
564 def test_decompress_eof_incomplete_stream(self):
565 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo'
566 dco = zlib.decompressobj()
567 self.assertFalse(dco.eof)
568 dco.decompress(x[:-5])
569 self.assertFalse(dco.eof)
570 dco.flush()
571 self.assertFalse(dco.eof)
572
Nadeem Vawda39079942012-11-05 00:37:42 +0100573 def test_decompress_unused_data(self):
574 # Repeated calls to decompress() after EOF should accumulate data in
575 # dco.unused_data, instead of just storing the arg to the last call.
Nadeem Vawdaee7889d2012-11-11 02:14:36 +0100576 source = b'abcdefghijklmnopqrstuvwxyz'
577 remainder = b'0123456789'
578 y = zlib.compress(source)
579 x = y + remainder
580 for maxlen in 0, 1000:
581 for step in 1, 2, len(y), len(x):
582 dco = zlib.decompressobj()
583 data = b''
584 for i in range(0, len(x), step):
585 if i < len(y):
586 self.assertEqual(dco.unused_data, b'')
587 if maxlen == 0:
588 data += dco.decompress(x[i : i + step])
589 self.assertEqual(dco.unconsumed_tail, b'')
590 else:
591 data += dco.decompress(
592 dco.unconsumed_tail + x[i : i + step], maxlen)
593 data += dco.flush()
Nadeem Vawdadd1253a2012-11-11 02:21:22 +0100594 self.assertTrue(dco.eof)
Nadeem Vawdaee7889d2012-11-11 02:14:36 +0100595 self.assertEqual(data, source)
596 self.assertEqual(dco.unconsumed_tail, b'')
597 self.assertEqual(dco.unused_data, remainder)
Nadeem Vawda39079942012-11-05 00:37:42 +0100598
Martin Panter3f0ee832016-06-05 10:48:34 +0000599 # issue27164
600 def test_decompress_raw_with_dictionary(self):
601 zdict = b'abcdefghijklmnopqrstuvwxyz'
602 co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
603 comp = co.compress(zdict) + co.flush()
604 dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
605 uncomp = dco.decompress(comp) + dco.flush()
606 self.assertEqual(zdict, uncomp)
607
Nadeem Vawda7ee95552012-11-11 03:15:32 +0100608 def test_flush_with_freed_input(self):
609 # Issue #16411: decompressor accesses input to last decompress() call
610 # in flush(), even if this object has been freed in the meanwhile.
611 input1 = b'abcdefghijklmnopqrstuvwxyz'
612 input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
613 data = zlib.compress(input1)
614 dco = zlib.decompressobj()
615 dco.decompress(data, 1)
616 del data
617 data = zlib.compress(input2)
618 self.assertEqual(dco.flush(), input1[1:])
619
Martin Pantere99e9772015-11-20 08:13:35 +0000620 @bigmemtest(size=_4G, memuse=1)
621 def test_flush_large_length(self, size):
622 # Test flush(length) parameter greater than internal limit UINT_MAX
623 input = HAMLET_SCENE * 10
624 data = zlib.compress(input, 1)
625 dco = zlib.decompressobj()
626 dco.decompress(data, 1)
627 self.assertEqual(dco.flush(size), input[1:])
628
629 def test_flush_custom_length(self):
630 input = HAMLET_SCENE * 10
631 data = zlib.compress(input, 1)
632 dco = zlib.decompressobj()
633 dco.decompress(data, 1)
634 self.assertEqual(dco.flush(CustomInt()), input[1:])
635
Serhiy Storchaka43767632013-11-03 21:31:38 +0200636 @requires_Compress_copy
637 def test_compresscopy(self):
638 # Test copying a compression object
639 data0 = HAMLET_SCENE
640 data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600641 for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
642 c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
643 bufs0 = []
644 bufs0.append(c0.compress(data0))
Thomas Wouters477c8d52006-05-27 19:21:47 +0000645
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600646 c1 = func(c0)
647 bufs1 = bufs0[:]
Thomas Wouters477c8d52006-05-27 19:21:47 +0000648
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600649 bufs0.append(c0.compress(data0))
650 bufs0.append(c0.flush())
651 s0 = b''.join(bufs0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000652
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600653 bufs1.append(c1.compress(data1))
654 bufs1.append(c1.flush())
655 s1 = b''.join(bufs1)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000656
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600657 self.assertEqual(zlib.decompress(s0),data0+data0)
658 self.assertEqual(zlib.decompress(s1),data0+data1)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000659
Serhiy Storchaka43767632013-11-03 21:31:38 +0200660 @requires_Compress_copy
661 def test_badcompresscopy(self):
662 # Test copying a compression object in an inconsistent state
663 c = zlib.compressobj()
664 c.compress(HAMLET_SCENE)
665 c.flush()
666 self.assertRaises(ValueError, c.copy)
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600667 self.assertRaises(ValueError, copy.copy, c)
668 self.assertRaises(ValueError, copy.deepcopy, c)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000669
Serhiy Storchaka43767632013-11-03 21:31:38 +0200670 @requires_Decompress_copy
671 def test_decompresscopy(self):
672 # Test copying a decompression object
673 data = HAMLET_SCENE
674 comp = zlib.compress(data)
675 # Test type of return value
676 self.assertIsInstance(comp, bytes)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000677
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600678 for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
679 d0 = zlib.decompressobj()
680 bufs0 = []
681 bufs0.append(d0.decompress(comp[:32]))
Thomas Wouters477c8d52006-05-27 19:21:47 +0000682
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600683 d1 = func(d0)
684 bufs1 = bufs0[:]
Thomas Wouters477c8d52006-05-27 19:21:47 +0000685
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600686 bufs0.append(d0.decompress(comp[32:]))
687 s0 = b''.join(bufs0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000688
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600689 bufs1.append(d1.decompress(comp[32:]))
690 s1 = b''.join(bufs1)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000691
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600692 self.assertEqual(s0,s1)
693 self.assertEqual(s0,data)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000694
Serhiy Storchaka43767632013-11-03 21:31:38 +0200695 @requires_Decompress_copy
696 def test_baddecompresscopy(self):
697 # Test copying a compression object in an inconsistent state
698 data = zlib.compress(HAMLET_SCENE)
699 d = zlib.decompressobj()
700 d.decompress(data)
701 d.flush()
702 self.assertRaises(ValueError, d.copy)
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600703 self.assertRaises(ValueError, copy.copy, d)
704 self.assertRaises(ValueError, copy.deepcopy, d)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000705
Serhiy Storchakad7a44152015-11-12 11:23:04 +0200706 def test_compresspickle(self):
707 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
708 with self.assertRaises((TypeError, pickle.PicklingError)):
709 pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)
710
711 def test_decompresspickle(self):
712 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
713 with self.assertRaises((TypeError, pickle.PicklingError)):
714 pickle.dumps(zlib.decompressobj(), proto)
715
Antoine Pitrou89562712010-05-07 17:04:02 +0000716 # Memory use of the following functions takes into account overallocation
717
Antoine Pitrou94190bb2011-10-04 10:22:36 +0200718 @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
Antoine Pitrou89562712010-05-07 17:04:02 +0000719 def test_big_compress_buffer(self, size):
720 c = zlib.compressobj(1)
721 compress = lambda s: c.compress(s) + c.flush()
722 self.check_big_compress_buffer(size, compress)
723
Antoine Pitrou94190bb2011-10-04 10:22:36 +0200724 @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
Antoine Pitrou89562712010-05-07 17:04:02 +0000725 def test_big_decompress_buffer(self, size):
726 d = zlib.decompressobj()
727 decompress = lambda s: d.decompress(s) + d.flush()
728 self.check_big_decompress_buffer(size, decompress)
729
Martin Panter84544c12016-07-23 03:02:07 +0000730 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
731 @bigmemtest(size=_4G + 100, memuse=4)
732 def test_64bit_compress(self, size):
Nadeem Vawda0c3d96a2011-05-15 00:19:50 +0200733 data = b'x' * size
Martin Panter84544c12016-07-23 03:02:07 +0000734 co = zlib.compressobj(0)
735 do = zlib.decompressobj()
Nadeem Vawda0c3d96a2011-05-15 00:19:50 +0200736 try:
Martin Panter84544c12016-07-23 03:02:07 +0000737 comp = co.compress(data) + co.flush()
738 uncomp = do.decompress(comp) + do.flush()
739 self.assertEqual(uncomp, data)
Nadeem Vawda0c3d96a2011-05-15 00:19:50 +0200740 finally:
Martin Panter84544c12016-07-23 03:02:07 +0000741 comp = uncomp = data = None
742
743 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
744 @bigmemtest(size=_4G + 100, memuse=3)
745 def test_large_unused_data(self, size):
746 data = b'abcdefghijklmnop'
747 unused = b'x' * size
748 comp = zlib.compress(data) + unused
749 do = zlib.decompressobj()
750 try:
751 uncomp = do.decompress(comp) + do.flush()
752 self.assertEqual(unused, do.unused_data)
753 self.assertEqual(uncomp, data)
754 finally:
755 unused = comp = do = None
756
757 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
758 @bigmemtest(size=_4G + 100, memuse=5)
759 def test_large_unconsumed_tail(self, size):
760 data = b'x' * size
761 do = zlib.decompressobj()
762 try:
763 comp = zlib.compress(data, 0)
764 uncomp = do.decompress(comp, 1) + do.flush()
765 self.assertEqual(uncomp, data)
766 self.assertEqual(do.unconsumed_tail, b'')
767 finally:
768 comp = uncomp = data = None
Nadeem Vawda0c3d96a2011-05-15 00:19:50 +0200769
Martin Panter0fdf41d2016-05-27 07:32:11 +0000770 def test_wbits(self):
Martin Panterc618ae82016-05-27 11:20:21 +0000771 # wbits=0 only supported since zlib v1.2.3.5
772 # Register "1.2.3" as "1.2.3.0"
pmp-p4c7108a2018-02-19 04:45:11 +0100773 # or "1.2.0-linux","1.2.0.f","1.2.0.f-linux"
774 v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
775 if len(v) < 4:
776 v.append('0')
777 elif not v[-1].isnumeric():
778 v[-1] = '0'
779
780 v = tuple(map(int, v))
781 supports_wbits_0 = v >= (1, 2, 3, 5)
Martin Panterc618ae82016-05-27 11:20:21 +0000782
Martin Panter0fdf41d2016-05-27 07:32:11 +0000783 co = zlib.compressobj(level=1, wbits=15)
784 zlib15 = co.compress(HAMLET_SCENE) + co.flush()
785 self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
Martin Panterc618ae82016-05-27 11:20:21 +0000786 if supports_wbits_0:
787 self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
Martin Panter0fdf41d2016-05-27 07:32:11 +0000788 self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
789 with self.assertRaisesRegex(zlib.error, 'invalid window size'):
790 zlib.decompress(zlib15, 14)
791 dco = zlib.decompressobj(wbits=32 + 15)
792 self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
793 dco = zlib.decompressobj(wbits=14)
794 with self.assertRaisesRegex(zlib.error, 'invalid window size'):
795 dco.decompress(zlib15)
796
797 co = zlib.compressobj(level=1, wbits=9)
798 zlib9 = co.compress(HAMLET_SCENE) + co.flush()
799 self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
800 self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
Martin Panterc618ae82016-05-27 11:20:21 +0000801 if supports_wbits_0:
802 self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
Martin Panter0fdf41d2016-05-27 07:32:11 +0000803 self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
804 dco = zlib.decompressobj(wbits=32 + 9)
805 self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)
806
807 co = zlib.compressobj(level=1, wbits=-15)
808 deflate15 = co.compress(HAMLET_SCENE) + co.flush()
809 self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
810 dco = zlib.decompressobj(wbits=-15)
811 self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)
812
813 co = zlib.compressobj(level=1, wbits=-9)
814 deflate9 = co.compress(HAMLET_SCENE) + co.flush()
815 self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
816 self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
817 dco = zlib.decompressobj(wbits=-9)
818 self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)
819
820 co = zlib.compressobj(level=1, wbits=16 + 15)
821 gzip = co.compress(HAMLET_SCENE) + co.flush()
822 self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
823 self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
824 dco = zlib.decompressobj(32 + 15)
825 self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)
826
Antoine Pitrou89562712010-05-07 17:04:02 +0000827
def genblock(seed, length, step=1024, generator=random):
    """Return exactly *length* pseudo-random bytes, built in *step*-byte blocks.

    seed      -- value passed to generator.seed(); None leaves the generator
                 state untouched.
    length    -- number of bytes to return; zero or negative gives b"".
    step      -- block size used while generating.  When it is smaller than 2
                 or larger than *length*, a single block of *length* bytes is
                 generated instead.
    generator -- object providing seed() and randint(); defaults to the
                 random module.
    """
    if seed is not None:
        generator.seed(seed)
    if length <= 0:
        # Nothing to generate; also avoids range() being given a zero step.
        return b""
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for start in range(0, length, step):
        # The final block is shortened so the total is exactly *length*.
        size = min(step, length - start)
        blocks.append(bytes(randint(0, 255) for _ in range(size)))
    return b"".join(blocks)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000839
840
841
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source.

    source    -- text to split into lines; may be str or bytes-like
                 (bytes / bytearray), and the newline separator is chosen
                 to match.
    number    -- how many lines to pick (picked with replacement).
    seed      -- value passed to generator.seed(); None leaves the generator
                 state untouched.
    generator -- object providing seed() and choice(); defaults to the
                 random module.
    """
    if seed is not None:
        generator.seed(seed)
    # str.split() rejects a bytes separator and vice versa, so pick the
    # separator that matches the type of *source*.
    newline = b'\n' if isinstance(source, (bytes, bytearray)) else '\n'
    sources = source.split(newline)
    return [generator.choice(sources) for _ in range(number)]
848
849
850
# Sample text (bytes) used as compressible input by the tests in this file.
# NOTE(review): leading whitespace inside this literal is reproduced as it
# appears in this view of the file -- confirm against the upstream source.
HAMLET_SCENE = b"""
LAERTES

 O, fear me not.
 I stay too long: but here my father comes.

 Enter POLONIUS

 A double blessing is a double grace,
 Occasion smiles upon a second leave.

LORD POLONIUS

 Yet here, Laertes! aboard, aboard, for shame!
 The wind sits in the shoulder of your sail,
 And you are stay'd for. There; my blessing with thee!
 And these few precepts in thy memory
 See thou character. Give thy thoughts no tongue,
 Nor any unproportioned thought his act.
 Be thou familiar, but by no means vulgar.
 Those friends thou hast, and their adoption tried,
 Grapple them to thy soul with hoops of steel;
 But do not dull thy palm with entertainment
 Of each new-hatch'd, unfledged comrade. Beware
 Of entrance to a quarrel, but being in,
 Bear't that the opposed may beware of thee.
 Give every man thy ear, but few thy voice;
 Take each man's censure, but reserve thy judgment.
 Costly thy habit as thy purse can buy,
 But not express'd in fancy; rich, not gaudy;
 For the apparel oft proclaims the man,
 And they in France of the best rank and station
 Are of a most select and generous chief in that.
 Neither a borrower nor a lender be;
 For loan oft loses both itself and friend,
 And borrowing dulls the edge of husbandry.
 This above all: to thine ownself be true,
 And it must follow, as the night the day,
 Thou canst not then be false to any man.
 Farewell: my blessing season this in thee!

LAERTES

 Most humbly do I take my leave, my lord.

LORD POLONIUS

 The time invites you; go; your servants tend.

LAERTES

 Farewell, Ophelia; and remember well
 What I have said to you.

OPHELIA

 'Tis in my memory lock'd,
 And you yourself shall keep the key of it.

LAERTES

 Farewell.
"""
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000914
915
class CustomInt:
    """Integer-like helper: converts to 100 through the __index__ protocol."""

    def __index__(self):
        # Fixed value used wherever an instance stands in for an integer.
        return 100
919
920
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()