import unittest
from test.test_support import TESTFN, run_unittest, import_module, unlink, requires
import binascii
import random
from test.test_support import precisionbigmemtest, _1G
import sys

try:
    import mmap
except ImportError:
    mmap = None

zlib = import_module('zlib')
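# Editorial note: import_module() is expected to skip this whole test module
# if the zlib extension is not available; mmap is optional and is only needed
# for the big-buffer checksum test below.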


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
        self.assertTrue(zlib.crc32("abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
        self.assertTrue(zlib.adler32("abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64-bit machines.
        # This is important if bit 31 (0x80000000L) is set.
        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)

    def test_penguins(self):
        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"), zlib.adler32("penguin", 1))

    def test_abcdefghijklmnop(self):
        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
        foo = 'abcdefghijklmnop'
        # explicitly test signed behavior
        self.assertEqual(zlib.crc32(foo), -1808088941)
        self.assertEqual(zlib.crc32('spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), -721416943)
        self.assertEqual(zlib.adler32('spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = 'abcdefghijklmnop'
        self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
        self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))

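    def test_crc32_unsigned_mask(self):
        # Editorial sketch, not part of the original suite: masking the
        # signed 2.x result with 0xffffffff yields the unsigned value, and
        # zlib and binascii still agree after masking.
        foo = 'abcdefghijklmnop'
        self.assertEqual(zlib.crc32(foo) & 0xffffffff,
                         binascii.crc32(foo) & 0xffffffff)
        self.assertTrue((zlib.crc32(foo) & 0xffffffff) >= 0)
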
    def test_negative_crc_iv_input(self):
        # The range of valid input values for the crc state should be
        # -2**31 through 2**32-1 to allow inputs artificially constrained
        # to a signed 32-bit integer.
        self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL))
        self.assertEqual(zlib.crc32('spam', -3141593),
                         zlib.crc32('spam', 0xffd01027L))
        self.assertEqual(zlib.crc32('spam', -(2**31)),
                         zlib.crc32('spam', (2**31)))


# Issue #10276 - check that inputs of 2 GB are handled correctly.
# Be aware of issues #1202, #8650, #8651 and #10276
class ChecksumBigBufferTestCase(unittest.TestCase):
    int_max = 0x7FFFFFFF

    @unittest.skipUnless(mmap, "mmap() is not available.")
    def test_big_buffer(self):
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            requires('largefile',
                     'test requires %s bytes and a long time to run' %
                     str(self.int_max))
        try:
            with open(TESTFN, "wb+") as f:
                f.seek(self.int_max-4)
                f.write("asdf")
                f.flush()
                m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
                try:
                    self.assertEqual(zlib.crc32(m), 0x709418e7)
                    self.assertEqual(zlib.adler32(m), -2072837729)
                finally:
                    m.close()
        except (IOError, OverflowError):
            raise unittest.SkipTest("filesystem doesn't have largefile support")
        finally:
            unlink(TESTFN)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying a compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                          zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        fmt = "%%0%dx" % (2 * _1M)
        # Generate 10MB worth of random data, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
                        for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = 'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip('x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegexp(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                             len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) uct should be '': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                             'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                             'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
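            # unconsumed_tail holds the input that did not fit under
            # max_length; feeding it back in keeps the loop going until
            # everything has been decompressed.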
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                                 'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj(level)
                a = obj.compress(data[:3000])
                b = obj.flush(sync)
                c = obj.compress(data[3000:])
                d = obj.flush()
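                # flush() with no argument uses Z_FINISH and ends the
                # stream, so the four pieces reassemble into one complete
                # zlib stream.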
                self.assertEqual(zlib.decompress(''.join([a, b, c, d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random

        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data

            # Create compressor and decompressor objects
            co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            dco = zlib.decompressobj()

            # Try 17K of data
            # generate random data stream
            try:
                # In 2.3 and later, WichmannHill is the RNG of the bug report
                gen = random.WichmannHill()
            except AttributeError:
                try:
                    # 2.2 called it Random
                    gen = random.Random()
                except AttributeError:
                    # others might simply have a single RNG
                    gen = random
            gen.seed(1)
            data = genblock(1, 17 * 1024, generator=gen)

            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)

            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        #  caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "")  # Returns nothing

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), 'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, 'foo')
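        # Unlike the one-shot zlib.decompress() call above, the incremental
        # decompressor tolerates the missing stream end: flush() simply
        # returns whatever has been decoded so far.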

    if hasattr(zlib.compressobj(), "copy"):
        def test_compresscopy(self):
            # Test copying a compression object
            data0 = HAMLET_SCENE
            data1 = HAMLET_SCENE.swapcase()
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = c0.copy()
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = ''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = ''.join(bufs1)

            self.assertEqual(zlib.decompress(s0), data0+data0)
            self.assertEqual(zlib.decompress(s1), data0+data1)

        def test_badcompresscopy(self):
            # Test copying a compression object in an inconsistent state
            c = zlib.compressobj()
            c.compress(HAMLET_SCENE)
            c.flush()
            self.assertRaises(ValueError, c.copy)

    if hasattr(zlib.decompressobj(), "copy"):
        def test_decompresscopy(self):
            # Test copying a decompression object
            data = HAMLET_SCENE
            comp = zlib.compress(data)

            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = d0.copy()
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = ''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = ''.join(bufs1)

            self.assertEqual(s0, s1)
            self.assertEqual(s0, data)

        def test_baddecompresscopy(self):
            # Test copying a decompression object in an inconsistent state
            data = zlib.compress(HAMLET_SCENE)
            d = zlib.decompressobj()
            d.decompress(data)
            d.flush()
            self.assertRaises(ValueError, d.copy)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0, 255))
                               for x in range(step)]))
    return ''.join(blocks)[:length]


def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]


HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


def test_main():
    run_unittest(
        ChecksumTestCase,
        ChecksumBigBufferTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase
    )

if __name__ == "__main__":
    test_main()