blob: 02509cdf5532c9ed5d234c0703d33a5b8dbbaa9a [file] [log] [blame]
Guido van Rossum7d9ea502003-02-03 20:45:52 +00001import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002from test import support
Christian Heimesd5e2b6f2008-03-19 21:50:51 +00003import binascii
Zackery Spytzd2cbfff2018-06-27 12:04:51 -06004import copy
Serhiy Storchakad7a44152015-11-12 11:23:04 +02005import pickle
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +00006import random
Antoine Pitrouf3d22752011-02-21 18:09:00 +00007import sys
Antoine Pitrou94190bb2011-10-04 10:22:36 +02008from test.support import bigmemtest, _1G, _4G
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +00009
R. David Murraya21e4ca2009-03-31 23:16:50 +000010zlib = support.import_module('zlib')
11
Serhiy Storchaka43767632013-11-03 21:31:38 +020012requires_Compress_copy = unittest.skipUnless(
13 hasattr(zlib.compressobj(), "copy"),
14 'requires Compress.copy()')
15requires_Decompress_copy = unittest.skipUnless(
16 hasattr(zlib.decompressobj(), "copy"),
17 'requires Decompress.copy()')
18
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +000019
Nadeem Vawda64d25dd2011-09-12 00:04:13 +020020class VersionTestCase(unittest.TestCase):
21
22 def test_library_version(self):
Nadeem Vawda131c7072012-01-25 23:16:50 +020023 # Test that the major version of the actual library in use matches the
24 # major version that we were compiled against. We can't guarantee that
25 # the minor versions will match (even on the machine on which the module
26 # was compiled), and the API is stable between minor versions, so
Nadeem Vawdad770fe42012-01-28 17:32:47 +020027 # testing only the major versions avoids spurious failures.
Nadeem Vawda131c7072012-01-25 23:16:50 +020028 self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])
Nadeem Vawda64d25dd2011-09-12 00:04:13 +020029
30
Guido van Rossum7d9ea502003-02-03 20:45:52 +000031class ChecksumTestCase(unittest.TestCase):
32 # checksum test cases
33 def test_crc32start(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000034 self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000035 self.assertTrue(zlib.crc32(b"abc", 0xffffffff))
Andrew M. Kuchlingfcfc8d52001-08-10 15:50:11 +000036
Guido van Rossum7d9ea502003-02-03 20:45:52 +000037 def test_crc32empty(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000038 self.assertEqual(zlib.crc32(b"", 0), 0)
39 self.assertEqual(zlib.crc32(b"", 1), 1)
40 self.assertEqual(zlib.crc32(b"", 432), 432)
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +000041
Guido van Rossum7d9ea502003-02-03 20:45:52 +000042 def test_adler32start(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000043 self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000044 self.assertTrue(zlib.adler32(b"abc", 0xffffffff))
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +000045
Guido van Rossum7d9ea502003-02-03 20:45:52 +000046 def test_adler32empty(self):
Guido van Rossum776152b2007-05-22 22:44:07 +000047 self.assertEqual(zlib.adler32(b"", 0), 0)
48 self.assertEqual(zlib.adler32(b"", 1), 1)
49 self.assertEqual(zlib.adler32(b"", 432), 432)
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +000050
Guido van Rossum7d9ea502003-02-03 20:45:52 +000051 def test_penguins(self):
Martin Panterb82032f2015-12-11 05:19:29 +000052 self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
53 self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
54 self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
55 self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)
Guido van Rossum7d9ea502003-02-03 20:45:52 +000056
Guido van Rossum776152b2007-05-22 22:44:07 +000057 self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
58 self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1))
Guido van Rossum7d9ea502003-02-03 20:45:52 +000059
Gregory P. Smithab0d8a12008-03-17 20:24:09 +000060 def test_crc32_adler32_unsigned(self):
Antoine Pitrou77b338b2009-12-14 18:00:06 +000061 foo = b'abcdefghijklmnop'
Gregory P. Smithab0d8a12008-03-17 20:24:09 +000062 # explicitly test signed behavior
Gregory P. Smith27275032008-03-20 06:20:09 +000063 self.assertEqual(zlib.crc32(foo), 2486878355)
Antoine Pitrou77b338b2009-12-14 18:00:06 +000064 self.assertEqual(zlib.crc32(b'spam'), 1138425661)
Gregory P. Smithab0d8a12008-03-17 20:24:09 +000065 self.assertEqual(zlib.adler32(foo+foo), 3573550353)
Antoine Pitrou77b338b2009-12-14 18:00:06 +000066 self.assertEqual(zlib.adler32(b'spam'), 72286642)
Gregory P. Smithab0d8a12008-03-17 20:24:09 +000067
Christian Heimesd5e2b6f2008-03-19 21:50:51 +000068 def test_same_as_binascii_crc32(self):
Martin v. Löwis15b16a32008-12-02 06:00:15 +000069 foo = b'abcdefghijklmnop'
Gregory P. Smith27275032008-03-20 06:20:09 +000070 crc = 2486878355
Christian Heimesd5e2b6f2008-03-19 21:50:51 +000071 self.assertEqual(binascii.crc32(foo), crc)
72 self.assertEqual(zlib.crc32(foo), crc)
Martin v. Löwis15b16a32008-12-02 06:00:15 +000073 self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))
Guido van Rossum7d9ea502003-02-03 20:45:52 +000074
75
Victor Stinner8c663fd2017-11-08 14:44:44 -080076# Issue #10276 - check that inputs >=4 GiB are handled correctly.
Antoine Pitrouf3d22752011-02-21 18:09:00 +000077class ChecksumBigBufferTestCase(unittest.TestCase):
78
Nadeem Vawdabc8c8172012-02-23 14:16:15 +020079 @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
80 def test_big_buffer(self, size):
Nadeem Vawdab063a482012-02-23 13:36:25 +020081 data = b"nyan" * (_1G + 1)
82 self.assertEqual(zlib.crc32(data), 1044521549)
83 self.assertEqual(zlib.adler32(data), 2256789997)
Antoine Pitrouf3d22752011-02-21 18:09:00 +000084
Christian Heimesb186d002008-03-18 15:15:01 +000085
Guido van Rossum7d9ea502003-02-03 20:45:52 +000086class ExceptionTestCase(unittest.TestCase):
87 # make sure we generate some expected errors
Guido van Rossum8ce8a782007-11-01 19:42:39 +000088 def test_badlevel(self):
89 # specifying compression level out of range causes an error
90 # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
91 # accepts 0 too)
Antoine Pitrou77b338b2009-12-14 18:00:06 +000092 self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)
93
94 def test_badargs(self):
95 self.assertRaises(TypeError, zlib.adler32)
96 self.assertRaises(TypeError, zlib.crc32)
97 self.assertRaises(TypeError, zlib.compress)
98 self.assertRaises(TypeError, zlib.decompress)
99 for arg in (42, None, '', 'abc', (), []):
100 self.assertRaises(TypeError, zlib.adler32, arg)
101 self.assertRaises(TypeError, zlib.crc32, arg)
102 self.assertRaises(TypeError, zlib.compress, arg)
103 self.assertRaises(TypeError, zlib.decompress, arg)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000104
105 def test_badcompressobj(self):
106 # verify failure on building compress object with bad params
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000107 self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
Guido van Rossum8ce8a782007-11-01 19:42:39 +0000108 # specifying total bits too large causes an error
109 self.assertRaises(ValueError,
110 zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000111
112 def test_baddecompressobj(self):
113 # verify failure on building decompress object with bad params
Antoine Pitrou90ee4df2010-04-06 17:23:13 +0000114 self.assertRaises(ValueError, zlib.decompressobj, -1)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000115
Christian Heimes5e696852008-04-09 08:37:03 +0000116 def test_decompressobj_badflush(self):
117 # verify failure on calling decompressobj.flush with bad params
118 self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
119 self.assertRaises(ValueError, zlib.decompressobj().flush, -1)
120
Martin Pantere99e9772015-11-20 08:13:35 +0000121 @support.cpython_only
122 def test_overflow(self):
123 with self.assertRaisesRegex(OverflowError, 'int too large'):
124 zlib.decompress(b'', 15, sys.maxsize + 1)
125 with self.assertRaisesRegex(OverflowError, 'int too large'):
Martin Panter84544c12016-07-23 03:02:07 +0000126 zlib.decompressobj().decompress(b'', sys.maxsize + 1)
127 with self.assertRaisesRegex(OverflowError, 'int too large'):
Martin Pantere99e9772015-11-20 08:13:35 +0000128 zlib.decompressobj().flush(sys.maxsize + 1)
129
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000130
Antoine Pitrou89562712010-05-07 17:04:02 +0000131class BaseCompressTestCase(object):
132 def check_big_compress_buffer(self, size, compress_func):
133 _1M = 1024 * 1024
Victor Stinner8c663fd2017-11-08 14:44:44 -0800134 # Generate 10 MiB worth of random, and expand it by repeating it.
Antoine Pitrou89562712010-05-07 17:04:02 +0000135 # The assumption is that zlib's memory is not big enough to exploit
136 # such spread out redundancy.
Victor Stinner87502dd2020-04-17 22:54:38 +0200137 data = random.randbytes(_1M * 10)
Antoine Pitrou89562712010-05-07 17:04:02 +0000138 data = data * (size // len(data) + 1)
139 try:
140 compress_func(data)
141 finally:
142 # Release memory
143 data = None
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000144
Antoine Pitrou89562712010-05-07 17:04:02 +0000145 def check_big_decompress_buffer(self, size, decompress_func):
146 data = b'x' * size
147 try:
148 compressed = zlib.compress(data, 1)
149 finally:
150 # Release memory
151 data = None
152 data = decompress_func(compressed)
153 # Sanity check
154 try:
155 self.assertEqual(len(data), size)
156 self.assertEqual(len(data.strip(b'x')), 0)
157 finally:
158 data = None
159
160
161class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000162 # Test compression in one go (whole message compression)
163 def test_speech(self):
Neil Schemenauer6412b122004-06-05 19:34:28 +0000164 x = zlib.compress(HAMLET_SCENE)
165 self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000166
Martin Panter1fe0d132016-02-10 10:06:36 +0000167 def test_keywords(self):
Serhiy Storchaka95657cd2016-06-25 22:43:05 +0300168 x = zlib.compress(HAMLET_SCENE, level=3)
Martin Panter1fe0d132016-02-10 10:06:36 +0000169 self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
Serhiy Storchaka95657cd2016-06-25 22:43:05 +0300170 with self.assertRaises(TypeError):
171 zlib.compress(data=HAMLET_SCENE, level=3)
Serhiy Storchaka15f32282016-08-15 10:06:16 +0300172 self.assertEqual(zlib.decompress(x,
173 wbits=zlib.MAX_WBITS,
174 bufsize=zlib.DEF_BUF_SIZE),
175 HAMLET_SCENE)
Martin Panter1fe0d132016-02-10 10:06:36 +0000176
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000177 def test_speech128(self):
Neil Schemenauer6412b122004-06-05 19:34:28 +0000178 # compress more data
179 data = HAMLET_SCENE * 128
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000180 x = zlib.compress(data)
Antoine Pitrou77b338b2009-12-14 18:00:06 +0000181 self.assertEqual(zlib.compress(bytearray(data)), x)
182 for ob in x, bytearray(x):
183 self.assertEqual(zlib.decompress(ob), data)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000184
Antoine Pitrou53b21662010-05-11 23:46:02 +0000185 def test_incomplete_stream(self):
Martin Panter6245cb32016-04-15 02:14:19 +0000186 # A useful error message is given
Antoine Pitrou53b21662010-05-11 23:46:02 +0000187 x = zlib.compress(HAMLET_SCENE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000188 self.assertRaisesRegex(zlib.error,
Antoine Pitrou53b21662010-05-11 23:46:02 +0000189 "Error -5 while decompressing data: incomplete or truncated stream",
190 zlib.decompress, x[:-1])
191
Antoine Pitrou89562712010-05-07 17:04:02 +0000192 # Memory use of the following functions takes into account overallocation
193
Antoine Pitrou94190bb2011-10-04 10:22:36 +0200194 @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
Antoine Pitrou89562712010-05-07 17:04:02 +0000195 def test_big_compress_buffer(self, size):
196 compress = lambda s: zlib.compress(s, 1)
197 self.check_big_compress_buffer(size, compress)
198
Antoine Pitrou94190bb2011-10-04 10:22:36 +0200199 @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
Antoine Pitrou89562712010-05-07 17:04:02 +0000200 def test_big_decompress_buffer(self, size):
201 self.check_big_decompress_buffer(size, zlib.decompress)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000202
Martin Pantere99e9772015-11-20 08:13:35 +0000203 @bigmemtest(size=_4G, memuse=1)
204 def test_large_bufsize(self, size):
205 # Test decompress(bufsize) parameter greater than the internal limit
206 data = HAMLET_SCENE * 10
207 compressed = zlib.compress(data, 1)
208 self.assertEqual(zlib.decompress(compressed, 15, size), data)
209
210 def test_custom_bufsize(self):
211 data = HAMLET_SCENE * 10
212 compressed = zlib.compress(data, 1)
213 self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)
214
Martin Panter84544c12016-07-23 03:02:07 +0000215 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
216 @bigmemtest(size=_4G + 100, memuse=4)
217 def test_64bit_compress(self, size):
218 data = b'x' * size
219 try:
220 comp = zlib.compress(data, 0)
221 self.assertEqual(zlib.decompress(comp), data)
222 finally:
223 comp = data = None
224
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000225
Antoine Pitrou89562712010-05-07 17:04:02 +0000226class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000227 # Test compression object
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000228 def test_pair(self):
Neil Schemenauer6412b122004-06-05 19:34:28 +0000229 # straightforward compress/decompress objects
Antoine Pitrou77b338b2009-12-14 18:00:06 +0000230 datasrc = HAMLET_SCENE * 128
231 datazip = zlib.compress(datasrc)
232 # should compress both bytes and bytearray data
233 for data in (datasrc, bytearray(datasrc)):
234 co = zlib.compressobj()
235 x1 = co.compress(data)
236 x2 = co.flush()
237 self.assertRaises(zlib.error, co.flush) # second flush should not work
238 self.assertEqual(x1 + x2, datazip)
239 for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
240 dco = zlib.decompressobj()
241 y1 = dco.decompress(v1 + v2)
242 y2 = dco.flush()
243 self.assertEqual(data, y1 + y2)
244 self.assertIsInstance(dco.unconsumed_tail, bytes)
245 self.assertIsInstance(dco.unused_data, bytes)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000246
Serhiy Storchaka15f32282016-08-15 10:06:16 +0300247 def test_keywords(self):
248 level = 2
249 method = zlib.DEFLATED
250 wbits = -12
251 memLevel = 9
252 strategy = zlib.Z_FILTERED
253 co = zlib.compressobj(level=level,
254 method=method,
255 wbits=wbits,
256 memLevel=memLevel,
257 strategy=strategy,
258 zdict=b"")
259 do = zlib.decompressobj(wbits=wbits, zdict=b"")
260 with self.assertRaises(TypeError):
261 co.compress(data=HAMLET_SCENE)
262 with self.assertRaises(TypeError):
263 do.decompress(data=zlib.compress(HAMLET_SCENE))
264 x = co.compress(HAMLET_SCENE) + co.flush()
265 y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
266 self.assertEqual(HAMLET_SCENE, y)
267
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000268 def test_compressoptions(self):
269 # specify lots of options to compressobj()
270 level = 2
271 method = zlib.DEFLATED
272 wbits = -12
Martin Panterbf19d162015-09-09 01:01:13 +0000273 memLevel = 9
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000274 strategy = zlib.Z_FILTERED
Martin Panterbf19d162015-09-09 01:01:13 +0000275 co = zlib.compressobj(level, method, wbits, memLevel, strategy)
Neil Schemenauer6412b122004-06-05 19:34:28 +0000276 x1 = co.compress(HAMLET_SCENE)
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000277 x2 = co.flush()
278 dco = zlib.decompressobj(wbits)
279 y1 = dco.decompress(x1 + x2)
280 y2 = dco.flush()
Neil Schemenauer6412b122004-06-05 19:34:28 +0000281 self.assertEqual(HAMLET_SCENE, y1 + y2)
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000282
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000283 def test_compressincremental(self):
284 # compress object in steps, decompress object as one-shot
Neil Schemenauer6412b122004-06-05 19:34:28 +0000285 data = HAMLET_SCENE * 128
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000286 co = zlib.compressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000287 bufs = []
288 for i in range(0, len(data), 256):
289 bufs.append(co.compress(data[i:i+256]))
290 bufs.append(co.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000291 combuf = b''.join(bufs)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000292
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000293 dco = zlib.decompressobj()
Guido van Rossum776152b2007-05-22 22:44:07 +0000294 y1 = dco.decompress(b''.join(bufs))
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000295 y2 = dco.flush()
296 self.assertEqual(data, y1 + y2)
297
Neil Schemenauer6412b122004-06-05 19:34:28 +0000298 def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000299 # compress object in steps, decompress object in steps
Neil Schemenauer6412b122004-06-05 19:34:28 +0000300 source = source or HAMLET_SCENE
301 data = source * 128
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000302 co = zlib.compressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000303 bufs = []
Neil Schemenauer6412b122004-06-05 19:34:28 +0000304 for i in range(0, len(data), cx):
305 bufs.append(co.compress(data[i:i+cx]))
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000306 bufs.append(co.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000307 combuf = b''.join(bufs)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000308
Gregory P. Smith693fc462008-09-06 20:13:06 +0000309 decombuf = zlib.decompress(combuf)
310 # Test type of return value
Ezio Melottie9615932010-01-24 19:26:24 +0000311 self.assertIsInstance(decombuf, bytes)
Gregory P. Smith693fc462008-09-06 20:13:06 +0000312
313 self.assertEqual(data, decombuf)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000314
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000315 dco = zlib.decompressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000316 bufs = []
Neil Schemenauer6412b122004-06-05 19:34:28 +0000317 for i in range(0, len(combuf), dcx):
318 bufs.append(dco.decompress(combuf[i:i+dcx]))
Guido van Rossum776152b2007-05-22 22:44:07 +0000319 self.assertEqual(b'', dco.unconsumed_tail, ########
320 "(A) uct should be b'': not %d long" %
Neil Schemenauer6412b122004-06-05 19:34:28 +0000321 len(dco.unconsumed_tail))
Amaury Forgeot d'Arce43d33a2008-07-02 20:50:16 +0000322 self.assertEqual(b'', dco.unused_data)
Neil Schemenauer6412b122004-06-05 19:34:28 +0000323 if flush:
324 bufs.append(dco.flush())
325 else:
326 while True:
Antoine Pitrou77b338b2009-12-14 18:00:06 +0000327 chunk = dco.decompress(b'')
Neil Schemenauer6412b122004-06-05 19:34:28 +0000328 if chunk:
329 bufs.append(chunk)
330 else:
331 break
Guido van Rossum776152b2007-05-22 22:44:07 +0000332 self.assertEqual(b'', dco.unconsumed_tail, ########
333 "(B) uct should be b'': not %d long" %
Neil Schemenauer6412b122004-06-05 19:34:28 +0000334 len(dco.unconsumed_tail))
Amaury Forgeot d'Arce43d33a2008-07-02 20:50:16 +0000335 self.assertEqual(b'', dco.unused_data)
Guido van Rossum776152b2007-05-22 22:44:07 +0000336 self.assertEqual(data, b''.join(bufs))
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000337 # Failure means: "decompressobj with init options failed"
338
Neil Schemenauer6412b122004-06-05 19:34:28 +0000339 def test_decompincflush(self):
340 self.test_decompinc(flush=True)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000341
Neil Schemenauer6412b122004-06-05 19:34:28 +0000342 def test_decompimax(self, source=None, cx=256, dcx=64):
343 # compress in steps, decompress in length-restricted steps
344 source = source or HAMLET_SCENE
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000345 # Check a decompression object with max_length specified
Neil Schemenauer6412b122004-06-05 19:34:28 +0000346 data = source * 128
347 co = zlib.compressobj()
348 bufs = []
349 for i in range(0, len(data), cx):
350 bufs.append(co.compress(data[i:i+cx]))
351 bufs.append(co.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000352 combuf = b''.join(bufs)
Neil Schemenauer6412b122004-06-05 19:34:28 +0000353 self.assertEqual(data, zlib.decompress(combuf),
354 'compressed data failure')
355
356 dco = zlib.decompressobj()
357 bufs = []
358 cb = combuf
359 while cb:
360 #max_length = 1 + len(cb)//10
361 chunk = dco.decompress(cb, dcx)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000362 self.assertFalse(len(chunk) > dcx,
Neil Schemenauer6412b122004-06-05 19:34:28 +0000363 'chunk too big (%d>%d)' % (len(chunk), dcx))
364 bufs.append(chunk)
365 cb = dco.unconsumed_tail
366 bufs.append(dco.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000367 self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
Neil Schemenauer6412b122004-06-05 19:34:28 +0000368
369 def test_decompressmaxlen(self, flush=False):
370 # Check a decompression object with max_length specified
371 data = HAMLET_SCENE * 128
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000372 co = zlib.compressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000373 bufs = []
374 for i in range(0, len(data), 256):
375 bufs.append(co.compress(data[i:i+256]))
376 bufs.append(co.flush())
Guido van Rossum776152b2007-05-22 22:44:07 +0000377 combuf = b''.join(bufs)
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000378 self.assertEqual(data, zlib.decompress(combuf),
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000379 'compressed data failure')
380
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000381 dco = zlib.decompressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000382 bufs = []
383 cb = combuf
384 while cb:
Guido van Rossumf3594102003-02-27 18:39:18 +0000385 max_length = 1 + len(cb)//10
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000386 chunk = dco.decompress(cb, max_length)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000387 self.assertFalse(len(chunk) > max_length,
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000388 'chunk too big (%d>%d)' % (len(chunk),max_length))
389 bufs.append(chunk)
390 cb = dco.unconsumed_tail
Neil Schemenauer6412b122004-06-05 19:34:28 +0000391 if flush:
392 bufs.append(dco.flush())
393 else:
394 while chunk:
Antoine Pitrou77b338b2009-12-14 18:00:06 +0000395 chunk = dco.decompress(b'', max_length)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000396 self.assertFalse(len(chunk) > max_length,
Neil Schemenauer6412b122004-06-05 19:34:28 +0000397 'chunk too big (%d>%d)' % (len(chunk),max_length))
398 bufs.append(chunk)
Guido van Rossum776152b2007-05-22 22:44:07 +0000399 self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000400
Neil Schemenauer6412b122004-06-05 19:34:28 +0000401 def test_decompressmaxlenflush(self):
402 self.test_decompressmaxlen(flush=True)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000403
404 def test_maxlenmisc(self):
405 # Misc tests of max_length
Neil Schemenauer94afd3e2004-06-05 19:02:52 +0000406 dco = zlib.decompressobj()
Antoine Pitrou77b338b2009-12-14 18:00:06 +0000407 self.assertRaises(ValueError, dco.decompress, b"", -1)
Guido van Rossum776152b2007-05-22 22:44:07 +0000408 self.assertEqual(b'', dco.unconsumed_tail)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000409
Martin Pantere99e9772015-11-20 08:13:35 +0000410 def test_maxlen_large(self):
411 # Sizes up to sys.maxsize should be accepted, although zlib is
412 # internally limited to expressing sizes with unsigned int
413 data = HAMLET_SCENE * 10
414 self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
415 compressed = zlib.compress(data, 1)
416 dco = zlib.decompressobj()
417 self.assertEqual(dco.decompress(compressed, sys.maxsize), data)
418
419 def test_maxlen_custom(self):
420 data = HAMLET_SCENE * 10
421 compressed = zlib.compress(data, 1)
422 dco = zlib.decompressobj()
423 self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])
424
Nadeem Vawda7619e882011-05-14 14:05:20 +0200425 def test_clear_unconsumed_tail(self):
426 # Issue #12050: calling decompress() without providing max_length
427 # should clear the unconsumed_tail attribute.
428 cdata = b"x\x9cKLJ\x06\x00\x02M\x01" # "abc"
429 dco = zlib.decompressobj()
430 ddata = dco.decompress(cdata, 1)
431 ddata += dco.decompress(dco.unconsumed_tail)
432 self.assertEqual(dco.unconsumed_tail, b"")
433
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000434 def test_flushes(self):
435 # Test flush() with the various options, using all the
436 # different levels in order to provide more variations.
Xiang Zhangbc3f2282018-03-07 13:05:37 +0800437 sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
Steve Dower57675092018-09-24 07:44:50 -0400438 'Z_PARTIAL_FLUSH']
439
440 ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.'))
441 # Z_BLOCK has a known failure prior to 1.2.5.3
442 if ver >= (1, 2, 5, 3):
443 sync_opt.append('Z_BLOCK')
444
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000445 sync_opt = [getattr(zlib, opt) for opt in sync_opt
446 if hasattr(zlib, opt)]
Neil Schemenauer6412b122004-06-05 19:34:28 +0000447 data = HAMLET_SCENE * 8
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000448
449 for sync in sync_opt:
450 for level in range(10):
Steve Dower57675092018-09-24 07:44:50 -0400451 try:
452 obj = zlib.compressobj( level )
453 a = obj.compress( data[:3000] )
454 b = obj.flush( sync )
455 c = obj.compress( data[3000:] )
456 d = obj.flush()
457 except:
458 print("Error for flush mode={}, level={}"
459 .format(sync, level))
460 raise
Guido van Rossum776152b2007-05-22 22:44:07 +0000461 self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000462 data, ("Decompress failed: flush "
463 "mode=%i, level=%i") % (sync, level))
464 del obj
465
Serhiy Storchaka43767632013-11-03 21:31:38 +0200466 @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
467 'requires zlib.Z_SYNC_FLUSH')
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000468 def test_odd_flush(self):
469 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
470 import random
Serhiy Storchaka43767632013-11-03 21:31:38 +0200471 # Testing on 17K of "random" data
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000472
Serhiy Storchaka43767632013-11-03 21:31:38 +0200473 # Create compressor and decompressor objects
474 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
475 dco = zlib.decompressobj()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000476
Serhiy Storchaka43767632013-11-03 21:31:38 +0200477 # Try 17K of data
478 # generate random data stream
479 try:
480 # In 2.3 and later, WichmannHill is the RNG of the bug report
481 gen = random.WichmannHill()
482 except AttributeError:
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000483 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +0200484 # 2.2 called it Random
485 gen = random.Random()
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000486 except AttributeError:
Serhiy Storchaka43767632013-11-03 21:31:38 +0200487 # others might simply have a single RNG
488 gen = random
489 gen.seed(1)
Victor Stinner87502dd2020-04-17 22:54:38 +0200490 data = gen.randbytes(17 * 1024)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000491
Serhiy Storchaka43767632013-11-03 21:31:38 +0200492 # compress, sync-flush, and decompress
493 first = co.compress(data)
494 second = co.flush(zlib.Z_SYNC_FLUSH)
495 expanded = dco.decompress(first + second)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000496
Serhiy Storchaka43767632013-11-03 21:31:38 +0200497 # if decompressed data is different from the input data, choke.
498 self.assertEqual(expanded, data, "17K random source doesn't match")
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000499
Andrew M. Kuchling3b585b32004-12-28 20:10:48 +0000500 def test_empty_flush(self):
501 # Test that calling .flush() on unused objects works.
502 # (Bug #1083110 -- calling .flush() on decompress objects
503 # caused a core dump.)
504
505 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000506 self.assertTrue(co.flush()) # Returns a zlib header
Andrew M. Kuchling3b585b32004-12-28 20:10:48 +0000507 dco = zlib.decompressobj()
Guido van Rossum776152b2007-05-22 22:44:07 +0000508 self.assertEqual(dco.flush(), b"") # Returns nothing
Tim Peters5a9fb3c2005-01-07 16:01:32 +0000509
Nadeem Vawdafd8a8382012-06-21 02:13:12 +0200510 def test_dictionary(self):
511 h = HAMLET_SCENE
Nadeem Vawdacf5e1d82012-06-22 00:35:57 +0200512 # Build a simulated dictionary out of the words in HAMLET.
Nadeem Vawdafd8a8382012-06-21 02:13:12 +0200513 words = h.split()
514 random.shuffle(words)
515 zdict = b''.join(words)
Nadeem Vawdacf5e1d82012-06-22 00:35:57 +0200516 # Use it to compress HAMLET.
Nadeem Vawdafd8a8382012-06-21 02:13:12 +0200517 co = zlib.compressobj(zdict=zdict)
518 cd = co.compress(h) + co.flush()
Nadeem Vawdacf5e1d82012-06-22 00:35:57 +0200519 # Verify that it will decompress with the dictionary.
Nadeem Vawdafd8a8382012-06-21 02:13:12 +0200520 dco = zlib.decompressobj(zdict=zdict)
521 self.assertEqual(dco.decompress(cd) + dco.flush(), h)
Nadeem Vawdacf5e1d82012-06-22 00:35:57 +0200522 # Verify that it fails when not given the dictionary.
Nadeem Vawdafd8a8382012-06-21 02:13:12 +0200523 dco = zlib.decompressobj()
524 self.assertRaises(zlib.error, dco.decompress, cd)
525
526 def test_dictionary_streaming(self):
Nadeem Vawdacf5e1d82012-06-22 00:35:57 +0200527 # This simulates the reuse of a compressor object for compressing
528 # several separate data streams.
Nadeem Vawdafd8a8382012-06-21 02:13:12 +0200529 co = zlib.compressobj(zdict=HAMLET_SCENE)
530 do = zlib.decompressobj(zdict=HAMLET_SCENE)
531 piece = HAMLET_SCENE[1000:1500]
532 d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
533 d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
534 d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
535 self.assertEqual(do.decompress(d0), piece)
536 self.assertEqual(do.decompress(d1), piece[100:])
537 self.assertEqual(do.decompress(d2), piece[:-100])
538
Antoine Pitrouc09c92f2010-05-11 23:36:40 +0000539 def test_decompress_incomplete_stream(self):
540 # This is 'foo', deflated
541 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
542 # For the record
543 self.assertEqual(zlib.decompress(x), b'foo')
544 self.assertRaises(zlib.error, zlib.decompress, x[:-5])
545 # Omitting the stream end works with decompressor objects
546 # (see issue #8672).
547 dco = zlib.decompressobj()
548 y = dco.decompress(x[:-5])
549 y += dco.flush()
550 self.assertEqual(y, b'foo')
551
Nadeem Vawda1c385462011-08-13 15:22:40 +0200552 def test_decompress_eof(self):
553 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo'
554 dco = zlib.decompressobj()
555 self.assertFalse(dco.eof)
556 dco.decompress(x[:-5])
557 self.assertFalse(dco.eof)
558 dco.decompress(x[-5:])
559 self.assertTrue(dco.eof)
560 dco.flush()
561 self.assertTrue(dco.eof)
562
563 def test_decompress_eof_incomplete_stream(self):
564 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo'
565 dco = zlib.decompressobj()
566 self.assertFalse(dco.eof)
567 dco.decompress(x[:-5])
568 self.assertFalse(dco.eof)
569 dco.flush()
570 self.assertFalse(dco.eof)
571
Nadeem Vawda39079942012-11-05 00:37:42 +0100572 def test_decompress_unused_data(self):
573 # Repeated calls to decompress() after EOF should accumulate data in
574 # dco.unused_data, instead of just storing the arg to the last call.
Nadeem Vawdaee7889d2012-11-11 02:14:36 +0100575 source = b'abcdefghijklmnopqrstuvwxyz'
576 remainder = b'0123456789'
577 y = zlib.compress(source)
578 x = y + remainder
579 for maxlen in 0, 1000:
580 for step in 1, 2, len(y), len(x):
581 dco = zlib.decompressobj()
582 data = b''
583 for i in range(0, len(x), step):
584 if i < len(y):
585 self.assertEqual(dco.unused_data, b'')
586 if maxlen == 0:
587 data += dco.decompress(x[i : i + step])
588 self.assertEqual(dco.unconsumed_tail, b'')
589 else:
590 data += dco.decompress(
591 dco.unconsumed_tail + x[i : i + step], maxlen)
592 data += dco.flush()
Nadeem Vawdadd1253a2012-11-11 02:21:22 +0100593 self.assertTrue(dco.eof)
Nadeem Vawdaee7889d2012-11-11 02:14:36 +0100594 self.assertEqual(data, source)
595 self.assertEqual(dco.unconsumed_tail, b'')
596 self.assertEqual(dco.unused_data, remainder)
Nadeem Vawda39079942012-11-05 00:37:42 +0100597
Martin Panter3f0ee832016-06-05 10:48:34 +0000598 # issue27164
599 def test_decompress_raw_with_dictionary(self):
600 zdict = b'abcdefghijklmnopqrstuvwxyz'
601 co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
602 comp = co.compress(zdict) + co.flush()
603 dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
604 uncomp = dco.decompress(comp) + dco.flush()
605 self.assertEqual(zdict, uncomp)
606
Nadeem Vawda7ee95552012-11-11 03:15:32 +0100607 def test_flush_with_freed_input(self):
608 # Issue #16411: decompressor accesses input to last decompress() call
609 # in flush(), even if this object has been freed in the meanwhile.
610 input1 = b'abcdefghijklmnopqrstuvwxyz'
611 input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
612 data = zlib.compress(input1)
613 dco = zlib.decompressobj()
614 dco.decompress(data, 1)
615 del data
616 data = zlib.compress(input2)
617 self.assertEqual(dco.flush(), input1[1:])
618
Martin Pantere99e9772015-11-20 08:13:35 +0000619 @bigmemtest(size=_4G, memuse=1)
620 def test_flush_large_length(self, size):
621 # Test flush(length) parameter greater than internal limit UINT_MAX
622 input = HAMLET_SCENE * 10
623 data = zlib.compress(input, 1)
624 dco = zlib.decompressobj()
625 dco.decompress(data, 1)
626 self.assertEqual(dco.flush(size), input[1:])
627
628 def test_flush_custom_length(self):
629 input = HAMLET_SCENE * 10
630 data = zlib.compress(input, 1)
631 dco = zlib.decompressobj()
632 dco.decompress(data, 1)
633 self.assertEqual(dco.flush(CustomInt()), input[1:])
634
Serhiy Storchaka43767632013-11-03 21:31:38 +0200635 @requires_Compress_copy
636 def test_compresscopy(self):
637 # Test copying a compression object
638 data0 = HAMLET_SCENE
639 data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600640 for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
641 c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
642 bufs0 = []
643 bufs0.append(c0.compress(data0))
Thomas Wouters477c8d52006-05-27 19:21:47 +0000644
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600645 c1 = func(c0)
646 bufs1 = bufs0[:]
Thomas Wouters477c8d52006-05-27 19:21:47 +0000647
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600648 bufs0.append(c0.compress(data0))
649 bufs0.append(c0.flush())
650 s0 = b''.join(bufs0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000651
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600652 bufs1.append(c1.compress(data1))
653 bufs1.append(c1.flush())
654 s1 = b''.join(bufs1)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000655
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600656 self.assertEqual(zlib.decompress(s0),data0+data0)
657 self.assertEqual(zlib.decompress(s1),data0+data1)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000658
Serhiy Storchaka43767632013-11-03 21:31:38 +0200659 @requires_Compress_copy
660 def test_badcompresscopy(self):
661 # Test copying a compression object in an inconsistent state
662 c = zlib.compressobj()
663 c.compress(HAMLET_SCENE)
664 c.flush()
665 self.assertRaises(ValueError, c.copy)
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600666 self.assertRaises(ValueError, copy.copy, c)
667 self.assertRaises(ValueError, copy.deepcopy, c)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000668
Serhiy Storchaka43767632013-11-03 21:31:38 +0200669 @requires_Decompress_copy
670 def test_decompresscopy(self):
671 # Test copying a decompression object
672 data = HAMLET_SCENE
673 comp = zlib.compress(data)
674 # Test type of return value
675 self.assertIsInstance(comp, bytes)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000676
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600677 for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
678 d0 = zlib.decompressobj()
679 bufs0 = []
680 bufs0.append(d0.decompress(comp[:32]))
Thomas Wouters477c8d52006-05-27 19:21:47 +0000681
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600682 d1 = func(d0)
683 bufs1 = bufs0[:]
Thomas Wouters477c8d52006-05-27 19:21:47 +0000684
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600685 bufs0.append(d0.decompress(comp[32:]))
686 s0 = b''.join(bufs0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000687
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600688 bufs1.append(d1.decompress(comp[32:]))
689 s1 = b''.join(bufs1)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000690
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600691 self.assertEqual(s0,s1)
692 self.assertEqual(s0,data)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000693
Serhiy Storchaka43767632013-11-03 21:31:38 +0200694 @requires_Decompress_copy
695 def test_baddecompresscopy(self):
696 # Test copying a compression object in an inconsistent state
697 data = zlib.compress(HAMLET_SCENE)
698 d = zlib.decompressobj()
699 d.decompress(data)
700 d.flush()
701 self.assertRaises(ValueError, d.copy)
Zackery Spytzd2cbfff2018-06-27 12:04:51 -0600702 self.assertRaises(ValueError, copy.copy, d)
703 self.assertRaises(ValueError, copy.deepcopy, d)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000704
Serhiy Storchakad7a44152015-11-12 11:23:04 +0200705 def test_compresspickle(self):
706 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
707 with self.assertRaises((TypeError, pickle.PicklingError)):
708 pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)
709
710 def test_decompresspickle(self):
711 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
712 with self.assertRaises((TypeError, pickle.PicklingError)):
713 pickle.dumps(zlib.decompressobj(), proto)
714
Antoine Pitrou89562712010-05-07 17:04:02 +0000715 # Memory use of the following functions takes into account overallocation
716
Antoine Pitrou94190bb2011-10-04 10:22:36 +0200717 @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
Antoine Pitrou89562712010-05-07 17:04:02 +0000718 def test_big_compress_buffer(self, size):
719 c = zlib.compressobj(1)
720 compress = lambda s: c.compress(s) + c.flush()
721 self.check_big_compress_buffer(size, compress)
722
Antoine Pitrou94190bb2011-10-04 10:22:36 +0200723 @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
Antoine Pitrou89562712010-05-07 17:04:02 +0000724 def test_big_decompress_buffer(self, size):
725 d = zlib.decompressobj()
726 decompress = lambda s: d.decompress(s) + d.flush()
727 self.check_big_decompress_buffer(size, decompress)
728
Martin Panter84544c12016-07-23 03:02:07 +0000729 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
730 @bigmemtest(size=_4G + 100, memuse=4)
731 def test_64bit_compress(self, size):
Nadeem Vawda0c3d96a2011-05-15 00:19:50 +0200732 data = b'x' * size
Martin Panter84544c12016-07-23 03:02:07 +0000733 co = zlib.compressobj(0)
734 do = zlib.decompressobj()
Nadeem Vawda0c3d96a2011-05-15 00:19:50 +0200735 try:
Martin Panter84544c12016-07-23 03:02:07 +0000736 comp = co.compress(data) + co.flush()
737 uncomp = do.decompress(comp) + do.flush()
738 self.assertEqual(uncomp, data)
Nadeem Vawda0c3d96a2011-05-15 00:19:50 +0200739 finally:
Martin Panter84544c12016-07-23 03:02:07 +0000740 comp = uncomp = data = None
741
742 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
743 @bigmemtest(size=_4G + 100, memuse=3)
744 def test_large_unused_data(self, size):
745 data = b'abcdefghijklmnop'
746 unused = b'x' * size
747 comp = zlib.compress(data) + unused
748 do = zlib.decompressobj()
749 try:
750 uncomp = do.decompress(comp) + do.flush()
751 self.assertEqual(unused, do.unused_data)
752 self.assertEqual(uncomp, data)
753 finally:
754 unused = comp = do = None
755
756 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
757 @bigmemtest(size=_4G + 100, memuse=5)
758 def test_large_unconsumed_tail(self, size):
759 data = b'x' * size
760 do = zlib.decompressobj()
761 try:
762 comp = zlib.compress(data, 0)
763 uncomp = do.decompress(comp, 1) + do.flush()
764 self.assertEqual(uncomp, data)
765 self.assertEqual(do.unconsumed_tail, b'')
766 finally:
767 comp = uncomp = data = None
Nadeem Vawda0c3d96a2011-05-15 00:19:50 +0200768
Martin Panter0fdf41d2016-05-27 07:32:11 +0000769 def test_wbits(self):
Martin Panterc618ae82016-05-27 11:20:21 +0000770 # wbits=0 only supported since zlib v1.2.3.5
771 # Register "1.2.3" as "1.2.3.0"
pmp-p4c7108a2018-02-19 04:45:11 +0100772 # or "1.2.0-linux","1.2.0.f","1.2.0.f-linux"
773 v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
774 if len(v) < 4:
775 v.append('0')
776 elif not v[-1].isnumeric():
777 v[-1] = '0'
778
779 v = tuple(map(int, v))
780 supports_wbits_0 = v >= (1, 2, 3, 5)
Martin Panterc618ae82016-05-27 11:20:21 +0000781
Martin Panter0fdf41d2016-05-27 07:32:11 +0000782 co = zlib.compressobj(level=1, wbits=15)
783 zlib15 = co.compress(HAMLET_SCENE) + co.flush()
784 self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
Martin Panterc618ae82016-05-27 11:20:21 +0000785 if supports_wbits_0:
786 self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
Martin Panter0fdf41d2016-05-27 07:32:11 +0000787 self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
788 with self.assertRaisesRegex(zlib.error, 'invalid window size'):
789 zlib.decompress(zlib15, 14)
790 dco = zlib.decompressobj(wbits=32 + 15)
791 self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
792 dco = zlib.decompressobj(wbits=14)
793 with self.assertRaisesRegex(zlib.error, 'invalid window size'):
794 dco.decompress(zlib15)
795
796 co = zlib.compressobj(level=1, wbits=9)
797 zlib9 = co.compress(HAMLET_SCENE) + co.flush()
798 self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
799 self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
Martin Panterc618ae82016-05-27 11:20:21 +0000800 if supports_wbits_0:
801 self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
Martin Panter0fdf41d2016-05-27 07:32:11 +0000802 self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
803 dco = zlib.decompressobj(wbits=32 + 9)
804 self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)
805
806 co = zlib.compressobj(level=1, wbits=-15)
807 deflate15 = co.compress(HAMLET_SCENE) + co.flush()
808 self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
809 dco = zlib.decompressobj(wbits=-15)
810 self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)
811
812 co = zlib.compressobj(level=1, wbits=-9)
813 deflate9 = co.compress(HAMLET_SCENE) + co.flush()
814 self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
815 self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
816 dco = zlib.decompressobj(wbits=-9)
817 self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)
818
819 co = zlib.compressobj(level=1, wbits=16 + 15)
820 gzip = co.compress(HAMLET_SCENE) + co.flush()
821 self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
822 self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
823 dco = zlib.decompressobj(32 + 15)
824 self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)
825
Antoine Pitrou89562712010-05-07 17:04:02 +0000826
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000827def choose_lines(source, number, seed=None, generator=random):
828 """Return a list of number lines randomly chosen from the source"""
829 if seed is not None:
830 generator.seed(seed)
831 sources = source.split('\n')
832 return [generator.choice(sources) for n in range(number)]
833
834
Guido van Rossum776152b2007-05-22 22:44:07 +0000835HAMLET_SCENE = b"""
Fred Drake004d5e62000-10-23 17:22:08 +0000836LAERTES
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000837
838 O, fear me not.
839 I stay too long: but here my father comes.
840
841 Enter POLONIUS
842
843 A double blessing is a double grace,
844 Occasion smiles upon a second leave.
845
Fred Drake004d5e62000-10-23 17:22:08 +0000846LORD POLONIUS
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000847
848 Yet here, Laertes! aboard, aboard, for shame!
849 The wind sits in the shoulder of your sail,
850 And you are stay'd for. There; my blessing with thee!
851 And these few precepts in thy memory
852 See thou character. Give thy thoughts no tongue,
853 Nor any unproportioned thought his act.
854 Be thou familiar, but by no means vulgar.
855 Those friends thou hast, and their adoption tried,
856 Grapple them to thy soul with hoops of steel;
857 But do not dull thy palm with entertainment
858 Of each new-hatch'd, unfledged comrade. Beware
859 Of entrance to a quarrel, but being in,
860 Bear't that the opposed may beware of thee.
861 Give every man thy ear, but few thy voice;
862 Take each man's censure, but reserve thy judgment.
863 Costly thy habit as thy purse can buy,
864 But not express'd in fancy; rich, not gaudy;
865 For the apparel oft proclaims the man,
866 And they in France of the best rank and station
867 Are of a most select and generous chief in that.
868 Neither a borrower nor a lender be;
869 For loan oft loses both itself and friend,
870 And borrowing dulls the edge of husbandry.
871 This above all: to thine ownself be true,
872 And it must follow, as the night the day,
873 Thou canst not then be false to any man.
874 Farewell: my blessing season this in thee!
875
Fred Drake004d5e62000-10-23 17:22:08 +0000876LAERTES
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000877
878 Most humbly do I take my leave, my lord.
879
Fred Drake004d5e62000-10-23 17:22:08 +0000880LORD POLONIUS
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000881
882 The time invites you; go; your servants tend.
883
Fred Drake004d5e62000-10-23 17:22:08 +0000884LAERTES
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000885
886 Farewell, Ophelia; and remember well
887 What I have said to you.
888
Fred Drake004d5e62000-10-23 17:22:08 +0000889OPHELIA
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000890
891 'Tis in my memory lock'd,
892 And you yourself shall keep the key of it.
893
Fred Drake004d5e62000-10-23 17:22:08 +0000894LAERTES
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +0000895
896 Farewell.
897"""
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000898
899
Martin Pantere99e9772015-11-20 08:13:35 +0000900class CustomInt:
Serhiy Storchaka6a44f6e2019-02-25 17:57:58 +0200901 def __index__(self):
Martin Pantere99e9772015-11-20 08:13:35 +0000902 return 100
903
904
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000905if __name__ == "__main__":
Zachary Ware38c707e2015-04-13 15:00:43 -0500906 unittest.main()