import unittest
from test import test_support
import zlib
import random

# print test_support.TESTFN

def getbuf():
    # This was in the original.  Avoid non-repeatable sources.
    # Left here (unused) in case something wants to be done with it.
    import imp
    try:
        t = imp.find_module('test_zlib')
        file = t[0]
    except ImportError:
        file = open(__file__)
    buf = file.read() * 8
    file.close()
    return buf


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
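    # Note: zlib's checksums start from 0 for crc32 and from 1 for adler32;
    # the *start tests below simply confirm the module's default start values.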
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64- bit machines
        # This is important if bit 31 (0x80000000L) is set.
        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)

    def test_penguins(self):
        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"), zlib.adler32("penguin", 1))


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_bigbits(self):
        # specifying total bits too large causes an error
        self.assertRaises(zlib.error,
                          zlib.compress, 'ERROR', zlib.MAX_WBITS + 1)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
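        # (args are level=1, method=DEFLATED, wbits=0; a window size of 0 is
        #  outside the range zlib will accept)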
        self.assertRaises(ValueError, zlib.compressobj, 1, 8, 0)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, 0)


class CompressTestCase(unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        # decompress(compress(data)) better be data
        x = zlib.compress(hamlet_scene)
        self.assertEqual(zlib.decompress(x), hamlet_scene)

    def test_speech8(self):
        # decompress(compress(data)) better be data -- more compression chances
        data = hamlet_scene * 8
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_speech16(self):
        # decompress(compress(data)) better be data -- more compression chances
        data = hamlet_scene * 16
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_speech128(self):
        # decompress(compress(data)) better be data -- more compression chances
        data = hamlet_scene * 8 * 16
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)


class CompressObjectTestCase(unittest.TestCase):
    # Test compression object
    def test_pairsmall(self):
        # use compress object in straightforward manner, decompress w/ object
        data = hamlet_scene
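        # compressobj args are (level, method, wbits): a negative wbits asks
        # for a raw deflate stream with no zlib header or trailer, so the
        # matching decompressobj below also gets a negative wbits.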
        co = zlib.compressobj(8, 8, -15)
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj(-15)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_pair(self):
        # straightforward compress/decompress objects, more compression
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(8, 8, -15)
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj(-15)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = hamlet_scene * 8 * 16
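        # full argument form: level=2, method=DEFLATED, wbits=-12 (raw
        # deflate with a 4K window), memLevel=9, strategy=Z_FILTERED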
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj(-15)
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompressincremental(self):
        # compress object in steps, decompress object in steps
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf, -12, -5))

        dco = zlib.decompressobj(-12)
        bufs = []
        for i in range(0, len(combuf), 128):
            bufs.append(dco.decompress(combuf[i:i+128]))
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                             len(dco.unconsumed_tail))
        bufs.append(dco.flush())
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) uct should be '': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompinc(self, sizes=[128], flush=True, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps, loop sizes
        source = source or hamlet_scene
        for reps in sizes:
            data = source * reps
            co = zlib.compressobj(2, 8, -12, 9, 1)
            bufs = []
            for i in range(0, len(data), cx):
                bufs.append(co.compress(data[i:i+cx]))
            bufs.append(co.flush())
            combuf = ''.join(bufs)

            self.assertEqual(data, zlib.decompress(combuf, -12, -5))

            dco = zlib.decompressobj(-12)
            bufs = []
            for i in range(0, len(combuf), dcx):
                bufs.append(dco.decompress(combuf[i:i+dcx]))
                self.assertEqual('', dco.unconsumed_tail, ########
                                 "(A) uct should be '': not %d long" %
                                 len(dco.unconsumed_tail))
            if flush:
                bufs.append(dco.flush())
            else:
                while True:
                    chunk = dco.decompress('')
                    if chunk:
                        bufs.append(chunk)
                    else:
                        break
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(B) uct should be '': not %d long" %
                             len(dco.unconsumed_tail))
            self.assertEqual(data, ''.join(bufs))
            # Failure means: "decompressobj with init options failed"

    def test_decompimax(self, sizes=[128], flush=True, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps, loop sizes
        source = source or hamlet_scene
        for reps in sizes:
            # Check a decompression object with max_length specified
            data = source * reps
            co = zlib.compressobj(2, 8, -12, 9, 1)
            bufs = []
            for i in range(0, len(data), cx):
                bufs.append(co.compress(data[i:i+cx]))
            bufs.append(co.flush())
            combuf = ''.join(bufs)
            self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                             'compressed data failure')

            dco = zlib.decompressobj(-12)
            bufs = []
            cb = combuf
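            # When a max_length argument is given, whatever does not fit in
            # the returned chunk is left in dco.unconsumed_tail and has to be
            # fed back in on the next call.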
            while cb:
                #max_length = 1 + len(cb)//10
                chunk = dco.decompress(cb, dcx)
                self.failIf(len(chunk) > dcx,
                            'chunk too big (%d>%d)' % (len(chunk), dcx))
                bufs.append(chunk)
                cb = dco.unconsumed_tail
            if flush:
                bufs.append(dco.flush())
            else:
                while True:
                    chunk = dco.decompress('', dcx)
                    self.failIf(len(chunk) > dcx,
                                'chunk too big in tail (%d>%d)' % (len(chunk), dcx))
                    if chunk:
                        bufs.append(chunk)
                    else:
                        break
            self.assertEqual(len(data), len(''.join(bufs)))
            self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self):
        # Check a decompression object with max_length specified
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                         'compressed data failure')

        dco = zlib.decompressobj(-12)
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(len(data), len(''.join(bufs)))
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflushless(self):
        # identical to test_decompressmaxlen except that flush() is replaced
        # with an equivalent drain loop; this version works where the
        # flush()-based one fails on (e.g.) 2.2.2
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                         'compressed data mismatch')

        dco = zlib.decompressobj(-12)
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail

        #bufs.append(dco.flush())
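        # Instead of flush(), keep calling decompress('') to drain whatever
        # output is still buffered; stop once it yields nothing.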
        while len(chunk):
            chunk = dco.decompress('', max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)

        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj(-12)
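        # a negative max_length must be rejected outright, and the object's
        # unconsumed_tail should stay empty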
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = hamlet_scene * 8

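        # A mid-stream flush forces out the pending output; whatever flush
        # mode and level are used, the concatenated pieces must still
        # decompress back to the original data.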
        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj(level)
                a = obj.compress(data[:3000])
                b = obj.flush(sync)
                c = obj.compress(data[3000:])
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a, b, c, d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random

        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data

            # Create compressor and decompressor objects
            co = zlib.compressobj(9)
            dco = zlib.decompressobj()

            # Try 17K of data
            # generate random data stream
            try:
                # In 2.3 and later, WichmannHill is the RNG of the bug report
                gen = random.WichmannHill()
            except AttributeError:
                try:
                    # 2.2 called it Random
                    gen = random.Random()
                except AttributeError:
                    # others might simply have a single RNG
                    gen = random
            gen.seed(1)
            data = genblock(1, 17 * 1024, generator=gen)

            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)

            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_manydecompinc(self):
        # Run incremental decompress test for a large range of sizes
        self.test_decompinc(sizes=[1<<n for n in range(8)],
                            flush=True, cx=32, dcx=4)

    def test_manydecompimax(self):
        # Run incremental decompress maxlen test for a large range of sizes
        # avoid the flush bug
        self.test_decompimax(sizes=[1<<n for n in range(8)],
                             flush=False, cx=32, dcx=4)

    def test_manydecompimaxflush(self):
        # Run incremental decompress maxlen test for a large range of sizes,
        # this time also exercising the flush() path
        self.test_decompimax(sizes=[1<<n for n in range(8)],
                             flush=True, cx=32, dcx=4)


381def genblock(seed, length, step=1024, generator=random):
382 """length-byte stream of random data from a seed (in step-byte blocks)."""
383 if seed is not None:
384 generator.seed(seed)
385 randint = generator.randint
386 if length < step or step < 2:
387 step = length
388 blocks = []
389 for i in range(0, length, step):
390 blocks.append(''.join([chr(randint(0,255))
391 for x in range(step)]))
392 return ''.join(blocks)[:length]
393
394
395
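# Note: choose_lines() is not called by any test in this file; like getbuf()
# above, it is only kept around for ad-hoc experimentation.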
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]


hamlet_scene = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


def test_main():
    test_support.run_unittest(
        ChecksumTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase
    )

if __name__ == "__main__":
    test_main()

def test(tests=''):
    if not tests: tests = 'o'
    testcases = []
    if 'k' in tests: testcases.append(ChecksumTestCase)
    if 'x' in tests: testcases.append(ExceptionTestCase)
    if 'c' in tests: testcases.append(CompressTestCase)
    if 'o' in tests: testcases.append(CompressObjectTestCase)
    test_support.run_unittest(*testcases)

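# Manually-run snippet for exercising a single suite from an interactive
# session; the 'if False' guard keeps it from ever executing during a normal
# test run.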
if False:
    import sys
    sys.path.insert(1, '/Py23Src/python/dist/src/Lib/test')
    import test_zlib as tz
    ts, ut = tz.test_support, tz.unittest
    su = ut.TestSuite()
    su.addTest(ut.makeSuite(tz.CompressTestCase))
    ts.run_suite(su)