# Source blob: 3c51673a922917c0327aed0faec63330106ce0f5
"""Test script for the gzip module.
"""

import unittest
from test import support
from test.support import bigmemtest, _4G
import os
import io
import struct
import array

# gzip is obtained through the test-support helper instead of a plain import
# (presumably so the whole test module is skipped when gzip cannot be
# imported -- confirm against test.support).
gzip = support.import_module('gzip')

# Sample payloads that the tests compress and decompress.
#
# NOTE(review): the whitespace at the start of each line inside data1 was
# collapsed in the copy this file was recovered from; it is restored here as
# two spaces.  test_metadata compares against a hard-coded CRC32 of data1, so
# confirm the exact bytes against the upstream file before relying on it.
data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""


class UnseekableIO(io.BytesIO):
    """In-memory byte stream that refuses every positioning operation.

    seekable() reports False, and both tell() and seek() raise
    io.UnsupportedOperation.  Reading, writing and getvalue() are inherited
    unchanged from io.BytesIO.
    """

    def seekable(self):
        return False

    def tell(self):
        raise io.UnsupportedOperation

    def seek(self, *args):
        raise io.UnsupportedOperation


class BaseTest(unittest.TestCase):
    """Shared fixture: removes the scratch file before and after each test."""

    # Path of the scratch file every test reads from and writes to.
    filename = support.TESTFN

    def setUp(self):
        support.unlink(self.filename)

    def tearDown(self):
        support.unlink(self.filename)


class TestGzip(BaseTest):
    """Tests for gzip.GzipFile and the compress()/decompress() shortcuts."""

    # Per-test notes are written as comments rather than docstrings so that
    # unittest's verbose output keeps showing the test method names.

    def write_and_read_back(self, data, mode='b'):
        # Helper: write `data` (any bytes-like object) to the scratch file,
        # check the length reported by write(), then read the file back and
        # compare it with bytes(data).
        b_data = bytes(data)
        with gzip.GzipFile(self.filename, 'w'+mode) as f:
            l = f.write(data)
        self.assertEqual(l, len(b_data))
        with gzip.GzipFile(self.filename, 'r'+mode) as f:
            self.assertEqual(f.read(), b_data)

    def test_write(self):
        # Also used by many other tests to create the scratch file holding
        # data1 * 50.
        with gzip.GzipFile(self.filename, 'wb') as f:
            f.write(data1 * 50)

            # Try flush and fileno.
            f.flush()
            f.fileno()
            if hasattr(os, 'fsync'):
                os.fsync(f.fileno())
            f.close()

        # Test multiple close() calls.
        f.close()

    # The following test_write_xy methods test that write accepts
    # the corresponding bytes-like object type as input
    # and that the data written equals bytes(xy) in all cases.
    def test_write_memoryview(self):
        self.write_and_read_back(memoryview(data1 * 50))
        m = memoryview(bytes(range(256)))
        data = m.cast('B', shape=[8,8,4])
        self.write_and_read_back(data)

    def test_write_bytearray(self):
        self.write_and_read_back(bytearray(data1 * 50))

    def test_write_array(self):
        self.write_and_read_back(array.array('I', data1 * 40))

    def test_write_incompatible_type(self):
        # Test that non-bytes-like types raise TypeError.
        # Issue #21560: attempts to write incompatible types
        # should not affect the state of the fileobject
        with gzip.GzipFile(self.filename, 'wb') as f:
            with self.assertRaises(TypeError):
                f.write('')
            with self.assertRaises(TypeError):
                f.write([])
            f.write(data1)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1)

    def test_read(self):
        self.test_write()
        # Try reading.
        with gzip.GzipFile(self.filename, 'r') as f:
            d = f.read()
        self.assertEqual(d, data1*50)

    def test_read1(self):
        self.test_write()
        blocks = []
        nread = 0
        with gzip.GzipFile(self.filename, 'r') as f:
            while True:
                d = f.read1()
                if not d:
                    break
                blocks.append(d)
                nread += len(d)
                # Check that position was updated correctly (see issue10791).
                self.assertEqual(f.tell(), nread)
        self.assertEqual(b''.join(blocks), data1 * 50)

    @bigmemtest(size=_4G, memuse=1)
    def test_read_large(self, size):
        # Read chunk size over UINT_MAX should be supported, despite zlib's
        # limitation per low-level call
        compressed = gzip.compress(data1, compresslevel=1)
        # Use a with-block so the GzipFile is closed even if the assertion
        # fails.
        with gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb') as f:
            self.assertEqual(f.read(size), data1)

    def test_io_on_closed_object(self):
        # Test that I/O operations on closed GzipFile objects raise a
        # ValueError, just like the corresponding functions on file objects.

        # Write to a file, open it for reading, then close it.
        self.test_write()
        f = gzip.GzipFile(self.filename, 'r')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.read(1)
        with self.assertRaises(ValueError):
            f.seek(0)
        with self.assertRaises(ValueError):
            f.tell()
        # Open the file for writing, then close it.
        f = gzip.GzipFile(self.filename, 'w')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.write(b'')
        with self.assertRaises(ValueError):
            f.flush()

    def test_append(self):
        self.test_write()
        # Append to the previous file
        with gzip.GzipFile(self.filename, 'ab') as f:
            f.write(data2 * 15)

        with gzip.GzipFile(self.filename, 'rb') as f:
            d = f.read()
        self.assertEqual(d, (data1*50) + (data2*15))

    def test_many_append(self):
        # Bug #1074261 was triggered when reading a file that contained
        # many, many members.  Create such a file and verify that reading it
        # works.
        with gzip.GzipFile(self.filename, 'wb', 9) as f:
            f.write(b'a')
        for _ in range(200):
            with gzip.GzipFile(self.filename, "ab", 9) as f:  # append
                f.write(b'a')

        # Try reading the file
        with gzip.GzipFile(self.filename, "rb") as zgfile:
            # Collect the pieces and join once instead of growing a bytes
            # object with += on every iteration.
            chunks = []
            while True:
                ztxt = zgfile.read(8192)
                if not ztxt:
                    break
                chunks.append(ztxt)
        self.assertEqual(b''.join(chunks), b'a'*201)

    def test_exclusive_write(self):
        with gzip.GzipFile(self.filename, 'xb') as f:
            f.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1 * 50)
        # The file exists now, so exclusive creation must fail.
        with self.assertRaises(FileExistsError):
            gzip.GzipFile(self.filename, 'xb')

    def test_buffered_reader(self):
        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
        # performance.
        self.test_write()

        with gzip.GzipFile(self.filename, 'rb') as f:
            with io.BufferedReader(f) as r:
                lines = list(r)

        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))

    def test_readline(self):
        self.test_write()
        # Try .readline() with varying line lengths

        with gzip.GzipFile(self.filename, 'rb') as f:
            line_length = 0
            while True:
                L = f.readline(line_length)
                # A limit of 0 always yields b'', so an empty result only
                # signals end of file when the limit was non-zero.
                if not L and line_length != 0:
                    break
                self.assertLessEqual(len(L), line_length)
                line_length = (line_length + 1) % 50

    def test_readlines(self):
        self.test_write()
        # Try .readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            f.readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            # Drain the file in batches using a size hint until an empty
            # list comes back.
            while f.readlines(150):
                pass

    def test_seek_read(self):
        self.test_write()
        # Try seek, read test

        with gzip.GzipFile(self.filename) as f:
            while True:
                oldpos = f.tell()
                line1 = f.readline()
                if not line1:
                    break
                newpos = f.tell()
                f.seek(oldpos)  # negative seek
                amount = min(len(line1), 10)
                line2 = f.read(amount)
                self.assertEqual(line1[:amount], line2)
                f.seek(newpos)  # positive seek

    def test_seek_whence(self):
        self.test_write()
        # Try seek(whence=1), read test

        with gzip.GzipFile(self.filename) as f:
            f.read(10)
            f.seek(10, whence=1)
            y = f.read(10)
        self.assertEqual(y, data1[20:30])

    def test_seek_write(self):
        # Try seek, write test
        with gzip.GzipFile(self.filename, 'w') as f:
            for pos in range(0, 256, 16):
                f.seek(pos)
                f.write(b'GZ\n')

    def test_mode(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            self.assertEqual(f.myfileobj.mode, 'rb')
        # Exclusive creation needs the file to be absent.
        support.unlink(self.filename)
        with gzip.GzipFile(self.filename, 'x') as f:
            self.assertEqual(f.myfileobj.mode, 'xb')

    def test_1647484(self):
        # The 'wb' pass creates the file that the 'rb' pass then opens.
        for mode in ('wb', 'rb'):
            with gzip.GzipFile(self.filename, mode) as f:
                self.assertTrue(hasattr(f, "name"))
                self.assertEqual(f.name, self.filename)

    def test_paddedfile_getattr(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertTrue(hasattr(f.fileobj, "name"))
            self.assertEqual(f.fileobj.name, self.filename)

    def test_mtime(self):
        mtime = 123456789
        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)
        with gzip.GzipFile(self.filename) as fRead:
            # mtime is None before anything is read and holds the stored
            # value after the read.
            self.assertTrue(hasattr(fRead, 'mtime'))
            self.assertIsNone(fRead.mtime)
            dataRead = fRead.read()
            self.assertEqual(dataRead, data1)
            self.assertEqual(fRead.mtime, mtime)

    def test_metadata(self):
        mtime = 123456789

        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)

        with open(self.filename, 'rb') as fRead:
            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html

            idBytes = fRead.read(2)
            self.assertEqual(idBytes, b'\x1f\x8b')  # gzip ID

            cmByte = fRead.read(1)
            self.assertEqual(cmByte, b'\x08')  # deflate

            flagsByte = fRead.read(1)
            self.assertEqual(flagsByte, b'\x08')  # only the FNAME flag is set

            mtimeBytes = fRead.read(4)
            self.assertEqual(mtimeBytes, struct.pack('<i', mtime))  # little-endian

            xflByte = fRead.read(1)
            self.assertEqual(xflByte, b'\x02')  # maximum compression

            osByte = fRead.read(1)
            self.assertEqual(osByte, b'\xff')  # OS "unknown" (OS-independent)

            # Since the FNAME flag is set, the zero-terminated filename follows.
            # RFC 1952 specifies that this is the name of the input file, if any.
            # However, the gzip module defaults to storing the name of the output
            # file in this field.
            expected = self.filename.encode('Latin-1') + b'\x00'
            nameBytes = fRead.read(len(expected))
            self.assertEqual(nameBytes, expected)

            # Since no other flags were set, the header ends here.
            # Rather than process the compressed data, let's seek to the trailer.
            fRead.seek(os.stat(self.filename).st_size - 8)

            crc32Bytes = fRead.read(4)  # CRC32 of uncompressed data [data1]
            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')

            isizeBytes = fRead.read(4)
            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))

    def test_with_open(self):
        # GzipFile supports the context management protocol
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(b"xxx")
        f = gzip.GzipFile(self.filename, "rb")
        f.close()
        # Entering an already-closed file must raise ValueError.
        with self.assertRaises(ValueError):
            with f:
                pass
        # An exception raised inside the with-body must propagate to the
        # caller.
        with self.assertRaises(ZeroDivisionError):
            with gzip.GzipFile(self.filename, "wb") as f:
                1/0

    def test_zero_padded_file(self):
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(data1 * 50)

        # Pad the file with zeroes
        with open(self.filename, "ab") as f:
            f.write(b"\x00" * 50)

        with gzip.GzipFile(self.filename, "rb") as f:
            d = f.read()
            self.assertEqual(d, data1 * 50, "Incorrect data in file")

    def test_non_seekable_file(self):
        uncompressed = data1 * 50
        buf = UnseekableIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
            f.write(uncompressed)
        compressed = buf.getvalue()
        buf = UnseekableIO(compressed)
        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_peek(self):
        uncompressed = data1 * 200
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(uncompressed)

        def sizes():
            # Endless cycle of peek sizes: 5, 15, 25, 35, 45, 5, ...
            while True:
                for n in range(5, 50, 10):
                    yield n

        with gzip.GzipFile(self.filename, "rb") as f:
            f.max_read_chunk = 33
            nread = 0
            for n in sizes():
                s = f.peek(n)
                if s == b'':
                    break
                # Reading exactly what was peeked must return the same bytes.
                self.assertEqual(f.read(len(s)), s)
                nread += len(s)
            self.assertEqual(f.read(100), b'')
            self.assertEqual(nread, len(uncompressed))

    def test_textio_readlines(self):
        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            with io.TextIOWrapper(f, encoding="ascii") as t:
                self.assertEqual(t.readlines(), lines)

    def test_fileobj_from_fdopen(self):
        # Issue #13781: Opening a GzipFile for writing fails when using a
        # fileobj created with os.fdopen().
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
        with os.fdopen(fd, "wb") as f:
            with gzip.GzipFile(fileobj=f, mode="w") as g:
                pass

    def test_bytes_filename(self):
        str_filename = self.filename
        try:
            bytes_filename = str_filename.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest("Temporary file name needs to be ASCII")
        with gzip.GzipFile(bytes_filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.GzipFile(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        # Sanity check that we are actually operating on the right file.
        with gzip.GzipFile(str_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)

    def test_decompress_limited(self):
        """Decompressed data buffering should be limited"""
        bomb = gzip.compress(bytes(int(2e6)), compresslevel=9)
        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)

        bomb = io.BytesIO(bomb)
        # Use a with-block so the GzipFile is closed even if an assertion
        # fails.
        with gzip.GzipFile(fileobj=bomb) as decomp:
            self.assertEqual(bytes(1), decomp.read(1))
            max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
            self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
                "Excessive amount of data was decompressed")

    # Testing compress/decompress shortcut functions

    def test_compress(self):
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                datac = gzip.compress(data, *args)
                self.assertEqual(type(datac), bytes)
                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                    self.assertEqual(f.read(), data)

    def test_decompress(self):
        for data in (data1, data2):
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
                f.write(data)
            self.assertEqual(gzip.decompress(buf.getvalue()), data)
            # Roundtrip with compress
            datac = gzip.compress(data)
            self.assertEqual(gzip.decompress(datac), data)

    def test_read_truncated(self):
        data = data1*50
        # Drop the CRC (4 bytes) and file size (4 bytes).
        truncated = gzip.compress(data)[:-8]
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(data)), data)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 10-byte header.
        for i in range(2, 10):
            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)

    def test_read_with_extra(self):
        # Gzip data with an extra field
        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
                  b'\x05\x00Extra'
                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
            self.assertEqual(f.read(), b'Test')

    def test_prepend_error(self):
        # See issue #20875
        with gzip.open(self.filename, "wb") as f:
            f.write(data1)
        with gzip.open(self.filename, "rb") as f:
            f._buffer.raw._fp.prepend()

class TestOpen(BaseTest):
    """Tests for the module-level gzip.open() function."""

    def test_binary_modes(self):
        uncompressed = data1 * 50

        with gzip.open(self.filename, "wb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "rb") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "ab") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed * 2)

        # Exclusive creation fails while the file exists and succeeds once
        # it has been removed.
        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "xb")
        support.unlink(self.filename)
        with gzip.open(self.filename, "xb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        uncompressed = data1 * 50

        with gzip.open(self.filename, "w") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "r") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "a") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed * 2)

        # Exclusive creation fails while the file exists and succeeds once
        # it has been removed.
        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "x")
        support.unlink(self.filename)
        with gzip.open(self.filename, "x") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

    def test_text_modes(self):
        uncompressed = data1.decode("ascii") * 50
        # Expected on-disk text: "\n" replaced by the platform line
        # separator.
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
            self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt") as f:
            self.assertEqual(f.read(), uncompressed)
        with gzip.open(self.filename, "at") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
            self.assertEqual(file_data, uncompressed_raw * 2)

    def test_fileobj(self):
        # gzip.open() is given a file object instead of a file name.
        uncompressed_bytes = data1 * 50
        uncompressed_str = uncompressed_bytes.decode("ascii")
        compressed = gzip.compress(uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "r") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rb") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rt") as f:
            self.assertEqual(f.read(), uncompressed_str)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        with self.assertRaises(TypeError):
            gzip.open(123.456)
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "wbt")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "xbt")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", encoding="utf-8")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", errors="ignore")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", newline="\n")

    def test_encoding(self):
        # Test non-default encoding.
        uncompressed = data1.decode("ascii") * 50
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="utf-16") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("utf-16")
            self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt", encoding="utf-16") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with gzip.open(self.filename, "wb") as f:
            f.write(b"foo\xffbar")
        with gzip.open(self.filename, "rt", encoding="ascii", errors="ignore") \
                as f:
            self.assertEqual(f.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        uncompressed = data1.decode("ascii") * 50
        with gzip.open(self.filename, "wt", newline="\n") as f:
            f.write(uncompressed)
        # The text contains no "\r", so reading with newline="\r" yields the
        # whole content as one line.
        with gzip.open(self.filename, "rt", newline="\r") as f:
            self.assertEqual(f.readlines(), [uncompressed])

def test_main(verbose=None):
    """Run the TestGzip and TestOpen suites via support.run_unittest.

    `verbose` is accepted but not used by this function.
    """
    support.run_unittest(TestGzip, TestOpen)

# Run the suites when this file is executed as a script.
if __name__ == "__main__":
    test_main(verbose=True)