"""Test script for the gzip module.
"""

import array
import functools
import io
import os
import pathlib
import struct
import sys
import unittest
from subprocess import PIPE, Popen
from test import support
from test.support import _4G, bigmemtest
from test.support.script_helper import assert_python_ok, assert_python_failure

# Import gzip through the test-support helper rather than a plain import.
gzip = support.import_module('gzip')

# Sample payloads used throughout the tests.
# NOTE(review): the scraped source collapsed every whitespace run, so the
# two-space indent inside data1 is reconstructed -- confirm it against the
# CRC32 and ISIZE values pinned in TestGzip.test_metadata.
data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""


# Scratch directory passed to create_and_remove_directory() below.
TEMPDIR = os.path.abspath(support.TESTFN) + '-gzdir'


class UnseekableIO(io.BytesIO):
    """In-memory byte stream that reports itself as non-seekable.

    seekable() returns False, and both tell() and seek() raise
    io.UnsupportedOperation, so code under test cannot rely on
    repositioning the stream.
    """

    def seekable(self):
        return False

    def tell(self):
        raise io.UnsupportedOperation

    def seek(self, *args):
        raise io.UnsupportedOperation


class BaseTest(unittest.TestCase):
    """Common fixture: remove the scratch file before and after each test."""

    # Path of the scratch file the tests read and write.
    filename = support.TESTFN

    def setUp(self):
        support.unlink(self.filename)

    def tearDown(self):
        support.unlink(self.filename)


class TestGzip(BaseTest):
    """Tests for gzip.GzipFile and the compress()/decompress() shortcuts."""

    def write_and_read_back(self, data, mode='b'):
        # Helper: write *data* through GzipFile, check that write() reports
        # len(bytes(data)), then read the file back and compare the content.
        b_data = bytes(data)
        with gzip.GzipFile(self.filename, 'w'+mode) as f:
            written = f.write(data)
        self.assertEqual(written, len(b_data))
        with gzip.GzipFile(self.filename, 'r'+mode) as f:
            self.assertEqual(f.read(), b_data)

    def test_write(self):
        with gzip.GzipFile(self.filename, 'wb') as f:
            f.write(data1 * 50)

            # Try flush and fileno.
            f.flush()
            f.fileno()
            if hasattr(os, 'fsync'):
                os.fsync(f.fileno())
            f.close()

        # Test multiple close() calls.
        f.close()

    def test_write_read_with_pathlike_file(self):
        filename = pathlib.Path(self.filename)
        with gzip.GzipFile(filename, 'w') as f:
            f.write(data1 * 50)
        self.assertIsInstance(f.name, str)
        with gzip.GzipFile(filename, 'a') as f:
            f.write(data1)
        with gzip.GzipFile(filename) as f:
            d = f.read()
        self.assertEqual(d, data1 * 51)
        self.assertIsInstance(f.name, str)

    # The following test_write_xy methods test that write accepts
    # the corresponding bytes-like object type as input
    # and that the data written equals bytes(xy) in all cases.
    def test_write_memoryview(self):
        self.write_and_read_back(memoryview(data1 * 50))
        m = memoryview(bytes(range(256)))
        data = m.cast('B', shape=[8,8,4])
        self.write_and_read_back(data)

    def test_write_bytearray(self):
        self.write_and_read_back(bytearray(data1 * 50))

    def test_write_array(self):
        self.write_and_read_back(array.array('I', data1 * 40))

    def test_write_incompatible_type(self):
        # Test that non-bytes-like types raise TypeError.
        # Issue #21560: attempts to write incompatible types
        # should not affect the state of the fileobject
        with gzip.GzipFile(self.filename, 'wb') as f:
            with self.assertRaises(TypeError):
                f.write('')
            with self.assertRaises(TypeError):
                f.write([])
            f.write(data1)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1)

    def test_read(self):
        self.test_write()
        # Try reading.
        with gzip.GzipFile(self.filename, 'r') as f:
            d = f.read()
        self.assertEqual(d, data1*50)

    def test_read1(self):
        self.test_write()
        blocks = []
        nread = 0
        with gzip.GzipFile(self.filename, 'r') as f:
            while True:
                d = f.read1()
                if not d:
                    break
                blocks.append(d)
                nread += len(d)
                # Check that position was updated correctly (see issue10791).
                self.assertEqual(f.tell(), nread)
        self.assertEqual(b''.join(blocks), data1 * 50)

    @bigmemtest(size=_4G, memuse=1)
    def test_read_large(self, size):
        # Read chunk size over UINT_MAX should be supported, despite zlib's
        # limitation per low-level call
        compressed = gzip.compress(data1, compresslevel=1)
        # Use a context manager so the GzipFile is closed after the check.
        with gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb') as f:
            self.assertEqual(f.read(size), data1)

    def test_io_on_closed_object(self):
        # Test that I/O operations on closed GzipFile objects raise a
        # ValueError, just like the corresponding functions on file objects.

        # Write to a file, open it for reading, then close it.
        self.test_write()
        f = gzip.GzipFile(self.filename, 'r')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.read(1)
        with self.assertRaises(ValueError):
            f.seek(0)
        with self.assertRaises(ValueError):
            f.tell()
        # Open the file for writing, then close it.
        f = gzip.GzipFile(self.filename, 'w')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.write(b'')
        with self.assertRaises(ValueError):
            f.flush()

    def test_append(self):
        self.test_write()
        # Append to the previous file
        with gzip.GzipFile(self.filename, 'ab') as f:
            f.write(data2 * 15)

        with gzip.GzipFile(self.filename, 'rb') as f:
            d = f.read()
        self.assertEqual(d, (data1*50) + (data2*15))

    def test_many_append(self):
        # Bug #1074261 was triggered when reading a file that contained
        # many, many members.  Create such a file and verify that reading it
        # works.
        with gzip.GzipFile(self.filename, 'wb', 9) as f:
            f.write(b'a')
        for _ in range(200):
            with gzip.GzipFile(self.filename, "ab", 9) as f:  # append
                f.write(b'a')

        # Try reading the file
        with gzip.GzipFile(self.filename, "rb") as zgfile:
            # Pull fixed-size chunks until read() returns b"" (end of file).
            contents = b"".join(iter(lambda: zgfile.read(8192), b""))
        self.assertEqual(contents, b'a'*201)

    def test_exclusive_write(self):
        with gzip.GzipFile(self.filename, 'xb') as f:
            f.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1 * 50)
        with self.assertRaises(FileExistsError):
            gzip.GzipFile(self.filename, 'xb')

    def test_buffered_reader(self):
        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
        # performance.
        self.test_write()

        with gzip.GzipFile(self.filename, 'rb') as f:
            with io.BufferedReader(f) as r:
                lines = [line for line in r]

        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))

    def test_readline(self):
        self.test_write()
        # Try .readline() with varying line lengths

        with gzip.GzipFile(self.filename, 'rb') as f:
            line_length = 0
            while True:
                L = f.readline(line_length)
                # An empty result only means EOF when a non-zero size
                # was requested.
                if not L and line_length != 0:
                    break
                self.assertLessEqual(len(L), line_length)
                line_length = (line_length + 1) % 50

    def test_readlines(self):
        self.test_write()
        # Try .readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            f.readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            # Keep calling with a size hint until an empty list comes back.
            while f.readlines(150):
                pass

    def test_seek_read(self):
        self.test_write()
        # Try seek, read test

        with gzip.GzipFile(self.filename) as f:
            while True:
                oldpos = f.tell()
                line1 = f.readline()
                if not line1:
                    break
                newpos = f.tell()
                f.seek(oldpos)  # negative seek
                amount = min(len(line1), 10)
                line2 = f.read(amount)
                self.assertEqual(line1[:amount], line2)
                f.seek(newpos)  # positive seek

    def test_seek_whence(self):
        self.test_write()
        # Try seek(whence=1), read test

        with gzip.GzipFile(self.filename) as f:
            f.read(10)
            f.seek(10, whence=1)
            y = f.read(10)
        self.assertEqual(y, data1[20:30])

    def test_seek_write(self):
        # Try seek, write test
        with gzip.GzipFile(self.filename, 'w') as f:
            for pos in range(0, 256, 16):
                f.seek(pos)
                f.write(b'GZ\n')

    def test_mode(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            self.assertEqual(f.myfileobj.mode, 'rb')
        # Mode 'x' refuses to open an existing file, so remove it first.
        support.unlink(self.filename)
        with gzip.GzipFile(self.filename, 'x') as f:
            self.assertEqual(f.myfileobj.mode, 'xb')

    def test_1647484(self):
        for mode in ('wb', 'rb'):
            with gzip.GzipFile(self.filename, mode) as f:
                self.assertTrue(hasattr(f, "name"))
                self.assertEqual(f.name, self.filename)

    def test_paddedfile_getattr(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertTrue(hasattr(f.fileobj, "name"))
            self.assertEqual(f.fileobj.name, self.filename)

    def test_mtime(self):
        mtime = 123456789
        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)
        with gzip.GzipFile(self.filename) as fRead:
            # mtime is None until data has been read from the file.
            self.assertTrue(hasattr(fRead, 'mtime'))
            self.assertIsNone(fRead.mtime)
            dataRead = fRead.read()
            self.assertEqual(dataRead, data1)
            self.assertEqual(fRead.mtime, mtime)

    def test_metadata(self):
        mtime = 123456789

        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)

        with open(self.filename, 'rb') as fRead:
            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html

            idBytes = fRead.read(2)
            self.assertEqual(idBytes, b'\x1f\x8b') # gzip ID

            cmByte = fRead.read(1)
            self.assertEqual(cmByte, b'\x08') # deflate

            try:
                expectedname = self.filename.encode('Latin-1') + b'\x00'
                expectedflags = b'\x08' # only the FNAME flag is set
            except UnicodeEncodeError:
                expectedname = b''
                expectedflags = b'\x00'

            flagsByte = fRead.read(1)
            self.assertEqual(flagsByte, expectedflags)

            mtimeBytes = fRead.read(4)
            self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian

            xflByte = fRead.read(1)
            self.assertEqual(xflByte, b'\x02') # maximum compression

            osByte = fRead.read(1)
            self.assertEqual(osByte, b'\xff') # OS "unknown" (OS-independent)

            # Since the FNAME flag is set, the zero-terminated filename follows.
            # RFC 1952 specifies that this is the name of the input file, if any.
            # However, the gzip module defaults to storing the name of the output
            # file in this field.
            nameBytes = fRead.read(len(expectedname))
            self.assertEqual(nameBytes, expectedname)

            # Since no other flags were set, the header ends here.
            # Rather than process the compressed data, let's seek to the trailer.
            fRead.seek(os.stat(self.filename).st_size - 8)

            crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')

            isizeBytes = fRead.read(4)
            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))

    def test_metadata_ascii_name(self):
        # Re-run the header checks against support.TESTFN_ASCII as the
        # file name.
        self.filename = support.TESTFN_ASCII
        self.test_metadata()

    def test_compresslevel_metadata(self):
        # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
        # specifically, discussion of XFL in section 2.3.1
        cases = [
            ('fast', 1, b'\x04'),
            ('best', 9, b'\x02'),
            ('tradeoff', 6, b'\x00'),
        ]
        xflOffset = 8

        for (name, level, expectedXflByte) in cases:
            with self.subTest(name):
                fWrite = gzip.GzipFile(self.filename, 'w', compresslevel=level)
                with fWrite:
                    fWrite.write(data1)
                with open(self.filename, 'rb') as fRead:
                    fRead.seek(xflOffset)
                    xflByte = fRead.read(1)
                    self.assertEqual(xflByte, expectedXflByte)

    def test_with_open(self):
        # GzipFile supports the context management protocol
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(b"xxx")
        f = gzip.GzipFile(self.filename, "rb")
        f.close()
        # Entering an already-closed file must raise ValueError.
        with self.assertRaises(
                ValueError,
                msg="__enter__ on a closed file didn't raise an exception"):
            with f:
                pass
        # An exception raised inside the block must propagate to the caller.
        with self.assertRaises(ZeroDivisionError,
                               msg="1/0 didn't raise an exception"):
            with gzip.GzipFile(self.filename, "wb") as f:
                1/0

    def test_zero_padded_file(self):
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(data1 * 50)

        # Pad the file with zeroes
        with open(self.filename, "ab") as f:
            f.write(b"\x00" * 50)

        with gzip.GzipFile(self.filename, "rb") as f:
            d = f.read()
            self.assertEqual(d, data1 * 50, "Incorrect data in file")

    def test_gzip_BadGzipFile_exception(self):
        self.assertTrue(issubclass(gzip.BadGzipFile, OSError))

    def test_bad_gzip_file(self):
        # Uncompressed bytes are not a gzip stream; reading must fail.
        with open(self.filename, 'wb') as file:
            file.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'r') as file:
            self.assertRaises(gzip.BadGzipFile, file.readlines)

    def test_non_seekable_file(self):
        uncompressed = data1 * 50
        buf = UnseekableIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
            f.write(uncompressed)
        compressed = buf.getvalue()
        buf = UnseekableIO(compressed)
        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_peek(self):
        uncompressed = data1 * 200
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(uncompressed)

        def sizes():
            # Endlessly cycle through the peek sizes 5, 15, 25, 35, 45.
            while True:
                for n in range(5, 50, 10):
                    yield n

        with gzip.GzipFile(self.filename, "rb") as f:
            f.max_read_chunk = 33
            nread = 0
            for n in sizes():
                s = f.peek(n)
                if s == b'':
                    break
                # Whatever peek() returned must be what read() yields next.
                self.assertEqual(f.read(len(s)), s)
                nread += len(s)
            self.assertEqual(f.read(100), b'')
            self.assertEqual(nread, len(uncompressed))

    def test_textio_readlines(self):
        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            with io.TextIOWrapper(f, encoding="ascii") as t:
                self.assertEqual(t.readlines(), lines)

    def test_fileobj_from_fdopen(self):
        # Issue #13781: Opening a GzipFile for writing fails when using a
        # fileobj created with os.fdopen().
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
        with os.fdopen(fd, "wb") as f:
            with gzip.GzipFile(fileobj=f, mode="w") as g:
                pass

    def test_fileobj_mode(self):
        gzip.GzipFile(self.filename, "wb").close()
        # An explicit mode argument decides the GzipFile mode.
        with open(self.filename, "r+b") as f:
            with gzip.GzipFile(fileobj=f, mode='r') as g:
                self.assertEqual(g.mode, gzip.READ)
            with gzip.GzipFile(fileobj=f, mode='w') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='a') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='x') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with self.assertRaises(ValueError):
                gzip.GzipFile(fileobj=f, mode='z')
        # Without an explicit mode, it is taken from the file object.
        for mode in "rb", "r+b":
            with open(self.filename, mode) as f:
                with gzip.GzipFile(fileobj=f) as g:
                    self.assertEqual(g.mode, gzip.READ)
        for mode in "wb", "ab", "xb":
            if "x" in mode:
                support.unlink(self.filename)
            with open(self.filename, mode) as f:
                with self.assertWarns(FutureWarning):
                    g = gzip.GzipFile(fileobj=f)
                with g:
                    self.assertEqual(g.mode, gzip.WRITE)

    def test_bytes_filename(self):
        str_filename = self.filename
        try:
            bytes_filename = str_filename.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest("Temporary file name needs to be ASCII")
        with gzip.GzipFile(bytes_filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.GzipFile(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        # Sanity check that we are actually operating on the right file.
        with gzip.GzipFile(str_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)

    def test_decompress_limited(self):
        """Decompressed data buffering should be limited"""
        bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)

        bomb = io.BytesIO(bomb)
        # Use a context manager so the GzipFile is closed after the checks.
        with gzip.GzipFile(fileobj=bomb) as decomp:
            self.assertEqual(decomp.read(1), b'\0')
            max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
            self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
                "Excessive amount of data was decompressed")

    # Testing compress/decompress shortcut functions

    def test_compress(self):
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                datac = gzip.compress(data, *args)
                self.assertIs(type(datac), bytes)
                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                    self.assertEqual(f.read(), data)

    def test_compress_mtime(self):
        mtime = 123456789
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                with self.subTest(data=data, args=args):
                    datac = gzip.compress(data, *args, mtime=mtime)
                    self.assertIs(type(datac), bytes)
                    with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                        f.read(1) # to set mtime attribute
                        self.assertEqual(f.mtime, mtime)

    def test_decompress(self):
        for data in (data1, data2):
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
                f.write(data)
            self.assertEqual(gzip.decompress(buf.getvalue()), data)
            # Roundtrip with compress
            datac = gzip.compress(data)
            self.assertEqual(gzip.decompress(datac), data)

    def test_read_truncated(self):
        data = data1*50
        # Drop the CRC (4 bytes) and file size (4 bytes).
        truncated = gzip.compress(data)[:-8]
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(data)), data)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 10-byte header.
        for i in range(2, 10):
            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)

    def test_read_with_extra(self):
        # Gzip data with an extra field
        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
                  b'\x05\x00Extra'
                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
            self.assertEqual(f.read(), b'Test')

    def test_prepend_error(self):
        # See issue #20875
        with gzip.open(self.filename, "wb") as f:
            f.write(data1)
        with gzip.open(self.filename, "rb") as f:
            f._buffer.raw._fp.prepend()

Nadeem Vawda1b8a14d2012-05-06 15:17:52 +0200593class TestOpen(BaseTest):
594 def test_binary_modes(self):
Nadeem Vawda7e126202012-05-06 15:04:01 +0200595 uncompressed = data1 * 50
Nadeem Vawdaee1be992013-10-19 00:11:13 +0200596
Nadeem Vawda7e126202012-05-06 15:04:01 +0200597 with gzip.open(self.filename, "wb") as f:
598 f.write(uncompressed)
599 with open(self.filename, "rb") as f:
600 file_data = gzip.decompress(f.read())
601 self.assertEqual(file_data, uncompressed)
Nadeem Vawdaee1be992013-10-19 00:11:13 +0200602
Nadeem Vawda7e126202012-05-06 15:04:01 +0200603 with gzip.open(self.filename, "rb") as f:
604 self.assertEqual(f.read(), uncompressed)
Nadeem Vawdaee1be992013-10-19 00:11:13 +0200605
Nadeem Vawda7e126202012-05-06 15:04:01 +0200606 with gzip.open(self.filename, "ab") as f:
607 f.write(uncompressed)
608 with open(self.filename, "rb") as f:
609 file_data = gzip.decompress(f.read())
610 self.assertEqual(file_data, uncompressed * 2)
611
Nadeem Vawdaee1be992013-10-19 00:11:13 +0200612 with self.assertRaises(FileExistsError):
613 gzip.open(self.filename, "xb")
614 support.unlink(self.filename)
615 with gzip.open(self.filename, "xb") as f:
616 f.write(uncompressed)
617 with open(self.filename, "rb") as f:
618 file_data = gzip.decompress(f.read())
619 self.assertEqual(file_data, uncompressed)
620
Berker Peksag03020cf2016-10-02 13:47:58 +0300621 def test_pathlike_file(self):
622 filename = pathlib.Path(self.filename)
623 with gzip.open(filename, "wb") as f:
624 f.write(data1 * 50)
625 with gzip.open(filename, "ab") as f:
626 f.write(data1)
627 with gzip.open(filename) as f:
628 self.assertEqual(f.read(), data1 * 51)
629
Nadeem Vawda1b8a14d2012-05-06 15:17:52 +0200630 def test_implicit_binary_modes(self):
Nadeem Vawda7e126202012-05-06 15:04:01 +0200631 # Test implicit binary modes (no "b" or "t" in mode string).
632 uncompressed = data1 * 50
Nadeem Vawdaee1be992013-10-19 00:11:13 +0200633
Nadeem Vawda7e126202012-05-06 15:04:01 +0200634 with gzip.open(self.filename, "w") as f:
635 f.write(uncompressed)
636 with open(self.filename, "rb") as f:
637 file_data = gzip.decompress(f.read())
638 self.assertEqual(file_data, uncompressed)
Nadeem Vawdaee1be992013-10-19 00:11:13 +0200639
Nadeem Vawda7e126202012-05-06 15:04:01 +0200640 with gzip.open(self.filename, "r") as f:
641 self.assertEqual(f.read(), uncompressed)
Nadeem Vawdaee1be992013-10-19 00:11:13 +0200642
Nadeem Vawda7e126202012-05-06 15:04:01 +0200643 with gzip.open(self.filename, "a") as f:
644 f.write(uncompressed)
645 with open(self.filename, "rb") as f:
646 file_data = gzip.decompress(f.read())
647 self.assertEqual(file_data, uncompressed * 2)
648
Nadeem Vawdaee1be992013-10-19 00:11:13 +0200649 with self.assertRaises(FileExistsError):
650 gzip.open(self.filename, "x")
651 support.unlink(self.filename)
652 with gzip.open(self.filename, "x") as f:
653 f.write(uncompressed)
654 with open(self.filename, "rb") as f:
655 file_data = gzip.decompress(f.read())
656 self.assertEqual(file_data, uncompressed)
657
Nadeem Vawda1b8a14d2012-05-06 15:17:52 +0200658 def test_text_modes(self):
Nadeem Vawda11328e42012-05-06 19:24:18 +0200659 uncompressed = data1.decode("ascii") * 50
660 uncompressed_raw = uncompressed.replace("\n", os.linesep)
Nadeem Vawda7e126202012-05-06 15:04:01 +0200661 with gzip.open(self.filename, "wt") as f:
662 f.write(uncompressed)
663 with open(self.filename, "rb") as f:
664 file_data = gzip.decompress(f.read()).decode("ascii")
Nadeem Vawda11328e42012-05-06 19:24:18 +0200665 self.assertEqual(file_data, uncompressed_raw)
Nadeem Vawda7e126202012-05-06 15:04:01 +0200666 with gzip.open(self.filename, "rt") as f:
667 self.assertEqual(f.read(), uncompressed)
668 with gzip.open(self.filename, "at") as f:
669 f.write(uncompressed)
670 with open(self.filename, "rb") as f:
671 file_data = gzip.decompress(f.read()).decode("ascii")
Nadeem Vawda11328e42012-05-06 19:24:18 +0200672 self.assertEqual(file_data, uncompressed_raw * 2)
Nadeem Vawda7e126202012-05-06 15:04:01 +0200673
Nadeem Vawda68721012012-06-04 23:21:38 +0200674 def test_fileobj(self):
675 uncompressed_bytes = data1 * 50
676 uncompressed_str = uncompressed_bytes.decode("ascii")
677 compressed = gzip.compress(uncompressed_bytes)
678 with gzip.open(io.BytesIO(compressed), "r") as f:
679 self.assertEqual(f.read(), uncompressed_bytes)
680 with gzip.open(io.BytesIO(compressed), "rb") as f:
681 self.assertEqual(f.read(), uncompressed_bytes)
682 with gzip.open(io.BytesIO(compressed), "rt") as f:
683 self.assertEqual(f.read(), uncompressed_str)
684
Nadeem Vawda1b8a14d2012-05-06 15:17:52 +0200685 def test_bad_params(self):
Nadeem Vawda7e126202012-05-06 15:04:01 +0200686 # Test invalid parameter combinations.
Nadeem Vawda68721012012-06-04 23:21:38 +0200687 with self.assertRaises(TypeError):
688 gzip.open(123.456)
Nadeem Vawda7e126202012-05-06 15:04:01 +0200689 with self.assertRaises(ValueError):
690 gzip.open(self.filename, "wbt")
691 with self.assertRaises(ValueError):
Nadeem Vawdaee1be992013-10-19 00:11:13 +0200692 gzip.open(self.filename, "xbt")
693 with self.assertRaises(ValueError):
Nadeem Vawda7e126202012-05-06 15:04:01 +0200694 gzip.open(self.filename, "rb", encoding="utf-8")
695 with self.assertRaises(ValueError):
696 gzip.open(self.filename, "rb", errors="ignore")
697 with self.assertRaises(ValueError):
698 gzip.open(self.filename, "rb", newline="\n")
699
def test_encoding(self):
    # Test non-default encoding: write text as UTF-16, then check both
    # the stored payload and the text-mode round trip.
    text = data1.decode("ascii") * 50
    with gzip.open(self.filename, "wt", encoding="utf-16") as writer:
        writer.write(text)

    # The decompressed payload is expected to carry the platform line
    # separator in place of every "\n" that was written.
    with open(self.filename, "rb") as raw_file:
        compressed = raw_file.read()
    on_disk = gzip.decompress(compressed).decode("utf-16")
    self.assertEqual(on_disk, os.linesep.join(text.split("\n")))

    # Reading back in text mode must return exactly what was written.
    with gzip.open(self.filename, "rt", encoding="utf-16") as reader:
        round_tripped = reader.read()
    self.assertEqual(round_tripped, text)
711
def test_encoding_error_handler(self):
    # Test with non-default encoding error handler: the b"\xff" byte is
    # not ASCII, and with errors="ignore" it must vanish from the result.
    with gzip.open(self.filename, "wb") as writer:
        writer.write(b"foo\xffbar")

    text_options = {"encoding": "ascii", "errors": "ignore"}
    with gzip.open(self.filename, "rt", **text_options) as reader:
        decoded = reader.read()
    self.assertEqual(decoded, "foobar")
719
def test_newline(self):
    # Test with explicit newline (universal newline mode disabled).
    # The text is written with newline="\n" and read back with
    # newline="\r"; readlines() must then return the whole text as a
    # single item.
    text = data1.decode("ascii") * 50
    with gzip.open(self.filename, "wt", newline="\n") as writer:
        writer.write(text)
    with gzip.open(self.filename, "rt", newline="\r") as reader:
        lines = reader.readlines()
    self.assertEqual(lines, [text])
727
Stéphane Wirtel84eec112018-10-09 23:16:43 +0200728
def create_and_remove_directory(directory):
    """Return a decorator that runs a callable inside a scratch directory.

    Each call to the decorated callable first creates *directory* with
    os.makedirs(), then invokes the callable, and finally removes the
    directory tree with support.rmtree() whether the callable returned
    normally or raised.  The callable's return value is passed through.
    """
    def decorate(target):
        def run_with_directory(*args, **kwargs):
            os.makedirs(directory)
            try:
                result = target(*args, **kwargs)
            finally:
                # Clean up even when the wrapped callable fails.
                support.rmtree(directory)
            return result
        # Copy the wrapped callable's name, docstring, etc. onto the wrapper.
        return functools.wraps(target)(run_with_directory)
    return decorate
740
741
class TestCommandLine(unittest.TestCase):
    # Payload pushed through the "python -m gzip" command line interface.
    data = b'This is a simple test with gzip'

    def test_decompress_stdin_stdout(self):
        # Compress in memory, pipe the result into "-m gzip -d" and expect
        # the original payload on stdout with nothing on stderr.
        with io.BytesIO() as buffer:
            with gzip.GzipFile(fileobj=buffer, mode='wb') as compressor:
                compressor.write(self.data)
            compressed = buffer.getvalue()

            command = sys.executable, '-m', 'gzip', '-d'
            with Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) as child:
                stdout, stderr = child.communicate(compressed)

        self.assertEqual(stderr, b'')
        self.assertEqual(stdout, self.data)

    @create_and_remove_directory(TEMPDIR)
    def test_decompress_infile_outfile(self):
        # Decompressing "testgzip.gz" must produce "testgzip" next to it
        # while leaving the archive itself in place.
        archive_path = os.path.join(TEMPDIR, 'testgzip.gz')
        self.assertFalse(os.path.exists(archive_path))

        with gzip.open(archive_path, mode='wb') as archive:
            archive.write(self.data)
        rc, out, err = assert_python_ok('-m', 'gzip', '-d', archive_path)

        extracted_path = os.path.join(TEMPDIR, "testgzip")
        with open(extracted_path, "rb") as extracted:
            self.assertEqual(extracted.read(), self.data)

        self.assertTrue(os.path.exists(archive_path))
        self.assertEqual(rc, 0)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    def test_decompress_infile_outfile_error(self):
        # A name without the ".gz" suffix is reported on stdout; the
        # process is still expected to exit successfully.
        rc, out, err = assert_python_ok('-m', 'gzip', '-d', 'thisisatest.out')
        self.assertIn(b"filename doesn't end in .gz:", out)
        self.assertEqual(rc, 0)
        self.assertEqual(err, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_stdin_outfile(self):
        # With no file argument, stdin is compressed to stdout; only the
        # two-byte gzip magic number at the start is checked.
        command = sys.executable, '-m', 'gzip'
        with Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) as child:
            stdout, stderr = child.communicate(self.data)

        self.assertEqual(stderr, b'')
        self.assertEqual(stdout[:2], b"\x1f\x8b")

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile_default(self):
        # Compressing a file without any level flag creates "<name>.gz".
        source_path = os.path.join(TEMPDIR, 'testgzip')
        archive_path = source_path + '.gz'
        self.assertFalse(os.path.exists(archive_path))

        with open(source_path, 'wb') as source:
            source.write(self.data)

        rc, out, err = assert_python_ok('-m', 'gzip', source_path)

        self.assertTrue(os.path.exists(archive_path))
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile(self):
        # Same as the default case, once per explicit compression flag.
        for compress_level in ('--fast', '--best'):
            with self.subTest(compress_level=compress_level):
                source_path = os.path.join(TEMPDIR, 'testgzip')
                archive_path = source_path + '.gz'
                self.assertFalse(os.path.exists(archive_path))

                with open(source_path, 'wb') as source:
                    source.write(self.data)

                rc, out, err = assert_python_ok(
                    '-m', 'gzip', compress_level, source_path)

                self.assertTrue(os.path.exists(archive_path))
                self.assertEqual(out, b'')
                self.assertEqual(err, b'')
                # Remove the archive so the next flag starts from scratch.
                os.remove(archive_path)
                self.assertFalse(os.path.exists(archive_path))

    def test_compress_fast_best_are_exclusive(self):
        # Passing both level flags must fail with a usage error on stderr.
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '--best')
        expected = b"error: argument --best: not allowed with argument --fast"
        self.assertIn(expected, err)
        self.assertEqual(out, b'')

    def test_decompress_cannot_have_flags_compression(self):
        # A compression level flag combined with -d must also be rejected.
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '-d')
        expected = (b'error: argument -d/--decompress: '
                    b'not allowed with argument --fast')
        self.assertIn(expected, err)
        self.assertEqual(out, b'')
832
Stéphane Wirtel84eec112018-10-09 23:16:43 +0200833
def test_main(verbose=None):
    """Run every test case class of this module via support.run_unittest().

    The *verbose* argument is accepted but not used by this function.
    """
    test_cases = (TestGzip, TestOpen, TestCommandLine)
    support.run_unittest(*test_cases)
836
Andrew M. Kuchlinga6f68e12005-06-09 14:12:36 +0000837
if __name__ == "__main__":
    # Executed as a script: run the full suite through test_main().
    test_main(verbose=True)