blob: d01d39627219e975aca08410a4e4cbe65b40d7e2 [file] [log] [blame]
Tim Peters3230d5c2001-07-11 22:21:17 +00001'''
2Tests for fileinput module.
3Nick Mathewson
4'''
Inada Naoki333d10c2021-04-14 14:12:58 +09005import io
Benjamin Petersoneb462882011-03-15 09:50:18 -05006import os
7import sys
8import re
briancurtin906f0c42011-03-15 10:29:41 -04009import fileinput
10import collections
Florent Xiclunaa011e2b2011-11-07 19:43:07 +010011import builtins
Serhiy Storchaka5f48e262018-06-05 12:08:36 +030012import tempfile
Benjamin Petersoneb462882011-03-15 09:50:18 -050013import unittest
14
briancurtinf84f3c32011-03-18 13:03:17 -050015try:
16 import bz2
17except ImportError:
18 bz2 = None
Ezio Melottic3afbb92011-05-14 10:10:53 +030019try:
20 import gzip
21except ImportError:
22 gzip = None
briancurtinf84f3c32011-03-18 13:03:17 -050023
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030024from io import BytesIO, StringIO
Benjamin Petersoneb462882011-03-15 09:50:18 -050025from fileinput import FileInput, hook_encoded
Roy Williams002665a2017-05-22 22:24:17 -070026from pathlib import Path
Benjamin Petersoneb462882011-03-15 09:50:18 -050027
Hai Shi847f94f2020-06-26 01:17:57 +080028from test.support import verbose
29from test.support.os_helper import TESTFN
30from test.support.os_helper import unlink as safe_unlink
31from test.support import os_helper
32from test.support import warnings_helper
Martin Panter7978e102016-01-16 06:26:54 +000033from test import support
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030034from unittest import mock
Benjamin Petersoneb462882011-03-15 09:50:18 -050035
Tim Peters3230d5c2001-07-11 22:21:17 +000036
37# The fileinput module has 2 interfaces: the FileInput class which does
38# all the work, and a few functions (input, etc.) that use a global _state
briancurtin906f0c42011-03-15 10:29:41 -040039# variable.
Tim Peters3230d5c2001-07-11 22:21:17 +000040
Serhiy Storchaka5f48e262018-06-05 12:08:36 +030041class BaseTests:
42 # Write a content (str or bytes) to temp file, and return the
43 # temp file's name.
44 def writeTmp(self, content, *, mode='w'): # opening in text mode is the default
45 fd, name = tempfile.mkstemp()
Hai Shi847f94f2020-06-26 01:17:57 +080046 self.addCleanup(os_helper.unlink, name)
Serhiy Storchaka5f48e262018-06-05 12:08:36 +030047 with open(fd, mode) as f:
48 f.write(content)
49 return name
Tim Peters3230d5c2001-07-11 22:21:17 +000050
Serhiy Storchakacc2dbc52016-03-08 18:28:36 +020051class LineReader:
52
53 def __init__(self):
54 self._linesread = []
55
56 @property
57 def linesread(self):
58 try:
59 return self._linesread[:]
60 finally:
61 self._linesread = []
62
63 def openhook(self, filename, mode):
64 self.it = iter(filename.splitlines(True))
65 return self
66
67 def readline(self, size=None):
68 line = next(self.it, '')
69 self._linesread.append(line)
70 return line
71
72 def readlines(self, hint=-1):
73 lines = []
74 size = 0
75 while True:
76 line = self.readline()
77 if not line:
78 return lines
79 lines.append(line)
80 size += len(line)
81 if size >= hint:
82 return lines
83
84 def close(self):
85 pass
86
Serhiy Storchaka5f48e262018-06-05 12:08:36 +030087class BufferSizesTests(BaseTests, unittest.TestCase):
Guido van Rossumd8faa362007-04-27 19:54:29 +000088 def test_buffer_sizes(self):
Tim Peters3230d5c2001-07-11 22:21:17 +000089
Matthias Bussonnier1a3faf92019-05-20 13:44:11 -070090 t1 = self.writeTmp(''.join("Line %s of file 1\n" % (i+1) for i in range(15)))
91 t2 = self.writeTmp(''.join("Line %s of file 2\n" % (i+1) for i in range(10)))
92 t3 = self.writeTmp(''.join("Line %s of file 3\n" % (i+1) for i in range(5)))
93 t4 = self.writeTmp(''.join("Line %s of file 4\n" % (i+1) for i in range(1)))
94
Guido van Rossumd8faa362007-04-27 19:54:29 +000095 pat = re.compile(r'LINE (\d+) OF FILE (\d+)')
Tim Peters3230d5c2001-07-11 22:21:17 +000096
Guido van Rossumd8faa362007-04-27 19:54:29 +000097 if verbose:
Matthias Bussonnier1a3faf92019-05-20 13:44:11 -070098 print('1. Simple iteration')
99 fi = FileInput(files=(t1, t2, t3, t4))
Tim Peters3230d5c2001-07-11 22:21:17 +0000100 lines = list(fi)
Tim Peters3230d5c2001-07-11 22:21:17 +0000101 fi.close()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000102 self.assertEqual(len(lines), 31)
103 self.assertEqual(lines[4], 'Line 5 of file 1\n')
104 self.assertEqual(lines[30], 'Line 1 of file 4\n')
105 self.assertEqual(fi.lineno(), 31)
106 self.assertEqual(fi.filename(), t4)
Tim Peters3230d5c2001-07-11 22:21:17 +0000107
Guido van Rossumd8faa362007-04-27 19:54:29 +0000108 if verbose:
Matthias Bussonnier1a3faf92019-05-20 13:44:11 -0700109 print('2. Status variables')
110 fi = FileInput(files=(t1, t2, t3, t4))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000111 s = "x"
112 while s and s != 'Line 6 of file 2\n':
113 s = fi.readline()
114 self.assertEqual(fi.filename(), t2)
115 self.assertEqual(fi.lineno(), 21)
116 self.assertEqual(fi.filelineno(), 6)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000117 self.assertFalse(fi.isfirstline())
118 self.assertFalse(fi.isstdin())
Tim Peters3230d5c2001-07-11 22:21:17 +0000119
Guido van Rossumd8faa362007-04-27 19:54:29 +0000120 if verbose:
Matthias Bussonnier1a3faf92019-05-20 13:44:11 -0700121 print('3. Nextfile')
Guido van Rossumd8faa362007-04-27 19:54:29 +0000122 fi.nextfile()
123 self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
124 self.assertEqual(fi.lineno(), 22)
125 fi.close()
Tim Peters3230d5c2001-07-11 22:21:17 +0000126
Guido van Rossumd8faa362007-04-27 19:54:29 +0000127 if verbose:
Matthias Bussonnier1a3faf92019-05-20 13:44:11 -0700128 print('4. Stdin')
129 fi = FileInput(files=(t1, t2, t3, t4, '-'))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000130 savestdin = sys.stdin
131 try:
132 sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
133 lines = list(fi)
134 self.assertEqual(len(lines), 33)
135 self.assertEqual(lines[32], 'Line 2 of stdin\n')
136 self.assertEqual(fi.filename(), '<stdin>')
137 fi.nextfile()
138 finally:
139 sys.stdin = savestdin
Tim Peters3230d5c2001-07-11 22:21:17 +0000140
Guido van Rossumd8faa362007-04-27 19:54:29 +0000141 if verbose:
Matthias Bussonnier1a3faf92019-05-20 13:44:11 -0700142 print('5. Boundary conditions')
143 fi = FileInput(files=(t1, t2, t3, t4))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000144 self.assertEqual(fi.lineno(), 0)
145 self.assertEqual(fi.filename(), None)
146 fi.nextfile()
147 self.assertEqual(fi.lineno(), 0)
148 self.assertEqual(fi.filename(), None)
Tim Peters3230d5c2001-07-11 22:21:17 +0000149
Guido van Rossumd8faa362007-04-27 19:54:29 +0000150 if verbose:
Matthias Bussonnier1a3faf92019-05-20 13:44:11 -0700151 print('6. Inplace')
Guido van Rossumd8faa362007-04-27 19:54:29 +0000152 savestdout = sys.stdout
153 try:
Matthias Bussonnier1a3faf92019-05-20 13:44:11 -0700154 fi = FileInput(files=(t1, t2, t3, t4), inplace=1)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000155 for line in fi:
156 line = line[:-1].upper()
157 print(line)
158 fi.close()
159 finally:
160 sys.stdout = savestdout
Tim Peters3230d5c2001-07-11 22:21:17 +0000161
Matthias Bussonnier1a3faf92019-05-20 13:44:11 -0700162 fi = FileInput(files=(t1, t2, t3, t4))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000163 for line in fi:
164 self.assertEqual(line[-1], '\n')
165 m = pat.match(line[:-1])
166 self.assertNotEqual(m, None)
167 self.assertEqual(int(m.group(1)), fi.filelineno())
168 fi.close()
Georg Brandle4662172006-02-19 09:51:27 +0000169
briancurtin906f0c42011-03-15 10:29:41 -0400170class UnconditionallyRaise:
171 def __init__(self, exception_type):
172 self.exception_type = exception_type
173 self.invoked = False
174 def __call__(self, *args, **kwargs):
175 self.invoked = True
176 raise self.exception_type()
177
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300178class FileInputTests(BaseTests, unittest.TestCase):
briancurtin906f0c42011-03-15 10:29:41 -0400179
Guido van Rossumd8faa362007-04-27 19:54:29 +0000180 def test_zero_byte_files(self):
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300181 t1 = self.writeTmp("")
182 t2 = self.writeTmp("")
183 t3 = self.writeTmp("The only line there is.\n")
184 t4 = self.writeTmp("")
185 fi = FileInput(files=(t1, t2, t3, t4))
Georg Brandl67e9fb92006-02-19 13:56:17 +0000186
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300187 line = fi.readline()
188 self.assertEqual(line, 'The only line there is.\n')
189 self.assertEqual(fi.lineno(), 1)
190 self.assertEqual(fi.filelineno(), 1)
191 self.assertEqual(fi.filename(), t3)
Georg Brandlc029f872006-02-19 14:12:34 +0000192
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300193 line = fi.readline()
194 self.assertFalse(line)
195 self.assertEqual(fi.lineno(), 1)
196 self.assertEqual(fi.filelineno(), 0)
197 self.assertEqual(fi.filename(), t4)
198 fi.close()
Georg Brandlc98eeed2006-02-19 14:57:47 +0000199
Guido van Rossumd8faa362007-04-27 19:54:29 +0000200 def test_files_that_dont_end_with_newline(self):
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300201 t1 = self.writeTmp("A\nB\nC")
202 t2 = self.writeTmp("D\nE\nF")
203 fi = FileInput(files=(t1, t2))
204 lines = list(fi)
205 self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
206 self.assertEqual(fi.filelineno(), 3)
207 self.assertEqual(fi.lineno(), 6)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000208
Guido van Rossumc43e79f2007-06-18 18:26:36 +0000209## def test_unicode_filenames(self):
210## # XXX A unicode string is always returned by writeTmp.
211## # So is this needed?
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300212## t1 = self.writeTmp("A\nB")
213## encoding = sys.getfilesystemencoding()
214## if encoding is None:
215## encoding = 'ascii'
216## fi = FileInput(files=str(t1, encoding))
217## lines = list(fi)
218## self.assertEqual(lines, ["A\n", "B"])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000219
220 def test_fileno(self):
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300221 t1 = self.writeTmp("A\nB")
222 t2 = self.writeTmp("C\nD")
223 fi = FileInput(files=(t1, t2))
224 self.assertEqual(fi.fileno(), -1)
225 line = next(fi)
226 self.assertNotEqual(fi.fileno(), -1)
227 fi.nextfile()
228 self.assertEqual(fi.fileno(), -1)
229 line = list(fi)
230 self.assertEqual(fi.fileno(), -1)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000231
232 def test_opening_mode(self):
Victor Stinner942f7a22020-03-04 18:50:22 +0100233 try:
234 # invalid mode, should raise ValueError
235 fi = FileInput(mode="w")
236 self.fail("FileInput should reject invalid mode argument")
237 except ValueError:
238 pass
239 # try opening in universal newline mode
240 t1 = self.writeTmp(b"A\nB\r\nC\rD", mode="wb")
Hai Shi847f94f2020-06-26 01:17:57 +0800241 with warnings_helper.check_warnings(('', DeprecationWarning)):
Inada Naoki333d10c2021-04-14 14:12:58 +0900242 fi = FileInput(files=t1, mode="U", encoding="utf-8")
Hai Shi847f94f2020-06-26 01:17:57 +0800243 with warnings_helper.check_warnings(('', DeprecationWarning)):
Victor Stinner942f7a22020-03-04 18:50:22 +0100244 lines = list(fi)
245 self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000246
Serhiy Storchaka946cfc32014-05-14 21:08:33 +0300247 def test_stdin_binary_mode(self):
248 with mock.patch('sys.stdin') as m_stdin:
249 m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
250 fi = FileInput(files=['-'], mode='rb')
251 lines = list(fi)
252 self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
253
R David Murray830207e2016-01-02 15:41:41 -0500254 def test_detached_stdin_binary_mode(self):
255 orig_stdin = sys.stdin
256 try:
257 sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
258 self.assertFalse(hasattr(sys.stdin, 'buffer'))
259 fi = FileInput(files=['-'], mode='rb')
260 lines = list(fi)
261 self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
262 finally:
263 sys.stdin = orig_stdin
264
Guido van Rossume22905a2007-08-27 23:09:25 +0000265 def test_file_opening_hook(self):
266 try:
267 # cannot use openhook and inplace mode
268 fi = FileInput(inplace=1, openhook=lambda f, m: None)
269 self.fail("FileInput should raise if both inplace "
270 "and openhook arguments are given")
271 except ValueError:
272 pass
273 try:
274 fi = FileInput(openhook=1)
275 self.fail("FileInput should check openhook for being callable")
276 except ValueError:
277 pass
briancurtin906f0c42011-03-15 10:29:41 -0400278
279 class CustomOpenHook:
280 def __init__(self):
281 self.invoked = False
Inada Naoki333d10c2021-04-14 14:12:58 +0900282 def __call__(self, *args, **kargs):
briancurtin906f0c42011-03-15 10:29:41 -0400283 self.invoked = True
284 return open(*args)
285
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300286 t = self.writeTmp("\n")
briancurtin906f0c42011-03-15 10:29:41 -0400287 custom_open_hook = CustomOpenHook()
288 with FileInput([t], openhook=custom_open_hook) as fi:
289 fi.readline()
290 self.assertTrue(custom_open_hook.invoked, "openhook not invoked")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000291
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200292 def test_readline(self):
293 with open(TESTFN, 'wb') as f:
294 f.write(b'A\nB\r\nC\r')
295 # Fill TextIOWrapper buffer.
296 f.write(b'123456789\n' * 1000)
297 # Issue #20501: readline() shouldn't read whole file.
298 f.write(b'\x80')
299 self.addCleanup(safe_unlink, TESTFN)
300
301 with FileInput(files=TESTFN,
Serhiy Storchakacc2dbc52016-03-08 18:28:36 +0200302 openhook=hook_encoded('ascii')) as fi:
Serhiy Storchaka682ea5f2014-03-03 21:17:17 +0200303 try:
304 self.assertEqual(fi.readline(), 'A\n')
305 self.assertEqual(fi.readline(), 'B\n')
306 self.assertEqual(fi.readline(), 'C\n')
307 except UnicodeDecodeError:
308 self.fail('Read to end of file')
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200309 with self.assertRaises(UnicodeDecodeError):
310 # Read to the end of file.
311 list(fi)
Serhiy Storchaka314464d2015-11-01 16:43:58 +0200312 self.assertEqual(fi.readline(), '')
313 self.assertEqual(fi.readline(), '')
314
315 def test_readline_binary_mode(self):
316 with open(TESTFN, 'wb') as f:
317 f.write(b'A\nB\r\nC\rD')
318 self.addCleanup(safe_unlink, TESTFN)
319
320 with FileInput(files=TESTFN, mode='rb') as fi:
321 self.assertEqual(fi.readline(), b'A\n')
322 self.assertEqual(fi.readline(), b'B\r\n')
323 self.assertEqual(fi.readline(), b'C\rD')
324 # Read to the end of file.
325 self.assertEqual(fi.readline(), b'')
326 self.assertEqual(fi.readline(), b'')
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200327
Berker Peksagbe6dbfb2019-04-29 17:55:39 +0300328 def test_inplace_binary_write_mode(self):
329 temp_file = self.writeTmp(b'Initial text.', mode='wb')
330 with FileInput(temp_file, mode='rb', inplace=True) as fobj:
331 line = fobj.readline()
332 self.assertEqual(line, b'Initial text.')
333 # print() cannot be used with files opened in binary mode.
334 sys.stdout.write(b'New line.')
335 with open(temp_file, 'rb') as f:
336 self.assertEqual(f.read(), b'New line.')
337
Inada Naoki333d10c2021-04-14 14:12:58 +0900338 def test_file_hook_backward_compatibility(self):
339 def old_hook(filename, mode):
340 return io.StringIO("I used to receive only filename and mode")
341 t = self.writeTmp("\n")
342 with FileInput([t], openhook=old_hook) as fi:
343 result = fi.readline()
344 self.assertEqual(result, "I used to receive only filename and mode")
345
Georg Brandl6cb7b652010-07-31 20:08:15 +0000346 def test_context_manager(self):
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300347 t1 = self.writeTmp("A\nB\nC")
348 t2 = self.writeTmp("D\nE\nF")
349 with FileInput(files=(t1, t2)) as fi:
350 lines = list(fi)
351 self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
352 self.assertEqual(fi.filelineno(), 3)
353 self.assertEqual(fi.lineno(), 6)
354 self.assertEqual(fi._files, ())
Georg Brandl6cb7b652010-07-31 20:08:15 +0000355
356 def test_close_on_exception(self):
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300357 t1 = self.writeTmp("")
Georg Brandl6cb7b652010-07-31 20:08:15 +0000358 try:
Georg Brandl6cb7b652010-07-31 20:08:15 +0000359 with FileInput(files=t1) as fi:
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200360 raise OSError
361 except OSError:
Georg Brandl6cb7b652010-07-31 20:08:15 +0000362 self.assertEqual(fi._files, ())
Georg Brandl6cb7b652010-07-31 20:08:15 +0000363
briancurtin906f0c42011-03-15 10:29:41 -0400364 def test_empty_files_list_specified_to_constructor(self):
365 with FileInput(files=[]) as fi:
Brett Cannond47af532011-03-15 15:55:12 -0400366 self.assertEqual(fi._files, ('-',))
briancurtin906f0c42011-03-15 10:29:41 -0400367
Hai Shi847f94f2020-06-26 01:17:57 +0800368 @warnings_helper.ignore_warnings(category=DeprecationWarning)
briancurtin906f0c42011-03-15 10:29:41 -0400369 def test__getitem__(self):
370 """Tests invoking FileInput.__getitem__() with the current
371 line number"""
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300372 t = self.writeTmp("line1\nline2\n")
briancurtin906f0c42011-03-15 10:29:41 -0400373 with FileInput(files=[t]) as fi:
374 retval1 = fi[0]
375 self.assertEqual(retval1, "line1\n")
376 retval2 = fi[1]
377 self.assertEqual(retval2, "line2\n")
378
Berker Peksag84a13fb2018-08-11 09:05:04 +0300379 def test__getitem___deprecation(self):
380 t = self.writeTmp("line1\nline2\n")
381 with self.assertWarnsRegex(DeprecationWarning,
382 r'Use iterator protocol instead'):
383 with FileInput(files=[t]) as fi:
384 self.assertEqual(fi[0], "line1\n")
385
Hai Shi847f94f2020-06-26 01:17:57 +0800386 @warnings_helper.ignore_warnings(category=DeprecationWarning)
briancurtin906f0c42011-03-15 10:29:41 -0400387 def test__getitem__invalid_key(self):
388 """Tests invoking FileInput.__getitem__() with an index unequal to
389 the line number"""
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300390 t = self.writeTmp("line1\nline2\n")
briancurtin906f0c42011-03-15 10:29:41 -0400391 with FileInput(files=[t]) as fi:
392 with self.assertRaises(RuntimeError) as cm:
393 fi[1]
Brett Cannond47af532011-03-15 15:55:12 -0400394 self.assertEqual(cm.exception.args, ("accessing lines out of order",))
briancurtin906f0c42011-03-15 10:29:41 -0400395
Hai Shi847f94f2020-06-26 01:17:57 +0800396 @warnings_helper.ignore_warnings(category=DeprecationWarning)
briancurtin906f0c42011-03-15 10:29:41 -0400397 def test__getitem__eof(self):
398 """Tests invoking FileInput.__getitem__() with the line number but at
399 end-of-input"""
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300400 t = self.writeTmp('')
briancurtin906f0c42011-03-15 10:29:41 -0400401 with FileInput(files=[t]) as fi:
402 with self.assertRaises(IndexError) as cm:
403 fi[0]
Brett Cannond47af532011-03-15 15:55:12 -0400404 self.assertEqual(cm.exception.args, ("end of input reached",))
briancurtin906f0c42011-03-15 10:29:41 -0400405
406 def test_nextfile_oserror_deleting_backup(self):
407 """Tests invoking FileInput.nextfile() when the attempt to delete
408 the backup file would raise OSError. This error is expected to be
409 silently ignored"""
410
411 os_unlink_orig = os.unlink
412 os_unlink_replacement = UnconditionallyRaise(OSError)
413 try:
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300414 t = self.writeTmp("\n")
Hai Shi847f94f2020-06-26 01:17:57 +0800415 self.addCleanup(safe_unlink, t + '.bak')
briancurtin906f0c42011-03-15 10:29:41 -0400416 with FileInput(files=[t], inplace=True) as fi:
417 next(fi) # make sure the file is opened
418 os.unlink = os_unlink_replacement
419 fi.nextfile()
420 finally:
421 os.unlink = os_unlink_orig
422
423 # sanity check to make sure that our test scenario was actually hit
424 self.assertTrue(os_unlink_replacement.invoked,
425 "os.unlink() was not invoked")
426
427 def test_readline_os_fstat_raises_OSError(self):
428 """Tests invoking FileInput.readline() when os.fstat() raises OSError.
429 This exception should be silently discarded."""
430
431 os_fstat_orig = os.fstat
432 os_fstat_replacement = UnconditionallyRaise(OSError)
433 try:
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300434 t = self.writeTmp("\n")
briancurtin906f0c42011-03-15 10:29:41 -0400435 with FileInput(files=[t], inplace=True) as fi:
436 os.fstat = os_fstat_replacement
437 fi.readline()
438 finally:
439 os.fstat = os_fstat_orig
440
441 # sanity check to make sure that our test scenario was actually hit
442 self.assertTrue(os_fstat_replacement.invoked,
443 "os.fstat() was not invoked")
444
briancurtin906f0c42011-03-15 10:29:41 -0400445 def test_readline_os_chmod_raises_OSError(self):
446 """Tests invoking FileInput.readline() when os.chmod() raises OSError.
447 This exception should be silently discarded."""
448
449 os_chmod_orig = os.chmod
450 os_chmod_replacement = UnconditionallyRaise(OSError)
451 try:
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300452 t = self.writeTmp("\n")
briancurtin906f0c42011-03-15 10:29:41 -0400453 with FileInput(files=[t], inplace=True) as fi:
454 os.chmod = os_chmod_replacement
455 fi.readline()
456 finally:
457 os.chmod = os_chmod_orig
458
459 # sanity check to make sure that our test scenario was actually hit
460 self.assertTrue(os_chmod_replacement.invoked,
461 "os.fstat() was not invoked")
462
463 def test_fileno_when_ValueError_raised(self):
464 class FilenoRaisesValueError(UnconditionallyRaise):
465 def __init__(self):
466 UnconditionallyRaise.__init__(self, ValueError)
467 def fileno(self):
468 self.__call__()
469
470 unconditionally_raise_ValueError = FilenoRaisesValueError()
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300471 t = self.writeTmp("\n")
briancurtin906f0c42011-03-15 10:29:41 -0400472 with FileInput(files=[t]) as fi:
473 file_backup = fi._file
474 try:
475 fi._file = unconditionally_raise_ValueError
476 result = fi.fileno()
477 finally:
478 fi._file = file_backup # make sure the file gets cleaned up
479
480 # sanity check to make sure that our test scenario was actually hit
481 self.assertTrue(unconditionally_raise_ValueError.invoked,
482 "_file.fileno() was not invoked")
483
484 self.assertEqual(result, -1, "fileno() should return -1")
485
Serhiy Storchakacc2dbc52016-03-08 18:28:36 +0200486 def test_readline_buffering(self):
487 src = LineReader()
488 with FileInput(files=['line1\nline2', 'line3\n'],
489 openhook=src.openhook) as fi:
490 self.assertEqual(src.linesread, [])
491 self.assertEqual(fi.readline(), 'line1\n')
492 self.assertEqual(src.linesread, ['line1\n'])
493 self.assertEqual(fi.readline(), 'line2')
494 self.assertEqual(src.linesread, ['line2'])
495 self.assertEqual(fi.readline(), 'line3\n')
496 self.assertEqual(src.linesread, ['', 'line3\n'])
497 self.assertEqual(fi.readline(), '')
498 self.assertEqual(src.linesread, [''])
499 self.assertEqual(fi.readline(), '')
500 self.assertEqual(src.linesread, [])
501
502 def test_iteration_buffering(self):
503 src = LineReader()
504 with FileInput(files=['line1\nline2', 'line3\n'],
505 openhook=src.openhook) as fi:
506 self.assertEqual(src.linesread, [])
507 self.assertEqual(next(fi), 'line1\n')
508 self.assertEqual(src.linesread, ['line1\n'])
509 self.assertEqual(next(fi), 'line2')
510 self.assertEqual(src.linesread, ['line2'])
511 self.assertEqual(next(fi), 'line3\n')
512 self.assertEqual(src.linesread, ['', 'line3\n'])
513 self.assertRaises(StopIteration, next, fi)
514 self.assertEqual(src.linesread, [''])
515 self.assertRaises(StopIteration, next, fi)
516 self.assertEqual(src.linesread, [])
517
Roy Williams002665a2017-05-22 22:24:17 -0700518 def test_pathlib_file(self):
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300519 t1 = Path(self.writeTmp("Pathlib file."))
520 with FileInput(t1) as fi:
521 line = fi.readline()
522 self.assertEqual(line, 'Pathlib file.')
523 self.assertEqual(fi.lineno(), 1)
524 self.assertEqual(fi.filelineno(), 1)
525 self.assertEqual(fi.filename(), os.fspath(t1))
Roy Williams002665a2017-05-22 22:24:17 -0700526
Zhiming Wang06de1ae2017-09-05 01:37:24 +0800527 def test_pathlib_file_inplace(self):
Serhiy Storchaka5f48e262018-06-05 12:08:36 +0300528 t1 = Path(self.writeTmp('Pathlib file.'))
529 with FileInput(t1, inplace=True) as fi:
530 line = fi.readline()
531 self.assertEqual(line, 'Pathlib file.')
532 print('Modified %s' % line)
533 with open(t1) as f:
534 self.assertEqual(f.read(), 'Modified Pathlib file.\n')
Zhiming Wang06de1ae2017-09-05 01:37:24 +0800535
Roy Williams002665a2017-05-22 22:24:17 -0700536
briancurtin906f0c42011-03-15 10:29:41 -0400537class MockFileInput:
538 """A class that mocks out fileinput.FileInput for use during unit tests"""
539
Matthias Bussonnier1a3faf92019-05-20 13:44:11 -0700540 def __init__(self, files=None, inplace=False, backup="", *,
Inada Naoki333d10c2021-04-14 14:12:58 +0900541 mode="r", openhook=None, encoding=None, errors=None):
briancurtin906f0c42011-03-15 10:29:41 -0400542 self.files = files
543 self.inplace = inplace
544 self.backup = backup
briancurtin906f0c42011-03-15 10:29:41 -0400545 self.mode = mode
546 self.openhook = openhook
Inada Naoki333d10c2021-04-14 14:12:58 +0900547 self.encoding = encoding
548 self.errors = errors
briancurtin906f0c42011-03-15 10:29:41 -0400549 self._file = None
550 self.invocation_counts = collections.defaultdict(lambda: 0)
551 self.return_values = {}
552
553 def close(self):
554 self.invocation_counts["close"] += 1
555
556 def nextfile(self):
557 self.invocation_counts["nextfile"] += 1
558 return self.return_values["nextfile"]
559
560 def filename(self):
561 self.invocation_counts["filename"] += 1
562 return self.return_values["filename"]
563
564 def lineno(self):
565 self.invocation_counts["lineno"] += 1
566 return self.return_values["lineno"]
567
568 def filelineno(self):
569 self.invocation_counts["filelineno"] += 1
570 return self.return_values["filelineno"]
571
572 def fileno(self):
573 self.invocation_counts["fileno"] += 1
574 return self.return_values["fileno"]
575
576 def isfirstline(self):
577 self.invocation_counts["isfirstline"] += 1
578 return self.return_values["isfirstline"]
579
580 def isstdin(self):
581 self.invocation_counts["isstdin"] += 1
582 return self.return_values["isstdin"]
583
584class BaseFileInputGlobalMethodsTest(unittest.TestCase):
585 """Base class for unit tests for the global function of
586 the fileinput module."""
587
588 def setUp(self):
589 self._orig_state = fileinput._state
590 self._orig_FileInput = fileinput.FileInput
591 fileinput.FileInput = MockFileInput
592
593 def tearDown(self):
594 fileinput.FileInput = self._orig_FileInput
595 fileinput._state = self._orig_state
596
597 def assertExactlyOneInvocation(self, mock_file_input, method_name):
598 # assert that the method with the given name was invoked once
599 actual_count = mock_file_input.invocation_counts[method_name]
600 self.assertEqual(actual_count, 1, method_name)
601 # assert that no other unexpected methods were invoked
602 actual_total_count = len(mock_file_input.invocation_counts)
603 self.assertEqual(actual_total_count, 1)
604
605class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
606 """Unit tests for fileinput.input()"""
607
608 def test_state_is_not_None_and_state_file_is_not_None(self):
609 """Tests invoking fileinput.input() when fileinput._state is not None
610 and its _file attribute is also not None. Expect RuntimeError to
611 be raised with a meaningful error message and for fileinput._state
612 to *not* be modified."""
613 instance = MockFileInput()
614 instance._file = object()
615 fileinput._state = instance
616 with self.assertRaises(RuntimeError) as cm:
617 fileinput.input()
618 self.assertEqual(("input() already active",), cm.exception.args)
619 self.assertIs(instance, fileinput._state, "fileinput._state")
620
621 def test_state_is_not_None_and_state_file_is_None(self):
622 """Tests invoking fileinput.input() when fileinput._state is not None
623 but its _file attribute *is* None. Expect it to create and return
624 a new fileinput.FileInput object with all method parameters passed
625 explicitly to the __init__() method; also ensure that
626 fileinput._state is set to the returned instance."""
627 instance = MockFileInput()
628 instance._file = None
629 fileinput._state = instance
630 self.do_test_call_input()
631
632 def test_state_is_None(self):
633 """Tests invoking fileinput.input() when fileinput._state is None
634 Expect it to create and return a new fileinput.FileInput object
635 with all method parameters passed explicitly to the __init__()
636 method; also ensure that fileinput._state is set to the returned
637 instance."""
638 fileinput._state = None
639 self.do_test_call_input()
640
641 def do_test_call_input(self):
642 """Tests that fileinput.input() creates a new fileinput.FileInput
643 object, passing the given parameters unmodified to
644 fileinput.FileInput.__init__(). Note that this test depends on the
645 monkey patching of fileinput.FileInput done by setUp()."""
646 files = object()
647 inplace = object()
648 backup = object()
briancurtin906f0c42011-03-15 10:29:41 -0400649 mode = object()
650 openhook = object()
Inada Naoki333d10c2021-04-14 14:12:58 +0900651 encoding = object()
briancurtin906f0c42011-03-15 10:29:41 -0400652
653 # call fileinput.input() with different values for each argument
654 result = fileinput.input(files=files, inplace=inplace, backup=backup,
Inada Naoki333d10c2021-04-14 14:12:58 +0900655 mode=mode, openhook=openhook, encoding=encoding)
briancurtin906f0c42011-03-15 10:29:41 -0400656
657 # ensure fileinput._state was set to the returned object
658 self.assertIs(result, fileinput._state, "fileinput._state")
659
660 # ensure the parameters to fileinput.input() were passed directly
661 # to FileInput.__init__()
662 self.assertIs(files, result.files, "files")
663 self.assertIs(inplace, result.inplace, "inplace")
664 self.assertIs(backup, result.backup, "backup")
briancurtin906f0c42011-03-15 10:29:41 -0400665 self.assertIs(mode, result.mode, "mode")
666 self.assertIs(openhook, result.openhook, "openhook")
667
668class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
669 """Unit tests for fileinput.close()"""
670
671 def test_state_is_None(self):
672 """Tests that fileinput.close() does nothing if fileinput._state
673 is None"""
674 fileinput._state = None
675 fileinput.close()
676 self.assertIsNone(fileinput._state)
677
678 def test_state_is_not_None(self):
679 """Tests that fileinput.close() invokes close() on fileinput._state
680 and sets _state=None"""
681 instance = MockFileInput()
682 fileinput._state = instance
683 fileinput.close()
684 self.assertExactlyOneInvocation(instance, "close")
685 self.assertIsNone(fileinput._state)
686
687class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
688 """Unit tests for fileinput.nextfile()"""
689
690 def test_state_is_None(self):
691 """Tests fileinput.nextfile() when fileinput._state is None.
692 Ensure that it raises RuntimeError with a meaningful error message
693 and does not modify fileinput._state"""
694 fileinput._state = None
695 with self.assertRaises(RuntimeError) as cm:
696 fileinput.nextfile()
697 self.assertEqual(("no active input()",), cm.exception.args)
698 self.assertIsNone(fileinput._state)
699
700 def test_state_is_not_None(self):
701 """Tests fileinput.nextfile() when fileinput._state is not None.
702 Ensure that it invokes fileinput._state.nextfile() exactly once,
703 returns whatever it returns, and does not modify fileinput._state
704 to point to a different object."""
705 nextfile_retval = object()
706 instance = MockFileInput()
707 instance.return_values["nextfile"] = nextfile_retval
708 fileinput._state = instance
709 retval = fileinput.nextfile()
710 self.assertExactlyOneInvocation(instance, "nextfile")
711 self.assertIs(retval, nextfile_retval)
712 self.assertIs(fileinput._state, instance)
713
714class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
715 """Unit tests for fileinput.filename()"""
716
717 def test_state_is_None(self):
718 """Tests fileinput.filename() when fileinput._state is None.
719 Ensure that it raises RuntimeError with a meaningful error message
720 and does not modify fileinput._state"""
721 fileinput._state = None
722 with self.assertRaises(RuntimeError) as cm:
723 fileinput.filename()
724 self.assertEqual(("no active input()",), cm.exception.args)
725 self.assertIsNone(fileinput._state)
726
727 def test_state_is_not_None(self):
728 """Tests fileinput.filename() when fileinput._state is not None.
729 Ensure that it invokes fileinput._state.filename() exactly once,
730 returns whatever it returns, and does not modify fileinput._state
731 to point to a different object."""
732 filename_retval = object()
733 instance = MockFileInput()
734 instance.return_values["filename"] = filename_retval
735 fileinput._state = instance
736 retval = fileinput.filename()
737 self.assertExactlyOneInvocation(instance, "filename")
738 self.assertIs(retval, filename_retval)
739 self.assertIs(fileinput._state, instance)
740
741class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
742 """Unit tests for fileinput.lineno()"""
743
744 def test_state_is_None(self):
745 """Tests fileinput.lineno() when fileinput._state is None.
746 Ensure that it raises RuntimeError with a meaningful error message
747 and does not modify fileinput._state"""
748 fileinput._state = None
749 with self.assertRaises(RuntimeError) as cm:
750 fileinput.lineno()
751 self.assertEqual(("no active input()",), cm.exception.args)
752 self.assertIsNone(fileinput._state)
753
754 def test_state_is_not_None(self):
755 """Tests fileinput.lineno() when fileinput._state is not None.
756 Ensure that it invokes fileinput._state.lineno() exactly once,
757 returns whatever it returns, and does not modify fileinput._state
758 to point to a different object."""
759 lineno_retval = object()
760 instance = MockFileInput()
761 instance.return_values["lineno"] = lineno_retval
762 fileinput._state = instance
763 retval = fileinput.lineno()
764 self.assertExactlyOneInvocation(instance, "lineno")
765 self.assertIs(retval, lineno_retval)
766 self.assertIs(fileinput._state, instance)
767
768class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
769 """Unit tests for fileinput.filelineno()"""
770
771 def test_state_is_None(self):
772 """Tests fileinput.filelineno() when fileinput._state is None.
773 Ensure that it raises RuntimeError with a meaningful error message
774 and does not modify fileinput._state"""
775 fileinput._state = None
776 with self.assertRaises(RuntimeError) as cm:
777 fileinput.filelineno()
778 self.assertEqual(("no active input()",), cm.exception.args)
779 self.assertIsNone(fileinput._state)
780
781 def test_state_is_not_None(self):
782 """Tests fileinput.filelineno() when fileinput._state is not None.
783 Ensure that it invokes fileinput._state.filelineno() exactly once,
784 returns whatever it returns, and does not modify fileinput._state
785 to point to a different object."""
786 filelineno_retval = object()
787 instance = MockFileInput()
788 instance.return_values["filelineno"] = filelineno_retval
789 fileinput._state = instance
790 retval = fileinput.filelineno()
791 self.assertExactlyOneInvocation(instance, "filelineno")
792 self.assertIs(retval, filelineno_retval)
793 self.assertIs(fileinput._state, instance)
794
795class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
796 """Unit tests for fileinput.fileno()"""
797
798 def test_state_is_None(self):
799 """Tests fileinput.fileno() when fileinput._state is None.
800 Ensure that it raises RuntimeError with a meaningful error message
801 and does not modify fileinput._state"""
802 fileinput._state = None
803 with self.assertRaises(RuntimeError) as cm:
804 fileinput.fileno()
805 self.assertEqual(("no active input()",), cm.exception.args)
806 self.assertIsNone(fileinput._state)
807
808 def test_state_is_not_None(self):
809 """Tests fileinput.fileno() when fileinput._state is not None.
810 Ensure that it invokes fileinput._state.fileno() exactly once,
811 returns whatever it returns, and does not modify fileinput._state
812 to point to a different object."""
813 fileno_retval = object()
814 instance = MockFileInput()
815 instance.return_values["fileno"] = fileno_retval
816 instance.fileno_retval = fileno_retval
817 fileinput._state = instance
818 retval = fileinput.fileno()
819 self.assertExactlyOneInvocation(instance, "fileno")
820 self.assertIs(retval, fileno_retval)
821 self.assertIs(fileinput._state, instance)
822
823class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
824 """Unit tests for fileinput.isfirstline()"""
825
826 def test_state_is_None(self):
827 """Tests fileinput.isfirstline() when fileinput._state is None.
828 Ensure that it raises RuntimeError with a meaningful error message
829 and does not modify fileinput._state"""
830 fileinput._state = None
831 with self.assertRaises(RuntimeError) as cm:
832 fileinput.isfirstline()
833 self.assertEqual(("no active input()",), cm.exception.args)
834 self.assertIsNone(fileinput._state)
835
836 def test_state_is_not_None(self):
837 """Tests fileinput.isfirstline() when fileinput._state is not None.
838 Ensure that it invokes fileinput._state.isfirstline() exactly once,
839 returns whatever it returns, and does not modify fileinput._state
840 to point to a different object."""
841 isfirstline_retval = object()
842 instance = MockFileInput()
843 instance.return_values["isfirstline"] = isfirstline_retval
844 fileinput._state = instance
845 retval = fileinput.isfirstline()
846 self.assertExactlyOneInvocation(instance, "isfirstline")
847 self.assertIs(retval, isfirstline_retval)
848 self.assertIs(fileinput._state, instance)
849
850class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
851 """Unit tests for fileinput.isstdin()"""
852
853 def test_state_is_None(self):
854 """Tests fileinput.isstdin() when fileinput._state is None.
855 Ensure that it raises RuntimeError with a meaningful error message
856 and does not modify fileinput._state"""
857 fileinput._state = None
858 with self.assertRaises(RuntimeError) as cm:
859 fileinput.isstdin()
860 self.assertEqual(("no active input()",), cm.exception.args)
861 self.assertIsNone(fileinput._state)
862
863 def test_state_is_not_None(self):
864 """Tests fileinput.isstdin() when fileinput._state is not None.
865 Ensure that it invokes fileinput._state.isstdin() exactly once,
866 returns whatever it returns, and does not modify fileinput._state
867 to point to a different object."""
868 isstdin_retval = object()
869 instance = MockFileInput()
870 instance.return_values["isstdin"] = isstdin_retval
871 fileinput._state = instance
872 retval = fileinput.isstdin()
873 self.assertExactlyOneInvocation(instance, "isstdin")
874 self.assertIs(retval, isstdin_retval)
875 self.assertIs(fileinput._state, instance)
876
877class InvocationRecorder:
Inada Naoki333d10c2021-04-14 14:12:58 +0900878
briancurtin906f0c42011-03-15 10:29:41 -0400879 def __init__(self):
880 self.invocation_count = 0
Inada Naoki333d10c2021-04-14 14:12:58 +0900881
briancurtin906f0c42011-03-15 10:29:41 -0400882 def __call__(self, *args, **kwargs):
883 self.invocation_count += 1
884 self.last_invocation = (args, kwargs)
Inada Naoki333d10c2021-04-14 14:12:58 +0900885 return io.BytesIO(b'some bytes')
886
briancurtin906f0c42011-03-15 10:29:41 -0400887
888class Test_hook_compressed(unittest.TestCase):
889 """Unit tests for fileinput.hook_compressed()"""
890
891 def setUp(self):
892 self.fake_open = InvocationRecorder()
893
894 def test_empty_string(self):
895 self.do_test_use_builtin_open("", 1)
896
897 def test_no_ext(self):
898 self.do_test_use_builtin_open("abcd", 2)
899
Ezio Melottic3afbb92011-05-14 10:10:53 +0300900 @unittest.skipUnless(gzip, "Requires gzip and zlib")
briancurtin5eb35912011-03-15 10:59:36 -0400901 def test_gz_ext_fake(self):
briancurtin906f0c42011-03-15 10:29:41 -0400902 original_open = gzip.open
903 gzip.open = self.fake_open
904 try:
Inada Naoki333d10c2021-04-14 14:12:58 +0900905 result = fileinput.hook_compressed("test.gz", "3")
briancurtin906f0c42011-03-15 10:29:41 -0400906 finally:
907 gzip.open = original_open
908
909 self.assertEqual(self.fake_open.invocation_count, 1)
Inada Naoki333d10c2021-04-14 14:12:58 +0900910 self.assertEqual(self.fake_open.last_invocation, (("test.gz", "3"), {}))
911
912 @unittest.skipUnless(gzip, "Requires gzip and zlib")
913 def test_gz_with_encoding_fake(self):
914 original_open = gzip.open
915 gzip.open = lambda filename, mode: io.BytesIO(b'Ex-binary string')
916 try:
917 result = fileinput.hook_compressed("test.gz", "3", encoding="utf-8")
918 finally:
919 gzip.open = original_open
920 self.assertEqual(list(result), ['Ex-binary string'])
briancurtin906f0c42011-03-15 10:29:41 -0400921
briancurtinf84f3c32011-03-18 13:03:17 -0500922 @unittest.skipUnless(bz2, "Requires bz2")
briancurtin5eb35912011-03-15 10:59:36 -0400923 def test_bz2_ext_fake(self):
briancurtin906f0c42011-03-15 10:29:41 -0400924 original_open = bz2.BZ2File
925 bz2.BZ2File = self.fake_open
926 try:
Inada Naoki333d10c2021-04-14 14:12:58 +0900927 result = fileinput.hook_compressed("test.bz2", "4")
briancurtin906f0c42011-03-15 10:29:41 -0400928 finally:
929 bz2.BZ2File = original_open
930
931 self.assertEqual(self.fake_open.invocation_count, 1)
Inada Naoki333d10c2021-04-14 14:12:58 +0900932 self.assertEqual(self.fake_open.last_invocation, (("test.bz2", "4"), {}))
briancurtin906f0c42011-03-15 10:29:41 -0400933
934 def test_blah_ext(self):
Inada Naoki333d10c2021-04-14 14:12:58 +0900935 self.do_test_use_builtin_open("abcd.blah", "5")
briancurtin906f0c42011-03-15 10:29:41 -0400936
briancurtin5eb35912011-03-15 10:59:36 -0400937 def test_gz_ext_builtin(self):
Inada Naoki333d10c2021-04-14 14:12:58 +0900938 self.do_test_use_builtin_open("abcd.Gz", "6")
briancurtin906f0c42011-03-15 10:29:41 -0400939
briancurtin5eb35912011-03-15 10:59:36 -0400940 def test_bz2_ext_builtin(self):
Inada Naoki333d10c2021-04-14 14:12:58 +0900941 self.do_test_use_builtin_open("abcd.Bz2", "7")
briancurtin906f0c42011-03-15 10:29:41 -0400942
943 def do_test_use_builtin_open(self, filename, mode):
944 original_open = self.replace_builtin_open(self.fake_open)
945 try:
946 result = fileinput.hook_compressed(filename, mode)
947 finally:
948 self.replace_builtin_open(original_open)
949
950 self.assertEqual(self.fake_open.invocation_count, 1)
951 self.assertEqual(self.fake_open.last_invocation,
Inada Naoki333d10c2021-04-14 14:12:58 +0900952 ((filename, mode), {'encoding': 'locale', 'errors': None}))
briancurtin906f0c42011-03-15 10:29:41 -0400953
954 @staticmethod
955 def replace_builtin_open(new_open_func):
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100956 original_open = builtins.open
957 builtins.open = new_open_func
briancurtin906f0c42011-03-15 10:29:41 -0400958 return original_open
959
960class Test_hook_encoded(unittest.TestCase):
961 """Unit tests for fileinput.hook_encoded()"""
962
963 def test(self):
964 encoding = object()
Serhiy Storchakab2752102016-04-27 23:13:46 +0300965 errors = object()
966 result = fileinput.hook_encoded(encoding, errors=errors)
briancurtin906f0c42011-03-15 10:29:41 -0400967
968 fake_open = InvocationRecorder()
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100969 original_open = builtins.open
970 builtins.open = fake_open
briancurtin906f0c42011-03-15 10:29:41 -0400971 try:
972 filename = object()
973 mode = object()
974 open_result = result(filename, mode)
975 finally:
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100976 builtins.open = original_open
briancurtin906f0c42011-03-15 10:29:41 -0400977
978 self.assertEqual(fake_open.invocation_count, 1)
979
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100980 args, kwargs = fake_open.last_invocation
briancurtin906f0c42011-03-15 10:29:41 -0400981 self.assertIs(args[0], filename)
982 self.assertIs(args[1], mode)
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100983 self.assertIs(kwargs.pop('encoding'), encoding)
Serhiy Storchakab2752102016-04-27 23:13:46 +0300984 self.assertIs(kwargs.pop('errors'), errors)
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100985 self.assertFalse(kwargs)
Georg Brandl6cb7b652010-07-31 20:08:15 +0000986
Serhiy Storchakab2752102016-04-27 23:13:46 +0300987 def test_errors(self):
988 with open(TESTFN, 'wb') as f:
989 f.write(b'\x80abc')
990 self.addCleanup(safe_unlink, TESTFN)
991
992 def check(errors, expected_lines):
993 with FileInput(files=TESTFN, mode='r',
994 openhook=hook_encoded('utf-8', errors=errors)) as fi:
995 lines = list(fi)
996 self.assertEqual(lines, expected_lines)
997
998 check('ignore', ['abc'])
999 with self.assertRaises(UnicodeDecodeError):
1000 check('strict', ['abc'])
1001 check('replace', ['\ufffdabc'])
1002 check('backslashreplace', ['\\x80abc'])
1003
Serhiy Storchaka517b7472014-02-26 20:59:43 +02001004 def test_modes(self):
Serhiy Storchaka517b7472014-02-26 20:59:43 +02001005 with open(TESTFN, 'wb') as f:
Serhiy Storchaka682ea5f2014-03-03 21:17:17 +02001006 # UTF-7 is a convenient, seldom used encoding
Serhiy Storchaka517b7472014-02-26 20:59:43 +02001007 f.write(b'A\nB\r\nC\rD+IKw-')
1008 self.addCleanup(safe_unlink, TESTFN)
1009
1010 def check(mode, expected_lines):
1011 with FileInput(files=TESTFN, mode=mode,
1012 openhook=hook_encoded('utf-7')) as fi:
1013 lines = list(fi)
1014 self.assertEqual(lines, expected_lines)
1015
1016 check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
Victor Stinner942f7a22020-03-04 18:50:22 +01001017 with self.assertWarns(DeprecationWarning):
1018 check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
1019 with self.assertWarns(DeprecationWarning):
1020 check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
Serhiy Storchaka517b7472014-02-26 20:59:43 +02001021 with self.assertRaises(ValueError):
1022 check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
1023
Guido van Rossumd8faa362007-04-27 19:54:29 +00001024
Martin Panter7978e102016-01-16 06:26:54 +00001025class MiscTest(unittest.TestCase):
1026
1027 def test_all(self):
Serhiy Storchaka674e2d02016-03-08 18:35:19 +02001028 support.check__all__(self, fileinput)
Martin Panter7978e102016-01-16 06:26:54 +00001029
1030
Guido van Rossumd8faa362007-04-27 19:54:29 +00001031if __name__ == "__main__":
Brett Cannon3e9a9ae2013-06-12 21:25:59 -04001032 unittest.main()