blob: 26cf3093301e8e3a0a09b551547168530d481189 [file] [log] [blame]
Guido van Rossum7d9ea502003-02-03 20:45:52 +00001import unittest
Victor Stinnerf9fb4342011-05-03 15:19:23 +02002from test.test_support import TESTFN, run_unittest, import_module, unlink, requires
Gregory P. Smithc856fa82008-03-18 22:27:41 +00003import binascii
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +00004import random
Victor Stinner7fd90c42011-05-04 21:27:39 +02005from test.test_support import precisionbigmemtest, _1G, _4G
Victor Stinnerf9fb4342011-05-03 15:19:23 +02006import sys
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +00007
Victor Stinnerf9fb4342011-05-03 15:19:23 +02008try:
9 import mmap
10except ImportError:
11 mmap = None
12
13zlib = import_module('zlib')
R. David Murray3db8a342009-03-30 23:05:48 +000014
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +000015
Guido van Rossum7d9ea502003-02-03 20:45:52 +000016class ChecksumTestCase(unittest.TestCase):
17 # checksum test cases
18 def test_crc32start(self):
19 self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
Benjamin Peterson5c8da862009-06-30 22:57:08 +000020 self.assertTrue(zlib.crc32("abc", 0xffffffff))
Andrew M. Kuchlingfcfc8d52001-08-10 15:50:11 +000021
Guido van Rossum7d9ea502003-02-03 20:45:52 +000022 def test_crc32empty(self):
23 self.assertEqual(zlib.crc32("", 0), 0)
24 self.assertEqual(zlib.crc32("", 1), 1)
25 self.assertEqual(zlib.crc32("", 432), 432)
Andrew M. Kuchling9a0f98e2001-02-21 02:17:01 +000026
Guido van Rossum7d9ea502003-02-03 20:45:52 +000027 def test_adler32start(self):
28 self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
Benjamin Peterson5c8da862009-06-30 22:57:08 +000029 self.assertTrue(zlib.adler32("abc", 0xffffffff))
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +000030
Guido van Rossum7d9ea502003-02-03 20:45:52 +000031 def test_adler32empty(self):
32 self.assertEqual(zlib.adler32("", 0), 0)
33 self.assertEqual(zlib.adler32("", 1), 1)
34 self.assertEqual(zlib.adler32("", 432), 432)
Jeremy Hylton6eb4b6a1997-08-15 15:59:43 +000035
Guido van Rossum7d9ea502003-02-03 20:45:52 +000036 def assertEqual32(self, seen, expected):
37 # 32-bit values masked -- checksums on 32- vs 64- bit machines
38 # This is important if bit 31 (0x08000000L) is set.
39 self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)
40
41 def test_penguins(self):
42 self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
43 self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
44 self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
45 self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)
46
47 self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
48 self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1))
49
Gregory P. Smithf48f9d32008-03-17 18:48:05 +000050 def test_abcdefghijklmnop(self):
51 """test issue1202 compliance: signed crc32, adler32 in 2.x"""
52 foo = 'abcdefghijklmnop'
53 # explicitly test signed behavior
54 self.assertEqual(zlib.crc32(foo), -1808088941)
55 self.assertEqual(zlib.crc32('spam'), 1138425661)
56 self.assertEqual(zlib.adler32(foo+foo), -721416943)
57 self.assertEqual(zlib.adler32('spam'), 72286642)
58
Gregory P. Smithc856fa82008-03-18 22:27:41 +000059 def test_same_as_binascii_crc32(self):
60 foo = 'abcdefghijklmnop'
61 self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
62 self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))
63
Gregory P. Smith88440962008-03-25 06:12:45 +000064 def test_negative_crc_iv_input(self):
65 # The range of valid input values for the crc state should be
66 # -2**31 through 2**32-1 to allow inputs artifically constrained
67 # to a signed 32-bit integer.
68 self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL))
69 self.assertEqual(zlib.crc32('spam', -3141593),
70 zlib.crc32('spam', 0xffd01027L))
71 self.assertEqual(zlib.crc32('spam', -(2**31)),
72 zlib.crc32('spam', (2**31)))
Guido van Rossum7d9ea502003-02-03 20:45:52 +000073
74
# Issue #11277 - check that inputs of 2 GB (or 1 GB on a 32-bit system) are
# handled correctly. Be aware of issue #1202. We cannot test a buffer of 4 GB
# or more (#8650, #8651 and #10276), because zlib stores the buffer size in
# an int.
class ChecksumBigBufferTestCase(unittest.TestCase):
    # Verify crc32()/adler32() on a multi-gigabyte input (issue #11277).
    # The buffer is an mmap of a sparse file, so the data never has to be
    # held in anonymous memory.
    if sys.maxsize > _4G:
        # (64 bits system) crc32() and adler32() stores the buffer size into an
        # int, the maximum filesize is INT_MAX (0x7FFFFFFF)
        filesize = 0x7FFFFFFF
    else:
        # (32 bits system) On a 32 bits OS, a process cannot usually address
        # more than 2 GB, so test only 1 GB
        filesize = _1G

    @unittest.skipUnless(mmap, "mmap() is not available.")
    def test_big_buffer(self):
        # On platforms where creating the file is expensive, only run when
        # the 'largefile' test resource has been enabled.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            requires('largefile',
                     'test requires %s bytes and a long time to run' %
                     str(self.filesize))
        try:
            with open(TESTFN, "wb+") as f:
                # seek-then-write creates a sparse file of self.filesize
                # bytes (zeros followed by "asdf") without writing GBs.
                f.seek(self.filesize-4)
                f.write("asdf")
                f.flush()
                m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
                try:
                    # Expected values for this exact (zeros + "asdf") content.
                    self.assertEqual(zlib.crc32(m), 0x709418e7)
                    self.assertEqual(zlib.adler32(m), -2072837729)
                finally:
                    m.close()
        except (IOError, OverflowError):
            # Filesystem without sparse/largefile support (or mmap of this
            # size failed) -- skip rather than fail.
            raise unittest.SkipTest("filesystem doesn't have largefile support")
        finally:
            unlink(TESTFN)
110
111
class ExceptionTestCase(unittest.TestCase):
    """Check that invalid arguments raise the expected exceptions."""

    def test_badlevel(self):
        # A compression level out of range raises zlib.error (-1 is
        # Z_DEFAULT_COMPRESSION, and zlib apparently accepts 0 as well).
        self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)

    def test_badcompressobj(self):
        # Building a compress object with a window size of 0 fails...
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # ...and so does specifying more window bits than zlib supports.
        self.assertRaises(ValueError, zlib.compressobj,
                          1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # Building a decompress object with bad params must fail.
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # decompressobj.flush() requires a positive buffer length.
        for bad_length in (0, -1):
            self.assertRaises(ValueError,
                              zlib.decompressobj().flush, bad_length)
135
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000136
class BaseCompressTestCase(object):
    """Shared helpers for the big-buffer (bigmem) compression tests."""

    def check_big_compress_buffer(self, size, compress_func):
        # Build 10 MB of random bytes and tile it out to `size` bytes.
        # zlib's window is far smaller than 10 MB, so it cannot exploit
        # redundancy that is spread out this widely.
        _1M = 1024 * 1024
        fmt = "%%0%dx" % (2 * _1M)
        chunks = []
        for i in range(10):
            chunks.append(binascii.a2b_hex(fmt % random.getrandbits(8 * _1M)))
        data = ''.join(chunks)
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Drop the reference so the huge buffer is freed promptly.
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        # Compress a trivially redundant buffer, then check that the
        # decompressor reproduces it at full size.
        data = 'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release the input buffer before decompressing.
            data = None
        data = decompress_func(compressed)
        try:
            # Sanity check: correct length and nothing but 'x' bytes.
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip('x')), 0)
        finally:
            data = None
167
168
class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    """One-shot (whole message) compression via zlib.compress()."""

    def test_speech(self):
        # Round-trip a single block of text.
        compressed = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(compressed), HAMLET_SCENE)

    def test_speech128(self):
        # Round-trip a much larger, highly redundant input.
        data = HAMLET_SCENE * 128
        compressed = zlib.compress(data)
        self.assertEqual(zlib.decompress(compressed), data)

    def test_incomplete_stream(self):
        # A useful error message is given for a truncated stream.
        compressed = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegexp(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, compressed[:-1])

    # Memory use of the following tests takes overallocation into account.

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        self.check_big_compress_buffer(size, lambda s: zlib.compress(s, 1))

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000198
199
class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    # (incremental compressobj()/decompressobj() APIs: streaming, flush
    # semantics, max_length limits, and object copying).
    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        # Negative wbits selects a raw deflate stream (no header/trailer);
        # the decompressor must be created with the same wbits value.
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        # cx/dcx are the compress/decompress chunk sizes.
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            # Without a max_length limit nothing should ever be left over.
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                             len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain any remaining output by feeding empty input.
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) uct should be '': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        # Same as test_decompinc, but finish with flush() instead of
        # draining with empty decompress() calls.
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            # Each chunk must honor the max_length (dcx) cap.
            self.assertFalse(len(chunk) > dcx,
                             'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            # Unconsumed input must be re-fed on the next iteration.
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            # Limit each step to roughly a tenth of the remaining input.
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                             'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain buffered output with empty input until exhausted.
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                                 'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        # Same as test_decompressmaxlen, finishing with flush().
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                # A mid-stream flush must not corrupt the overall stream.
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random

        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data

            # Create compressor and decompressor objects
            co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            dco = zlib.decompressobj()

            # Try 17K of data
            # generate random data stream
            try:
                # In 2.3 and later, WichmannHill is the RNG of the bug report
                gen = random.WichmannHill()
            except AttributeError:
                try:
                    # 2.2 called it Random
                    gen = random.Random()
                except AttributeError:
                    # others might simply have a single RNG
                    gen = random
            gen.seed(1)
            data = genblock(1, 17 * 1024, generator=gen)

            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)

            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "")  # Returns nothing

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), 'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, 'foo')

    # copy() is optional; the hasattr checks run at class-creation time,
    # so these tests only exist when the underlying zlib supports it.
    if hasattr(zlib.compressobj(), "copy"):
        def test_compresscopy(self):
            # Test copying a compression object
            data0 = HAMLET_SCENE
            data1 = HAMLET_SCENE.swapcase()
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = c0.copy()
            bufs1 = bufs0[:]

            # The original and the copy diverge independently after copy().
            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = ''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = ''.join(bufs1)

            self.assertEqual(zlib.decompress(s0),data0+data0)
            self.assertEqual(zlib.decompress(s1),data0+data1)

        def test_badcompresscopy(self):
            # Test copying a compression object in an inconsistent state
            c = zlib.compressobj()
            c.compress(HAMLET_SCENE)
            c.flush()
            self.assertRaises(ValueError, c.copy)

    if hasattr(zlib.decompressobj(), "copy"):
        def test_decompresscopy(self):
            # Test copying a decompression object
            data = HAMLET_SCENE
            comp = zlib.compress(data)

            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = d0.copy()
            bufs1 = bufs0[:]

            # Both objects finish the stream independently.
            bufs0.append(d0.decompress(comp[32:]))
            s0 = ''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = ''.join(bufs1)

            self.assertEqual(s0,s1)
            self.assertEqual(s0,data)

        def test_baddecompresscopy(self):
            # Test copying a compression object in an inconsistent state
            data = zlib.compress(HAMLET_SCENE)
            d = zlib.decompressobj()
            d.decompress(data)
            d.flush()
            self.assertRaises(ValueError, d.copy)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)
500
501
def genblock(seed, length, step=1024, generator=random):
    """Return a length-byte string of pseudo-random data.

    The data is produced in step-byte blocks from *generator* (the
    random module by default), seeded with *seed* unless it is None.
    """
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    # Degenerate step sizes: emit everything as one block.
    if length < step or step < 2:
        step = length
    pieces = []
    for start in range(0, length, step):
        block = ''.join(chr(randint(0, 255)) for x in range(step))
        pieces.append(block)
    # The final block may overshoot; trim to the requested length.
    return ''.join(pieces)[:length]
514
515
516
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of *number* lines randomly chosen from *source*."""
    if seed is not None:
        generator.seed(seed)
    candidates = source.split('\n')
    picks = []
    for _ in range(number):
        picks.append(generator.choice(candidates))
    return picks
523
524
525
# Fixed sample text (Hamlet, Act I Scene III) used as compressible input
# throughout the test cases above.  Moderately sized, and highly redundant
# when repeated, which suits the round-trip compression tests.
HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000589
590
def test_main():
    """Run every test case class defined in this module."""
    test_classes = (
        ChecksumTestCase,
        ChecksumBigBufferTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase,
    )
    run_unittest(*test_classes)
Guido van Rossum7d9ea502003-02-03 20:45:52 +0000599
# Allow running this test module directly from the command line.
if __name__ == "__main__":
    test_main()