import unittest
from test import test_support
import zlib
import random

# print test_support.TESTFN

def getbuf():
    # This was in the original.  Avoid non-repeatable sources.
    # Left here (unused) in case something wants to be done with it.
    import imp
    try:
        t = imp.find_module('test_zlib')
        file = t[0]
    except ImportError:
        file = open(__file__)
    buf = file.read() * 8
    file.close()
    return buf


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64- bit machines
        # This is important if bit 31 (0x80000000L) is set.
        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)

    def test_penguins(self):
        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"), zlib.adler32("penguin", 1))


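# Illustrative sketch, not part of the original test file and never run by
# test_main(): zlib's running checksums can be chained across chunks by
# passing the previous result back in as the start value, so piecewise
# checksumming matches checksumming the whole string at once.
def _checksum_chaining_sketch(data="spam and eggs " * 4):
    half = len(data) // 2
    # crc32 starts from 0 by default, adler32 from 1; feeding the first
    # half's result into the second call continues the same checksum.
    assert zlib.crc32(data[half:], zlib.crc32(data[:half])) == zlib.crc32(data)
    assert (zlib.adler32(data[half:], zlib.adler32(data[:half]))
            == zlib.adler32(data))

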
class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_bigbits(self):
        # an out-of-range second argument (MAX_WBITS + 1 == 16, which
        # zlib.compress() takes as the compression level) causes an error
        self.assertRaises(zlib.error,
                          zlib.compress, 'ERROR', zlib.MAX_WBITS + 1)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, 8, 0)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, 0)


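# Illustrative sketch, not part of the original test file and never run by
# test_main(): zlib.error is also what one-shot decompression raises for a
# truncated stream, not just for the bad parameters tested above.
def _truncated_stream_sketch():
    packed = zlib.compress(hamlet_scene)
    try:
        zlib.decompress(packed[:len(packed) // 2])
    except zlib.error:
        pass  # expected: incomplete or truncated stream
    else:
        raise AssertionError('truncated stream should raise zlib.error')

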
class CompressTestCase(unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        # decompress(compress(data)) better be data
        x = zlib.compress(hamlet_scene)
        self.assertEqual(zlib.decompress(x), hamlet_scene)

    def test_speech8(self):
        # decompress(compress(data)) better be data -- more compression chances
        data = hamlet_scene * 8
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_speech16(self):
        # decompress(compress(data)) better be data -- more compression chances
        data = hamlet_scene * 16
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_speech128(self):
        # decompress(compress(data)) better be data -- more compression chances
        data = hamlet_scene * 8 * 16
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_monotonic(self):
        # higher compression levels should not expand compressed size
        data = hamlet_scene * 8 * 16
        last = length = len(zlib.compress(data, 0))
        self.failUnless(last > len(data), "compress level 0 always expands")
        for level in range(10):
            length = len(zlib.compress(data, level))
            self.failUnless(length <= last,
                            'compress level %d more effective than %d!'
                            % (level - 1, level))
            last = length


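# Illustrative sketch, not part of the original test file and never run by
# test_main(): level 0 stores the data with a small framing overhead, while
# higher levels trade speed for size -- the property test_monotonic checks.
def _compression_level_sketch(data=None):
    data = data or hamlet_scene * 8
    stored = zlib.compress(data, 0)   # "stored" blocks: no compression
    best = zlib.compress(data, 9)     # slowest setting, smallest output
    assert len(stored) > len(data)    # framing overhead expands the data
    assert len(best) < len(stored)    # real compression beats storing
    return len(data), len(stored), len(best)

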
class CompressObjectTestCase(unittest.TestCase):
    # Test compression object
    def test_pairsmall(self):
        # use compress object in straightforward manner, decompress w/ object
        data = hamlet_scene
        co = zlib.compressobj(8, 8, -15)
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush)  # second flush should not work
        dco = zlib.decompressobj(-15)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_pair(self):
        # straightforward compress/decompress objects, more compression
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(8, 8, -15)
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush)  # second flush should not work
        dco = zlib.decompressobj(-15)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj(-15)
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompressincremental(self):
        # compress object in steps, decompress object in steps
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf, -12, -5))

        dco = zlib.decompressobj(-12)
        bufs = []
        for i in range(0, len(combuf), 128):
            bufs.append(dco.decompress(combuf[i:i+128]))
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                             len(dco.unconsumed_tail))
        bufs.append(dco.flush())
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) uct should be '': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompinc(self, sizes=[128], flush=True, source=None,
                       cx=256, dcx=64):
        # compress object in steps, decompress object in steps, loop sizes
        source = source or hamlet_scene
        for reps in sizes:
            data = source * reps
            co = zlib.compressobj(2, 8, -12, 9, 1)
            bufs = []
            for i in range(0, len(data), cx):
                bufs.append(co.compress(data[i:i+cx]))
            bufs.append(co.flush())
            combuf = ''.join(bufs)

            self.assertEqual(data, zlib.decompress(combuf, -12, -5))

            dco = zlib.decompressobj(-12)
            bufs = []
            for i in range(0, len(combuf), dcx):
                bufs.append(dco.decompress(combuf[i:i+dcx]))
                self.assertEqual('', dco.unconsumed_tail, ########
                                 "(A) uct should be '': not %d long" %
                                 len(dco.unconsumed_tail))
            if flush:
                bufs.append(dco.flush())
            else:
                while True:
                    chunk = dco.decompress('')
                    if chunk:
                        bufs.append(chunk)
                    else:
                        break
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(B) uct should be '': not %d long" %
                             len(dco.unconsumed_tail))
            self.assertEqual(data, ''.join(bufs))
            # Failure means: "decompressobj with init options failed"

    def test_decompimax(self, sizes=[128], flush=True, source=None,
                        cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps, loop sizes
        source = source or hamlet_scene
        for reps in sizes:
            # Check a decompression object with max_length specified
            data = source * reps
            co = zlib.compressobj(2, 8, -12, 9, 1)
            bufs = []
            for i in range(0, len(data), cx):
                bufs.append(co.compress(data[i:i+cx]))
            bufs.append(co.flush())
            combuf = ''.join(bufs)
            self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                             'compressed data failure')

            dco = zlib.decompressobj(-12)
            bufs = []
            cb = combuf
            while cb:
                #max_length = 1 + len(cb)//10
                chunk = dco.decompress(cb, dcx)
                self.failIf(len(chunk) > dcx,
                            'chunk too big (%d>%d)' % (len(chunk), dcx))
                bufs.append(chunk)
                cb = dco.unconsumed_tail
            if flush:
                bufs.append(dco.flush())
            else:
                while True:
                    chunk = dco.decompress('', dcx)
                    self.failIf(len(chunk) > dcx,
                                'chunk too big in tail (%d>%d)'
                                % (len(chunk), dcx))
                    if chunk:
                        bufs.append(chunk)
                    else:
                        break
            self.assertEqual(len(data), len(''.join(bufs)))
            self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self):
        # Check a decompression object with max_length specified
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                         'compressed data failure')

        dco = zlib.decompressobj(-12)
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(len(data), len(''.join(bufs)))
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflushless(self):
        # identical to test_decompressmaxlen except flush() is replaced
        # with an equivalent drain loop; this version works where the
        # flush()-based one fails on (e.g.) 2.2.2
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                         'compressed data mismatch')

        dco = zlib.decompressobj(-12)
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail

        # instead of bufs.append(dco.flush()), drain the remainder manually
        while len(chunk):
            chunk = dco.decompress('', max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)

        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj(-12)
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = hamlet_scene * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj(level)
                a = obj.compress(data[:3000])
                b = obj.flush(sync)
                c = obj.compress(data[3000:])
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a, b, c, d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random

        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data

            # Create compressor and decompressor objects
            co = zlib.compressobj(9)
            dco = zlib.decompressobj()

            # Try 17K of data
            # generate random data stream
            try:
                # In 2.3 and later, WichmannHill is the RNG of the bug report
                gen = random.WichmannHill()
            except AttributeError:
                try:
                    # 2.2 called it Random
                    gen = random.Random()
                except AttributeError:
                    # others might simply have a single RNG
                    gen = random
            gen.seed(1)
            data = genblock(1, 17 * 1024, generator=gen)

            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)

            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_manydecompinc(self):
        # Run incremental decompress test for a large range of sizes
        self.test_decompinc(sizes=[1 << n for n in range(8)],
                            flush=True, cx=32, dcx=4)

    def test_manydecompimax(self):
        # Run incremental decompress maxlen test for a large range of sizes,
        # draining without flush() to avoid the flush bug
        self.test_decompimax(sizes=[1 << n for n in range(8)],
                             flush=False, cx=32, dcx=4)

    def test_manydecompimaxflush(self):
        # Run incremental decompress maxlen test for a large range of sizes,
        # this time finishing with flush()
        self.test_decompimax(sizes=[1 << n for n in range(8)],
                             flush=True, cx=32, dcx=4)


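# Illustrative sketch, not part of the original test file and never run by
# test_main(): the canonical max_length consumption loop that the *maxlen
# tests above exercise -- feed unconsumed_tail back in, bounding each output
# chunk, until all input is consumed, then flush the rest.
def _bounded_decompress_sketch(compressed, wbits=-12, max_length=64):
    dco = zlib.decompressobj(wbits)
    pieces = []
    remaining = compressed
    while remaining:
        chunk = dco.decompress(remaining, max_length)  # at most max_length bytes
        pieces.append(chunk)
        remaining = dco.unconsumed_tail                # input not yet consumed
    pieces.append(dco.flush())
    return ''.join(pieces)

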
def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0, 255))
                               for x in range(step)]))
    return ''.join(blocks)[:length]


def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source."""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]


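# Illustrative sketch, not part of the original test file and never run by
# test_main(): genblock() above yields essentially incompressible random
# bytes, while choose_lines() pulls repetitive, highly compressible text out
# of the scene below -- handy for comparing the two extremes.
def _corpus_sketch():
    noisy = genblock(1, 17 * 1024)
    wordy = '\n'.join(choose_lines(hamlet_scene, 100, seed=1))
    return len(zlib.compress(noisy)), len(zlib.compress(wordy))

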
hamlet_scene = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


def test_main():
    test_support.run_unittest(
        ChecksumTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase
    )

if __name__ == "__main__":
    test_main()

# Convenience driver for running subsets of the test cases interactively.
def test(tests=''):
    if not tests: tests = 'o'
    testcases = []
    if 'k' in tests: testcases.append(ChecksumTestCase)
    if 'x' in tests: testcases.append(ExceptionTestCase)
    if 'c' in tests: testcases.append(CompressTestCase)
    if 'o' in tests: testcases.append(CompressObjectTestCase)
    test_support.run_unittest(*testcases)

# Manual debugging snippet kept from development; never executed.
if False:
    import sys
    sys.path.insert(1, '/Py23Src/python/dist/src/Lib/test')
    import test_zlib as tz
    ts, ut = tz.test_support, tz.unittest
    su = ut.TestSuite()
    su.addTest(ut.makeSuite(tz.CompressTestCase))
    ts.run_suite(su)