import unittest
from test.test_support import TESTFN, run_unittest, import_module, unlink, requires
import binascii
import random
from test.test_support import precisionbigmemtest, _1G, _4G
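# _1G and _4G are byte-size constants (1 GiB and 4 GiB) provided by
# test.test_support for the big-buffer and big-memory tests below.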
import sys

try:
    import mmap
except ImportError:
    mmap = None

zlib = import_module('zlib')

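# Note: in Python 2.x, zlib.crc32() and zlib.adler32() return signed integers
# (see the issue1202 tests below); mask the result with 0xffffffff to get the
# unsigned 32-bit value.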
class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
        self.assertTrue(zlib.crc32("abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
        self.assertTrue(zlib.adler32("abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64- bit machines
        # This is important if bit 31 (0x80000000L) is set.
        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)

    def test_penguins(self):
        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"), zlib.adler32("penguin", 1))

    def test_abcdefghijklmnop(self):
        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
        foo = 'abcdefghijklmnop'
        # explicitly test signed behavior
        self.assertEqual(zlib.crc32(foo), -1808088941)
        self.assertEqual(zlib.crc32('spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), -721416943)
        self.assertEqual(zlib.adler32('spam'), 72286642)
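        # For reference: masking with 0xffffffff (as Python 3.x does) recovers
        # the unsigned 32-bit value from these signed results.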

    def test_same_as_binascii_crc32(self):
        foo = 'abcdefghijklmnop'
        self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
        self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))

    def test_negative_crc_iv_input(self):
        # The range of valid input values for the crc state should be
        # -2**31 through 2**32-1 to allow inputs artificially constrained
        # to a signed 32-bit integer.
        self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL))
        self.assertEqual(zlib.crc32('spam', -3141593),
                         zlib.crc32('spam', 0xffd01027L))
        self.assertEqual(zlib.crc32('spam', -(2**31)),
                         zlib.crc32('spam', (2**31)))

# Issue #11277 - check that inputs of 2 GB (or 1 GB on a 32-bit system) are
# handled correctly. Be aware of issue #1202. We cannot test a buffer of 4 GB
# or more (#8650, #8651 and #10276), because zlib stores the buffer size in
# an int.
class ChecksumBigBufferTestCase(unittest.TestCase):
    if sys.maxsize > _4G:
        # (64-bit system) crc32() and adler32() store the buffer size in an
        # int, so the maximum file size is INT_MAX (0x7FFFFFFF)
        filesize = 0x7FFFFFFF
    else:
        # (32-bit system) On a 32-bit OS, a process usually cannot address
        # more than 2 GB, so test only 1 GB
        filesize = _1G

    @unittest.skipUnless(mmap, "mmap() is not available.")
    def test_big_buffer(self):
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            requires('largefile',
                     'test requires %s bytes and a long time to run' %
                     str(self.filesize))
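        # The input file is mmap'ed rather than read into a string so that the
        # multi-gigabyte buffer does not also have to be held in memory as a
        # Python string.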
        try:
            with open(TESTFN, "wb+") as f:
                f.seek(self.filesize-4)
                f.write("asdf")
                f.flush()
                m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
                try:
                    if sys.maxsize > _4G:
                        self.assertEqual(zlib.crc32(m), 0x709418e7)
                        self.assertEqual(zlib.adler32(m), -2072837729)
                    else:
                        self.assertEqual(zlib.crc32(m), 722071057)
                        self.assertEqual(zlib.adler32(m), -1002962529)
                finally:
                    m.close()
        except (IOError, OverflowError):
            raise unittest.SkipTest("filesystem doesn't have largefile support")
        finally:
            unlink(TESTFN)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and zlib apparently accepts
        # 0 too)
        self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                          zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        fmt = "%%0%dx" % (2 * _1M)
        # Generate 10 MB worth of random data and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread-out redundancy.
        data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
                        for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = 'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip('x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegexp(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation
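    # Each @precisionbigmemtest case below only runs when regrtest is given
    # enough memory via its -M option; "size" is the buffer size passed to the
    # test and "memuse" an estimate of peak memory consumed per input byte.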

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)
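        # Note: a negative wbits value selects a raw deflate stream without the
        # zlib header and checksum, which is why the decompressor has to be
        # created with the same wbits value to read the output back.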

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                             len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) uct should be '': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
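        # Decompress in chunks of at most dcx bytes; input that is not consumed
        # because of the length limit reappears in dco.unconsumed_tail and is
        # fed back in on the next iteration.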
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random

        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data

            # Create compressor and decompressor objects
            co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            dco = zlib.decompressobj()

            # Try 17K of data
            # generate random data stream
            try:
                # In 2.3 and later, WichmannHill is the RNG of the bug report
                gen = random.WichmannHill()
            except AttributeError:
                try:
                    # 2.2 called it Random
                    gen = random.Random()
                except AttributeError:
                    # others might simply have a single RNG
                    gen = random
            gen.seed(1)
            data = genblock(1, 17 * 1024, generator=gen)

            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)

            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "")  # Returns nothing

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
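        # (presumably the output of zlib.compress('foo') at the default level)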
        # For the record
        self.assertEqual(zlib.decompress(x), 'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, 'foo')

    if hasattr(zlib.compressobj(), "copy"):
        def test_compresscopy(self):
            # Test copying a compression object
            data0 = HAMLET_SCENE
            data1 = HAMLET_SCENE.swapcase()
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = c0.copy()
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = ''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = ''.join(bufs1)

            self.assertEqual(zlib.decompress(s0), data0 + data0)
            self.assertEqual(zlib.decompress(s1), data0 + data1)

        def test_badcompresscopy(self):
            # Test copying a compression object in an inconsistent state
            c = zlib.compressobj()
            c.compress(HAMLET_SCENE)
            c.flush()
            self.assertRaises(ValueError, c.copy)

    if hasattr(zlib.decompressobj(), "copy"):
        def test_decompresscopy(self):
            # Test copying a decompression object
            data = HAMLET_SCENE
            comp = zlib.compress(data)

            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = d0.copy()
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = ''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = ''.join(bufs1)

            self.assertEqual(s0, s1)
            self.assertEqual(s0, data)

        def test_baddecompresscopy(self):
            # Test copying a decompression object in an inconsistent state
            data = zlib.compress(HAMLET_SCENE)
            d = zlib.decompressobj()
            d.decompress(data)
            d.flush()
            self.assertRaises(ValueError, d.copy)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0, 255))
                               for x in range(step)]))
    return ''.join(blocks)[:length]


def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]

HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


def test_main():
    run_unittest(
        ChecksumTestCase,
        ChecksumBigBufferTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase
    )

if __name__ == "__main__":
    test_main()