blob: 295d4d4a8fdf3fc0b2b502a90cb45ab2c9a71637 [file] [log] [blame]
"""Test script for the gzip module.
"""
3
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Martin Pantere99e9772015-11-20 08:13:35 +00006from test.support import bigmemtest, _4G
Christian Heimes05e8be12008-02-23 18:30:17 +00007import os
Berker Peksag03020cf2016-10-02 13:47:58 +03008import pathlib
Antoine Pitroub1f88352010-01-03 22:37:40 +00009import io
Antoine Pitrou42db3ef2009-01-04 21:37:59 +000010import struct
Serhiy Storchakabca63b32015-03-23 14:59:48 +020011import array
Ezio Melotti78ea2022009-09-12 18:41:20 +000012gzip = support.import_module('gzip')
Andrew M. Kuchling605ebdd1999-03-25 21:50:27 +000013
# Sample payloads that the tests below write, compress and read back.
# NOTE(review): whitespace inside these literals is significant -- test_metadata
# pins the CRC32 and the length of data1 -- so do not re-indent or reflow them.
# The author/commit/line-number text fused into these lines appears to be
# blame-view extraction residue; the original leading whitespace cannot be
# recovered from this text, so restore the literals from the upstream file
# rather than by hand.
Walter Dörwald5b1284d2007-06-06 16:43:59 +000014data1 = b""" int length=DEFAULTALLOC, err = Z_OK;
Andrew M. Kuchling605ebdd1999-03-25 21:50:27 +000015 PyObject *RetVal;
16 int flushmode = Z_FINISH;
17 unsigned long start_total_out;
18
19"""
20
Walter Dörwald5b1284d2007-06-06 16:43:59 +000021data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
Neal Norwitz014f1032004-07-29 03:55:56 +000022/* See http://www.gzip.org/zlib/
Andrew M. Kuchling605ebdd1999-03-25 21:50:27 +000023/* See http://www.winimage.com/zLibDll for Windows */
24"""
25
Andrew M. Kuchling605ebdd1999-03-25 21:50:27 +000026
class UnseekableIO(io.BytesIO):
    """In-memory byte stream that presents itself as non-seekable.

    seekable() returns False, and tell() and seek() raise
    io.UnsupportedOperation; everything else is inherited from io.BytesIO.
    """

    def seekable(self):
        return False

    def tell(self):
        raise io.UnsupportedOperation

    def seek(self, *args):
        raise io.UnsupportedOperation
36
37
class BaseTest(unittest.TestCase):
    """Shared fixture: every test works on the scratch file support.TESTFN."""

    # Path of the scratch file used by the tests in subclasses.
    filename = support.TESTFN

    def setUp(self):
        # Remove any file already present at that path before the test runs.
        support.unlink(self.filename)

    def tearDown(self):
        # Remove whatever the test left at that path.
        support.unlink(self.filename)
Andrew M. Kuchling605ebdd1999-03-25 21:50:27 +000046
Andrew M. Kuchling85ab7382000-07-29 20:18:34 +000047
class TestGzip(BaseTest):
    """Tests for gzip.GzipFile and the compress()/decompress() shortcuts."""

    def write_and_read_back(self, data, mode='b'):
        # Write *data* through a GzipFile, check the count reported by
        # write(), then read the file back and compare with bytes(data).
        expected = bytes(data)
        with gzip.GzipFile(self.filename, 'w' + mode) as f:
            written = f.write(data)
        self.assertEqual(written, len(expected))
        with gzip.GzipFile(self.filename, 'r' + mode) as f:
            self.assertEqual(f.read(), expected)

    def test_write(self):
        with gzip.GzipFile(self.filename, 'wb') as f:
            f.write(data1 * 50)

            # Try flush and fileno.
            f.flush()
            f.fileno()
            if hasattr(os, 'fsync'):
                os.fsync(f.fileno())
            # Deliberate explicit close inside the with block: together with
            # the context-manager exit and the call below, close() runs
            # several times on the same object.
            f.close()

        # Test multiple close() calls.
        f.close()

    def test_write_read_with_pathlike_file(self):
        filename = pathlib.Path(self.filename)
        with gzip.GzipFile(filename, 'w') as f:
            f.write(data1 * 50)
        self.assertIsInstance(f.name, str)
        with gzip.GzipFile(filename, 'a') as f:
            f.write(data1)
        with gzip.GzipFile(filename) as f:
            d = f.read()
        self.assertEqual(d, data1 * 51)
        self.assertIsInstance(f.name, str)

    # The following test_write_xy methods test that write accepts
    # the corresponding bytes-like object type as input
    # and that the data written equals bytes(xy) in all cases.
    def test_write_memoryview(self):
        self.write_and_read_back(memoryview(data1 * 50))
        # Also exercise a multi-dimensional view.
        m = memoryview(bytes(range(256)))
        data = m.cast('B', shape=[8, 8, 4])
        self.write_and_read_back(data)

    def test_write_bytearray(self):
        self.write_and_read_back(bytearray(data1 * 50))

    def test_write_array(self):
        self.write_and_read_back(array.array('I', data1 * 40))

    def test_write_incompatible_type(self):
        # Test that non-bytes-like types raise TypeError.
        # Issue #21560: attempts to write incompatible types
        # should not affect the state of the fileobject
        with gzip.GzipFile(self.filename, 'wb') as f:
            with self.assertRaises(TypeError):
                f.write('')
            with self.assertRaises(TypeError):
                f.write([])
            f.write(data1)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1)

    def test_read(self):
        self.test_write()
        # Try reading.
        with gzip.GzipFile(self.filename, 'r') as f:
            d = f.read()
        self.assertEqual(d, data1 * 50)

    def test_read1(self):
        self.test_write()
        blocks = []
        nread = 0
        with gzip.GzipFile(self.filename, 'r') as f:
            while True:
                d = f.read1()
                if not d:
                    break
                blocks.append(d)
                nread += len(d)
                # Check that position was updated correctly (see issue10791).
                self.assertEqual(f.tell(), nread)
        self.assertEqual(b''.join(blocks), data1 * 50)

    @bigmemtest(size=_4G, memuse=1)
    def test_read_large(self, size):
        # Read chunk size over UINT_MAX should be supported, despite zlib's
        # limitation per low-level call
        compressed = gzip.compress(data1, compresslevel=1)
        # Use a with block so the GzipFile is closed even if the assertion
        # fails.
        with gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb') as f:
            self.assertEqual(f.read(size), data1)

    def test_io_on_closed_object(self):
        # Test that I/O operations on closed GzipFile objects raise a
        # ValueError, just like the corresponding functions on file objects.

        # Write to a file, open it for reading, then close it.
        self.test_write()
        f = gzip.GzipFile(self.filename, 'r')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.read(1)
        with self.assertRaises(ValueError):
            f.seek(0)
        with self.assertRaises(ValueError):
            f.tell()
        # Open the file for writing, then close it.
        f = gzip.GzipFile(self.filename, 'w')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.write(b'')
        with self.assertRaises(ValueError):
            f.flush()

    def test_append(self):
        self.test_write()
        # Append to the previous file
        with gzip.GzipFile(self.filename, 'ab') as f:
            f.write(data2 * 15)

        with gzip.GzipFile(self.filename, 'rb') as f:
            d = f.read()
        self.assertEqual(d, (data1 * 50) + (data2 * 15))

    def test_many_append(self):
        # Bug #1074261 was triggered when reading a file that contained
        # many, many members.  Create such a file and verify that reading it
        # works.
        with gzip.GzipFile(self.filename, 'wb', 9) as f:
            f.write(b'a')
        for _ in range(200):
            with gzip.GzipFile(self.filename, "ab", 9) as f:  # append
                f.write(b'a')

        # Try reading the file in 8192-byte requests until read() returns
        # b''; collect the chunks and join once instead of repeated +=.
        with gzip.GzipFile(self.filename, "rb") as zgfile:
            chunks = list(iter(lambda: zgfile.read(8192), b""))
        self.assertEqual(b"".join(chunks), b'a' * 201)

    def test_exclusive_write(self):
        with gzip.GzipFile(self.filename, 'xb') as f:
            f.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1 * 50)
        # The file exists now, so a second exclusive open must fail.
        with self.assertRaises(FileExistsError):
            gzip.GzipFile(self.filename, 'xb')

    def test_buffered_reader(self):
        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
        # performance.
        self.test_write()

        with gzip.GzipFile(self.filename, 'rb') as f:
            with io.BufferedReader(f) as r:
                lines = list(r)

        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))

    def test_readline(self):
        self.test_write()
        # Try .readline() with varying line lengths

        with gzip.GzipFile(self.filename, 'rb') as f:
            line_length = 0
            while True:
                line = f.readline(line_length)
                # An empty result only signals end of data when a non-zero
                # size was requested.
                if not line and line_length != 0:
                    break
                self.assertLessEqual(len(line), line_length)
                line_length = (line_length + 1) % 50

    def test_readlines(self):
        self.test_write()
        # Try .readlines(); this only checks that the calls complete.

        with gzip.GzipFile(self.filename, 'rb') as f:
            f.readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            # Keep asking with a size hint until an empty list comes back.
            while f.readlines(150):
                pass

    def test_seek_read(self):
        self.test_write()
        # Try seek, read test

        with gzip.GzipFile(self.filename) as f:
            while True:
                oldpos = f.tell()
                line1 = f.readline()
                if not line1:
                    break
                newpos = f.tell()
                f.seek(oldpos)  # negative seek
                amount = min(len(line1), 10)
                line2 = f.read(amount)
                self.assertEqual(line1[:amount], line2)
                f.seek(newpos)  # positive seek

    def test_seek_whence(self):
        self.test_write()
        # Try seek(whence=1), read test

        with gzip.GzipFile(self.filename) as f:
            f.read(10)
            # Relative seek (io.SEEK_CUR == 1), passed by keyword.
            f.seek(10, whence=io.SEEK_CUR)
            y = f.read(10)
        self.assertEqual(y, data1[20:30])

    def test_seek_write(self):
        # Try seek, write test
        with gzip.GzipFile(self.filename, 'w') as f:
            for pos in range(0, 256, 16):
                f.seek(pos)
                f.write(b'GZ\n')

    def test_mode(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            self.assertEqual(f.myfileobj.mode, 'rb')
        # Exclusive creation needs the file to be absent.
        support.unlink(self.filename)
        with gzip.GzipFile(self.filename, 'x') as f:
            self.assertEqual(f.myfileobj.mode, 'xb')

    def test_1647484(self):
        # 'wb' comes first so the file exists by the time 'rb' opens it.
        for mode in ('wb', 'rb'):
            with gzip.GzipFile(self.filename, mode) as f:
                self.assertTrue(hasattr(f, "name"))
                self.assertEqual(f.name, self.filename)

    def test_paddedfile_getattr(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertTrue(hasattr(f.fileobj, "name"))
            self.assertEqual(f.fileobj.name, self.filename)

    def test_mtime(self):
        mtime = 123456789
        with gzip.GzipFile(self.filename, 'w', mtime=mtime) as fWrite:
            fWrite.write(data1)
        with gzip.GzipFile(self.filename) as fRead:
            # mtime is None before any data has been read and holds the
            # stored value afterwards.
            self.assertTrue(hasattr(fRead, 'mtime'))
            self.assertIsNone(fRead.mtime)
            dataRead = fRead.read()
            self.assertEqual(dataRead, data1)
            self.assertEqual(fRead.mtime, mtime)

    def test_metadata(self):
        mtime = 123456789

        with gzip.GzipFile(self.filename, 'w', mtime=mtime) as fWrite:
            fWrite.write(data1)

        with open(self.filename, 'rb') as fRead:
            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html

            idBytes = fRead.read(2)
            self.assertEqual(idBytes, b'\x1f\x8b')  # gzip ID

            cmByte = fRead.read(1)
            self.assertEqual(cmByte, b'\x08')  # deflate

            flagsByte = fRead.read(1)
            self.assertEqual(flagsByte, b'\x08')  # only the FNAME flag is set

            mtimeBytes = fRead.read(4)
            self.assertEqual(mtimeBytes, struct.pack('<i', mtime))  # little-endian

            xflByte = fRead.read(1)
            self.assertEqual(xflByte, b'\x02')  # maximum compression

            osByte = fRead.read(1)
            self.assertEqual(osByte, b'\xff')  # OS "unknown" (OS-independent)

            # Since the FNAME flag is set, the zero-terminated filename follows.
            # RFC 1952 specifies that this is the name of the input file, if any.
            # However, the gzip module defaults to storing the name of the output
            # file in this field.
            expected = self.filename.encode('Latin-1') + b'\x00'
            nameBytes = fRead.read(len(expected))
            self.assertEqual(nameBytes, expected)

            # Since no other flags were set, the header ends here.
            # Rather than process the compressed data, let's seek to the trailer.
            fRead.seek(os.stat(self.filename).st_size - 8)

            crc32Bytes = fRead.read(4)  # CRC32 of uncompressed data [data1]
            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')

            isizeBytes = fRead.read(4)
            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))

    def test_with_open(self):
        # GzipFile supports the context management protocol
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(b"xxx")
        f = gzip.GzipFile(self.filename, "rb")
        f.close()
        # __enter__ on a closed file must raise.
        with self.assertRaises(ValueError):
            with f:
                pass
        # An exception raised inside the block must propagate out of it.
        with self.assertRaises(ZeroDivisionError):
            with gzip.GzipFile(self.filename, "wb") as f:
                1/0

    def test_zero_padded_file(self):
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(data1 * 50)

        # Pad the file with zeroes
        with open(self.filename, "ab") as f:
            f.write(b"\x00" * 50)

        with gzip.GzipFile(self.filename, "rb") as f:
            d = f.read()
            self.assertEqual(d, data1 * 50, "Incorrect data in file")

    def test_non_seekable_file(self):
        uncompressed = data1 * 50
        buf = UnseekableIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
            f.write(uncompressed)
        compressed = buf.getvalue()
        buf = UnseekableIO(compressed)
        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_peek(self):
        uncompressed = data1 * 200
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(uncompressed)

        def sizes():
            # Endlessly repeat the peek sizes 5, 15, 25, 35, 45.
            while True:
                for n in range(5, 50, 10):
                    yield n

        with gzip.GzipFile(self.filename, "rb") as f:
            f.max_read_chunk = 33
            nread = 0
            for n in sizes():
                s = f.peek(n)
                if s == b'':
                    break
                # Whatever peek() showed must be what read() then returns.
                self.assertEqual(f.read(len(s)), s)
                nread += len(s)
            self.assertEqual(f.read(100), b'')
            self.assertEqual(nread, len(uncompressed))

    def test_textio_readlines(self):
        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            with io.TextIOWrapper(f, encoding="ascii") as t:
                self.assertEqual(t.readlines(), lines)

    def test_fileobj_from_fdopen(self):
        # Issue #13781: Opening a GzipFile for writing fails when using a
        # fileobj created with os.fdopen().
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
        with os.fdopen(fd, "wb") as f:
            with gzip.GzipFile(fileobj=f, mode="w") as g:
                pass

    def test_fileobj_mode(self):
        gzip.GzipFile(self.filename, "wb").close()
        # An explicit mode argument decides the GzipFile mode.
        with open(self.filename, "r+b") as f:
            with gzip.GzipFile(fileobj=f, mode='r') as g:
                self.assertEqual(g.mode, gzip.READ)
            with gzip.GzipFile(fileobj=f, mode='w') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='a') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='x') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with self.assertRaises(ValueError):
                gzip.GzipFile(fileobj=f, mode='z')
        # Without a mode argument, the mode follows the file object's mode.
        for mode in "rb", "r+b":
            with open(self.filename, mode) as f:
                with gzip.GzipFile(fileobj=f) as g:
                    self.assertEqual(g.mode, gzip.READ)
        for mode in "wb", "ab", "xb":
            if "x" in mode:
                support.unlink(self.filename)
            with open(self.filename, mode) as f:
                with gzip.GzipFile(fileobj=f) as g:
                    self.assertEqual(g.mode, gzip.WRITE)

    def test_bytes_filename(self):
        str_filename = self.filename
        try:
            bytes_filename = str_filename.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest("Temporary file name needs to be ASCII")
        with gzip.GzipFile(bytes_filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.GzipFile(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        # Sanity check that we are actually operating on the right file.
        with gzip.GzipFile(str_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)

    def test_decompress_limited(self):
        """Decompressed data buffering should be limited"""
        bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)

        bomb = io.BytesIO(bomb)
        # Use a with block so the GzipFile is closed even if an assertion
        # fails.
        with gzip.GzipFile(fileobj=bomb) as decomp:
            self.assertEqual(decomp.read(1), b'\0')
            max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
            self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
                "Excessive amount of data was decompressed")

    # Testing compress/decompress shortcut functions

    def test_compress(self):
        for data in [data1, data2]:
            # No argument, then explicit compression levels 1, 6 and 9.
            for args in [(), (1,), (6,), (9,)]:
                datac = gzip.compress(data, *args)
                self.assertEqual(type(datac), bytes)
                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                    self.assertEqual(f.read(), data)

    def test_decompress(self):
        for data in (data1, data2):
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
                f.write(data)
            self.assertEqual(gzip.decompress(buf.getvalue()), data)
            # Roundtrip with compress
            datac = gzip.compress(data)
            self.assertEqual(gzip.decompress(datac), data)

    def test_read_truncated(self):
        data = data1 * 50
        # Drop the CRC (4 bytes) and file size (4 bytes).
        truncated = gzip.compress(data)[:-8]
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(data)), data)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 10-byte header.
        for i in range(2, 10):
            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)

    def test_read_with_extra(self):
        # Gzip data with an extra field
        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
                  b'\x05\x00Extra'
                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
            self.assertEqual(f.read(), b'Test')

    def test_prepend_error(self):
        # See issue #20875
        with gzip.open(self.filename, "wb") as f:
            f.write(data1)
        with gzip.open(self.filename, "rb") as f:
            f._buffer.raw._fp.prepend()
Ned Deily61207392014-03-09 14:44:34 -0700532
class TestOpen(BaseTest):
    """Tests for the module-level gzip.open() function."""

    def _read_decompressed(self, encoding=None):
        # Read the raw file from disk with the builtin open(), decompress it
        # with gzip.decompress() and, when *encoding* is given, decode it.
        with open(self.filename, "rb") as f:
            payload = gzip.decompress(f.read())
        if encoding is None:
            return payload
        return payload.decode(encoding)

    def test_binary_modes(self):
        uncompressed = data1 * 50

        with gzip.open(self.filename, "wb") as f:
            f.write(uncompressed)
        self.assertEqual(self._read_decompressed(), uncompressed)

        with gzip.open(self.filename, "rb") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "ab") as f:
            f.write(uncompressed)
        self.assertEqual(self._read_decompressed(), uncompressed * 2)

        # The file exists, so exclusive creation must fail until it is removed.
        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "xb")
        support.unlink(self.filename)
        with gzip.open(self.filename, "xb") as f:
            f.write(uncompressed)
        self.assertEqual(self._read_decompressed(), uncompressed)

    def test_pathlike_file(self):
        filename = pathlib.Path(self.filename)
        with gzip.open(filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.open(filename, "ab") as f:
            f.write(data1)
        with gzip.open(filename) as f:
            self.assertEqual(f.read(), data1 * 51)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        uncompressed = data1 * 50

        with gzip.open(self.filename, "w") as f:
            f.write(uncompressed)
        self.assertEqual(self._read_decompressed(), uncompressed)

        with gzip.open(self.filename, "r") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "a") as f:
            f.write(uncompressed)
        self.assertEqual(self._read_decompressed(), uncompressed * 2)

        # The file exists, so exclusive creation must fail until it is removed.
        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "x")
        support.unlink(self.filename)
        with gzip.open(self.filename, "x") as f:
            f.write(uncompressed)
        self.assertEqual(self._read_decompressed(), uncompressed)

    def test_text_modes(self):
        uncompressed = data1.decode("ascii") * 50
        # What is expected on disk: "\n" replaced by os.linesep.
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt") as f:
            f.write(uncompressed)
        self.assertEqual(self._read_decompressed("ascii"), uncompressed_raw)
        with gzip.open(self.filename, "rt") as f:
            self.assertEqual(f.read(), uncompressed)
        with gzip.open(self.filename, "at") as f:
            f.write(uncompressed)
        self.assertEqual(self._read_decompressed("ascii"),
                         uncompressed_raw * 2)

    def test_fileobj(self):
        uncompressed_bytes = data1 * 50
        uncompressed_str = uncompressed_bytes.decode("ascii")
        compressed = gzip.compress(uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "r") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rb") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rt") as f:
            self.assertEqual(f.read(), uncompressed_str)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        with self.assertRaises(TypeError):
            gzip.open(123.456)
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "wbt")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "xbt")
        # Text-only arguments are rejected in binary mode.
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", encoding="utf-8")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", errors="ignore")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", newline="\n")

    def test_encoding(self):
        # Test non-default encoding.
        uncompressed = data1.decode("ascii") * 50
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="utf-16") as f:
            f.write(uncompressed)
        self.assertEqual(self._read_decompressed("utf-16"), uncompressed_raw)
        with gzip.open(self.filename, "rt", encoding="utf-16") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with gzip.open(self.filename, "wb") as f:
            f.write(b"foo\xffbar")
        reader = gzip.open(self.filename, "rt", encoding="ascii",
                           errors="ignore")
        with reader as f:
            self.assertEqual(f.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        uncompressed = data1.decode("ascii") * 50
        with gzip.open(self.filename, "wt", newline="\n") as f:
            f.write(uncompressed)
        # The data contains no "\r", so reading with newline="\r" yields the
        # whole text as a single line.
        with gzip.open(self.filename, "rt", newline="\r") as f:
            self.assertEqual(f.readlines(), [uncompressed])
667
def test_main(verbose=None):
    """Run TestGzip and TestOpen through support.run_unittest().

    *verbose* is accepted but not used by this function.
    """
    support.run_unittest(TestGzip, TestOpen)
Andrew M. Kuchlinga6f68e12005-06-09 14:12:36 +0000670
if __name__ == "__main__":
    # Executed as a script: run both test classes.
    test_main(verbose=True)