blob: d7efc685d87e383f7b6f1c5edff7bd01f08144ad [file] [log] [blame]
Tim Peters3230d5c2001-07-11 22:21:17 +00001'''
2Tests for fileinput module.
3Nick Mathewson
4'''
Benjamin Petersoneb462882011-03-15 09:50:18 -05005import os
6import sys
7import re
briancurtin906f0c42011-03-15 10:29:41 -04008import fileinput
9import collections
Florent Xiclunaa011e2b2011-11-07 19:43:07 +010010import builtins
Benjamin Petersoneb462882011-03-15 09:50:18 -050011import unittest
12
briancurtinf84f3c32011-03-18 13:03:17 -050013try:
14 import bz2
15except ImportError:
16 bz2 = None
Ezio Melottic3afbb92011-05-14 10:10:53 +030017try:
18 import gzip
19except ImportError:
20 gzip = None
briancurtinf84f3c32011-03-18 13:03:17 -050021
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030022from io import BytesIO, StringIO
Benjamin Petersoneb462882011-03-15 09:50:18 -050023from fileinput import FileInput, hook_encoded
Roy Williams002665a2017-05-22 22:24:17 -070024from pathlib import Path
Benjamin Petersoneb462882011-03-15 09:50:18 -050025
Serhiy Storchaka597d15a2016-04-24 13:45:58 +030026from test.support import verbose, TESTFN, check_warnings
Benjamin Petersoneb462882011-03-15 09:50:18 -050027from test.support import unlink as safe_unlink
Martin Panter7978e102016-01-16 06:26:54 +000028from test import support
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030029from unittest import mock
Benjamin Petersoneb462882011-03-15 09:50:18 -050030
Tim Peters3230d5c2001-07-11 22:21:17 +000031
32# The fileinput module has 2 interfaces: the FileInput class which does
33# all the work, and a few functions (input, etc.) that use a global _state
briancurtin906f0c42011-03-15 10:29:41 -040034# variable.
Tim Peters3230d5c2001-07-11 22:21:17 +000035
36# Write lines (a list of lines) to temp file number i, and return the
37# temp file's name.
Tim Peters4d7cad12006-02-19 21:22:10 +000038def writeTmp(i, lines, mode='w'): # opening in text mode is the default
Tim Peters3230d5c2001-07-11 22:21:17 +000039 name = TESTFN + str(i)
Tim Peters4d7cad12006-02-19 21:22:10 +000040 f = open(name, mode)
Guido van Rossumc43e79f2007-06-18 18:26:36 +000041 for line in lines:
42 f.write(line)
Tim Peters3230d5c2001-07-11 22:21:17 +000043 f.close()
44 return name
45
Tim Peters3230d5c2001-07-11 22:21:17 +000046def remove_tempfiles(*names):
47 for name in names:
Guido van Rossume22905a2007-08-27 23:09:25 +000048 if name:
49 safe_unlink(name)
Tim Peters3230d5c2001-07-11 22:21:17 +000050
Serhiy Storchakacc2dbc52016-03-08 18:28:36 +020051class LineReader:
52
53 def __init__(self):
54 self._linesread = []
55
56 @property
57 def linesread(self):
58 try:
59 return self._linesread[:]
60 finally:
61 self._linesread = []
62
63 def openhook(self, filename, mode):
64 self.it = iter(filename.splitlines(True))
65 return self
66
67 def readline(self, size=None):
68 line = next(self.it, '')
69 self._linesread.append(line)
70 return line
71
72 def readlines(self, hint=-1):
73 lines = []
74 size = 0
75 while True:
76 line = self.readline()
77 if not line:
78 return lines
79 lines.append(line)
80 size += len(line)
81 if size >= hint:
82 return lines
83
84 def close(self):
85 pass
86
Guido van Rossumd8faa362007-04-27 19:54:29 +000087class BufferSizesTests(unittest.TestCase):
88 def test_buffer_sizes(self):
89 # First, run the tests with default and teeny buffer size.
90 for round, bs in (0, 0), (1, 30):
Neal Norwitz2595e762008-03-24 06:10:13 +000091 t1 = t2 = t3 = t4 = None
Guido van Rossumd8faa362007-04-27 19:54:29 +000092 try:
93 t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)])
94 t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)])
95 t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)])
96 t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)])
Serhiy Storchaka674e2d02016-03-08 18:35:19 +020097 if bs:
98 with self.assertWarns(DeprecationWarning):
99 self.buffer_size_test(t1, t2, t3, t4, bs, round)
100 else:
101 self.buffer_size_test(t1, t2, t3, t4, bs, round)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000102 finally:
103 remove_tempfiles(t1, t2, t3, t4)
Tim Peters3230d5c2001-07-11 22:21:17 +0000104
Guido van Rossumd8faa362007-04-27 19:54:29 +0000105 def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
106 pat = re.compile(r'LINE (\d+) OF FILE (\d+)')
Tim Peters3230d5c2001-07-11 22:21:17 +0000107
Guido van Rossumd8faa362007-04-27 19:54:29 +0000108 start = 1 + round*6
109 if verbose:
110 print('%s. Simple iteration (bs=%s)' % (start+0, bs))
111 fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
Tim Peters3230d5c2001-07-11 22:21:17 +0000112 lines = list(fi)
Tim Peters3230d5c2001-07-11 22:21:17 +0000113 fi.close()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000114 self.assertEqual(len(lines), 31)
115 self.assertEqual(lines[4], 'Line 5 of file 1\n')
116 self.assertEqual(lines[30], 'Line 1 of file 4\n')
117 self.assertEqual(fi.lineno(), 31)
118 self.assertEqual(fi.filename(), t4)
Tim Peters3230d5c2001-07-11 22:21:17 +0000119
Guido van Rossumd8faa362007-04-27 19:54:29 +0000120 if verbose:
121 print('%s. Status variables (bs=%s)' % (start+1, bs))
122 fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
123 s = "x"
124 while s and s != 'Line 6 of file 2\n':
125 s = fi.readline()
126 self.assertEqual(fi.filename(), t2)
127 self.assertEqual(fi.lineno(), 21)
128 self.assertEqual(fi.filelineno(), 6)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000129 self.assertFalse(fi.isfirstline())
130 self.assertFalse(fi.isstdin())
Tim Peters3230d5c2001-07-11 22:21:17 +0000131
Guido van Rossumd8faa362007-04-27 19:54:29 +0000132 if verbose:
133 print('%s. Nextfile (bs=%s)' % (start+2, bs))
134 fi.nextfile()
135 self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
136 self.assertEqual(fi.lineno(), 22)
137 fi.close()
Tim Peters3230d5c2001-07-11 22:21:17 +0000138
Guido van Rossumd8faa362007-04-27 19:54:29 +0000139 if verbose:
140 print('%s. Stdin (bs=%s)' % (start+3, bs))
141 fi = FileInput(files=(t1, t2, t3, t4, '-'), bufsize=bs)
142 savestdin = sys.stdin
143 try:
144 sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
145 lines = list(fi)
146 self.assertEqual(len(lines), 33)
147 self.assertEqual(lines[32], 'Line 2 of stdin\n')
148 self.assertEqual(fi.filename(), '<stdin>')
149 fi.nextfile()
150 finally:
151 sys.stdin = savestdin
Tim Peters3230d5c2001-07-11 22:21:17 +0000152
Guido van Rossumd8faa362007-04-27 19:54:29 +0000153 if verbose:
154 print('%s. Boundary conditions (bs=%s)' % (start+4, bs))
155 fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
156 self.assertEqual(fi.lineno(), 0)
157 self.assertEqual(fi.filename(), None)
158 fi.nextfile()
159 self.assertEqual(fi.lineno(), 0)
160 self.assertEqual(fi.filename(), None)
Tim Peters3230d5c2001-07-11 22:21:17 +0000161
Guido van Rossumd8faa362007-04-27 19:54:29 +0000162 if verbose:
163 print('%s. Inplace (bs=%s)' % (start+5, bs))
164 savestdout = sys.stdout
165 try:
166 fi = FileInput(files=(t1, t2, t3, t4), inplace=1, bufsize=bs)
167 for line in fi:
168 line = line[:-1].upper()
169 print(line)
170 fi.close()
171 finally:
172 sys.stdout = savestdout
Tim Peters3230d5c2001-07-11 22:21:17 +0000173
Guido van Rossumd8faa362007-04-27 19:54:29 +0000174 fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
175 for line in fi:
176 self.assertEqual(line[-1], '\n')
177 m = pat.match(line[:-1])
178 self.assertNotEqual(m, None)
179 self.assertEqual(int(m.group(1)), fi.filelineno())
180 fi.close()
Georg Brandle4662172006-02-19 09:51:27 +0000181
briancurtin906f0c42011-03-15 10:29:41 -0400182class UnconditionallyRaise:
183 def __init__(self, exception_type):
184 self.exception_type = exception_type
185 self.invoked = False
186 def __call__(self, *args, **kwargs):
187 self.invoked = True
188 raise self.exception_type()
189
Guido van Rossumd8faa362007-04-27 19:54:29 +0000190class FileInputTests(unittest.TestCase):
briancurtin906f0c42011-03-15 10:29:41 -0400191
Guido van Rossumd8faa362007-04-27 19:54:29 +0000192 def test_zero_byte_files(self):
Neal Norwitz2595e762008-03-24 06:10:13 +0000193 t1 = t2 = t3 = t4 = None
Guido van Rossumd8faa362007-04-27 19:54:29 +0000194 try:
195 t1 = writeTmp(1, [""])
196 t2 = writeTmp(2, [""])
197 t3 = writeTmp(3, ["The only line there is.\n"])
198 t4 = writeTmp(4, [""])
199 fi = FileInput(files=(t1, t2, t3, t4))
Georg Brandl67e9fb92006-02-19 13:56:17 +0000200
Guido van Rossumd8faa362007-04-27 19:54:29 +0000201 line = fi.readline()
202 self.assertEqual(line, 'The only line there is.\n')
203 self.assertEqual(fi.lineno(), 1)
204 self.assertEqual(fi.filelineno(), 1)
205 self.assertEqual(fi.filename(), t3)
Georg Brandlc029f872006-02-19 14:12:34 +0000206
Guido van Rossumd8faa362007-04-27 19:54:29 +0000207 line = fi.readline()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000208 self.assertFalse(line)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000209 self.assertEqual(fi.lineno(), 1)
210 self.assertEqual(fi.filelineno(), 0)
211 self.assertEqual(fi.filename(), t4)
212 fi.close()
213 finally:
214 remove_tempfiles(t1, t2, t3, t4)
Georg Brandlc98eeed2006-02-19 14:57:47 +0000215
Guido van Rossumd8faa362007-04-27 19:54:29 +0000216 def test_files_that_dont_end_with_newline(self):
Neal Norwitz2595e762008-03-24 06:10:13 +0000217 t1 = t2 = None
Guido van Rossumd8faa362007-04-27 19:54:29 +0000218 try:
219 t1 = writeTmp(1, ["A\nB\nC"])
220 t2 = writeTmp(2, ["D\nE\nF"])
221 fi = FileInput(files=(t1, t2))
222 lines = list(fi)
223 self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
224 self.assertEqual(fi.filelineno(), 3)
225 self.assertEqual(fi.lineno(), 6)
226 finally:
227 remove_tempfiles(t1, t2)
228
Guido van Rossumc43e79f2007-06-18 18:26:36 +0000229## def test_unicode_filenames(self):
230## # XXX A unicode string is always returned by writeTmp.
231## # So is this needed?
232## try:
233## t1 = writeTmp(1, ["A\nB"])
234## encoding = sys.getfilesystemencoding()
235## if encoding is None:
236## encoding = 'ascii'
237## fi = FileInput(files=str(t1, encoding))
238## lines = list(fi)
239## self.assertEqual(lines, ["A\n", "B"])
240## finally:
241## remove_tempfiles(t1)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000242
243 def test_fileno(self):
Neal Norwitz2595e762008-03-24 06:10:13 +0000244 t1 = t2 = None
Guido van Rossumd8faa362007-04-27 19:54:29 +0000245 try:
246 t1 = writeTmp(1, ["A\nB"])
247 t2 = writeTmp(2, ["C\nD"])
248 fi = FileInput(files=(t1, t2))
249 self.assertEqual(fi.fileno(), -1)
250 line =next( fi)
251 self.assertNotEqual(fi.fileno(), -1)
252 fi.nextfile()
253 self.assertEqual(fi.fileno(), -1)
254 line = list(fi)
255 self.assertEqual(fi.fileno(), -1)
256 finally:
257 remove_tempfiles(t1, t2)
258
259 def test_opening_mode(self):
260 try:
261 # invalid mode, should raise ValueError
262 fi = FileInput(mode="w")
263 self.fail("FileInput should reject invalid mode argument")
264 except ValueError:
265 pass
Guido van Rossume22905a2007-08-27 23:09:25 +0000266 t1 = None
Guido van Rossumd8faa362007-04-27 19:54:29 +0000267 try:
268 # try opening in universal newline mode
Guido van Rossume22905a2007-08-27 23:09:25 +0000269 t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +0200270 with check_warnings(('', DeprecationWarning)):
271 fi = FileInput(files=t1, mode="U")
272 with check_warnings(('', DeprecationWarning)):
273 lines = list(fi)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000274 self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
275 finally:
276 remove_tempfiles(t1)
277
Serhiy Storchaka946cfc32014-05-14 21:08:33 +0300278 def test_stdin_binary_mode(self):
279 with mock.patch('sys.stdin') as m_stdin:
280 m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
281 fi = FileInput(files=['-'], mode='rb')
282 lines = list(fi)
283 self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
284
R David Murray830207e2016-01-02 15:41:41 -0500285 def test_detached_stdin_binary_mode(self):
286 orig_stdin = sys.stdin
287 try:
288 sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
289 self.assertFalse(hasattr(sys.stdin, 'buffer'))
290 fi = FileInput(files=['-'], mode='rb')
291 lines = list(fi)
292 self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
293 finally:
294 sys.stdin = orig_stdin
295
Guido van Rossume22905a2007-08-27 23:09:25 +0000296 def test_file_opening_hook(self):
297 try:
298 # cannot use openhook and inplace mode
299 fi = FileInput(inplace=1, openhook=lambda f, m: None)
300 self.fail("FileInput should raise if both inplace "
301 "and openhook arguments are given")
302 except ValueError:
303 pass
304 try:
305 fi = FileInput(openhook=1)
306 self.fail("FileInput should check openhook for being callable")
307 except ValueError:
308 pass
briancurtin906f0c42011-03-15 10:29:41 -0400309
310 class CustomOpenHook:
311 def __init__(self):
312 self.invoked = False
313 def __call__(self, *args):
314 self.invoked = True
315 return open(*args)
316
317 t = writeTmp(1, ["\n"])
318 self.addCleanup(remove_tempfiles, t)
319 custom_open_hook = CustomOpenHook()
320 with FileInput([t], openhook=custom_open_hook) as fi:
321 fi.readline()
322 self.assertTrue(custom_open_hook.invoked, "openhook not invoked")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000323
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200324 def test_readline(self):
325 with open(TESTFN, 'wb') as f:
326 f.write(b'A\nB\r\nC\r')
327 # Fill TextIOWrapper buffer.
328 f.write(b'123456789\n' * 1000)
329 # Issue #20501: readline() shouldn't read whole file.
330 f.write(b'\x80')
331 self.addCleanup(safe_unlink, TESTFN)
332
333 with FileInput(files=TESTFN,
Serhiy Storchakacc2dbc52016-03-08 18:28:36 +0200334 openhook=hook_encoded('ascii')) as fi:
Serhiy Storchaka682ea5f2014-03-03 21:17:17 +0200335 try:
336 self.assertEqual(fi.readline(), 'A\n')
337 self.assertEqual(fi.readline(), 'B\n')
338 self.assertEqual(fi.readline(), 'C\n')
339 except UnicodeDecodeError:
340 self.fail('Read to end of file')
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200341 with self.assertRaises(UnicodeDecodeError):
342 # Read to the end of file.
343 list(fi)
Serhiy Storchaka314464d2015-11-01 16:43:58 +0200344 self.assertEqual(fi.readline(), '')
345 self.assertEqual(fi.readline(), '')
346
347 def test_readline_binary_mode(self):
348 with open(TESTFN, 'wb') as f:
349 f.write(b'A\nB\r\nC\rD')
350 self.addCleanup(safe_unlink, TESTFN)
351
352 with FileInput(files=TESTFN, mode='rb') as fi:
353 self.assertEqual(fi.readline(), b'A\n')
354 self.assertEqual(fi.readline(), b'B\r\n')
355 self.assertEqual(fi.readline(), b'C\rD')
356 # Read to the end of file.
357 self.assertEqual(fi.readline(), b'')
358 self.assertEqual(fi.readline(), b'')
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200359
Georg Brandl6cb7b652010-07-31 20:08:15 +0000360 def test_context_manager(self):
361 try:
362 t1 = writeTmp(1, ["A\nB\nC"])
363 t2 = writeTmp(2, ["D\nE\nF"])
364 with FileInput(files=(t1, t2)) as fi:
365 lines = list(fi)
366 self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
367 self.assertEqual(fi.filelineno(), 3)
368 self.assertEqual(fi.lineno(), 6)
369 self.assertEqual(fi._files, ())
370 finally:
371 remove_tempfiles(t1, t2)
372
373 def test_close_on_exception(self):
374 try:
375 t1 = writeTmp(1, [""])
376 with FileInput(files=t1) as fi:
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200377 raise OSError
378 except OSError:
Georg Brandl6cb7b652010-07-31 20:08:15 +0000379 self.assertEqual(fi._files, ())
380 finally:
381 remove_tempfiles(t1)
382
briancurtin906f0c42011-03-15 10:29:41 -0400383 def test_empty_files_list_specified_to_constructor(self):
384 with FileInput(files=[]) as fi:
Brett Cannond47af532011-03-15 15:55:12 -0400385 self.assertEqual(fi._files, ('-',))
briancurtin906f0c42011-03-15 10:29:41 -0400386
387 def test__getitem__(self):
388 """Tests invoking FileInput.__getitem__() with the current
389 line number"""
390 t = writeTmp(1, ["line1\n", "line2\n"])
391 self.addCleanup(remove_tempfiles, t)
392 with FileInput(files=[t]) as fi:
393 retval1 = fi[0]
394 self.assertEqual(retval1, "line1\n")
395 retval2 = fi[1]
396 self.assertEqual(retval2, "line2\n")
397
398 def test__getitem__invalid_key(self):
399 """Tests invoking FileInput.__getitem__() with an index unequal to
400 the line number"""
401 t = writeTmp(1, ["line1\n", "line2\n"])
402 self.addCleanup(remove_tempfiles, t)
403 with FileInput(files=[t]) as fi:
404 with self.assertRaises(RuntimeError) as cm:
405 fi[1]
Brett Cannond47af532011-03-15 15:55:12 -0400406 self.assertEqual(cm.exception.args, ("accessing lines out of order",))
briancurtin906f0c42011-03-15 10:29:41 -0400407
408 def test__getitem__eof(self):
409 """Tests invoking FileInput.__getitem__() with the line number but at
410 end-of-input"""
411 t = writeTmp(1, [])
412 self.addCleanup(remove_tempfiles, t)
413 with FileInput(files=[t]) as fi:
414 with self.assertRaises(IndexError) as cm:
415 fi[0]
Brett Cannond47af532011-03-15 15:55:12 -0400416 self.assertEqual(cm.exception.args, ("end of input reached",))
briancurtin906f0c42011-03-15 10:29:41 -0400417
418 def test_nextfile_oserror_deleting_backup(self):
419 """Tests invoking FileInput.nextfile() when the attempt to delete
420 the backup file would raise OSError. This error is expected to be
421 silently ignored"""
422
423 os_unlink_orig = os.unlink
424 os_unlink_replacement = UnconditionallyRaise(OSError)
425 try:
426 t = writeTmp(1, ["\n"])
427 self.addCleanup(remove_tempfiles, t)
428 with FileInput(files=[t], inplace=True) as fi:
429 next(fi) # make sure the file is opened
430 os.unlink = os_unlink_replacement
431 fi.nextfile()
432 finally:
433 os.unlink = os_unlink_orig
434
435 # sanity check to make sure that our test scenario was actually hit
436 self.assertTrue(os_unlink_replacement.invoked,
437 "os.unlink() was not invoked")
438
439 def test_readline_os_fstat_raises_OSError(self):
440 """Tests invoking FileInput.readline() when os.fstat() raises OSError.
441 This exception should be silently discarded."""
442
443 os_fstat_orig = os.fstat
444 os_fstat_replacement = UnconditionallyRaise(OSError)
445 try:
446 t = writeTmp(1, ["\n"])
447 self.addCleanup(remove_tempfiles, t)
448 with FileInput(files=[t], inplace=True) as fi:
449 os.fstat = os_fstat_replacement
450 fi.readline()
451 finally:
452 os.fstat = os_fstat_orig
453
454 # sanity check to make sure that our test scenario was actually hit
455 self.assertTrue(os_fstat_replacement.invoked,
456 "os.fstat() was not invoked")
457
458 @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist")
459 def test_readline_os_chmod_raises_OSError(self):
460 """Tests invoking FileInput.readline() when os.chmod() raises OSError.
461 This exception should be silently discarded."""
462
463 os_chmod_orig = os.chmod
464 os_chmod_replacement = UnconditionallyRaise(OSError)
465 try:
466 t = writeTmp(1, ["\n"])
467 self.addCleanup(remove_tempfiles, t)
468 with FileInput(files=[t], inplace=True) as fi:
469 os.chmod = os_chmod_replacement
470 fi.readline()
471 finally:
472 os.chmod = os_chmod_orig
473
474 # sanity check to make sure that our test scenario was actually hit
475 self.assertTrue(os_chmod_replacement.invoked,
476 "os.fstat() was not invoked")
477
478 def test_fileno_when_ValueError_raised(self):
479 class FilenoRaisesValueError(UnconditionallyRaise):
480 def __init__(self):
481 UnconditionallyRaise.__init__(self, ValueError)
482 def fileno(self):
483 self.__call__()
484
485 unconditionally_raise_ValueError = FilenoRaisesValueError()
486 t = writeTmp(1, ["\n"])
487 self.addCleanup(remove_tempfiles, t)
488 with FileInput(files=[t]) as fi:
489 file_backup = fi._file
490 try:
491 fi._file = unconditionally_raise_ValueError
492 result = fi.fileno()
493 finally:
494 fi._file = file_backup # make sure the file gets cleaned up
495
496 # sanity check to make sure that our test scenario was actually hit
497 self.assertTrue(unconditionally_raise_ValueError.invoked,
498 "_file.fileno() was not invoked")
499
500 self.assertEqual(result, -1, "fileno() should return -1")
501
Serhiy Storchakacc2dbc52016-03-08 18:28:36 +0200502 def test_readline_buffering(self):
503 src = LineReader()
504 with FileInput(files=['line1\nline2', 'line3\n'],
505 openhook=src.openhook) as fi:
506 self.assertEqual(src.linesread, [])
507 self.assertEqual(fi.readline(), 'line1\n')
508 self.assertEqual(src.linesread, ['line1\n'])
509 self.assertEqual(fi.readline(), 'line2')
510 self.assertEqual(src.linesread, ['line2'])
511 self.assertEqual(fi.readline(), 'line3\n')
512 self.assertEqual(src.linesread, ['', 'line3\n'])
513 self.assertEqual(fi.readline(), '')
514 self.assertEqual(src.linesread, [''])
515 self.assertEqual(fi.readline(), '')
516 self.assertEqual(src.linesread, [])
517
518 def test_iteration_buffering(self):
519 src = LineReader()
520 with FileInput(files=['line1\nline2', 'line3\n'],
521 openhook=src.openhook) as fi:
522 self.assertEqual(src.linesread, [])
523 self.assertEqual(next(fi), 'line1\n')
524 self.assertEqual(src.linesread, ['line1\n'])
525 self.assertEqual(next(fi), 'line2')
526 self.assertEqual(src.linesread, ['line2'])
527 self.assertEqual(next(fi), 'line3\n')
528 self.assertEqual(src.linesread, ['', 'line3\n'])
529 self.assertRaises(StopIteration, next, fi)
530 self.assertEqual(src.linesread, [''])
531 self.assertRaises(StopIteration, next, fi)
532 self.assertEqual(src.linesread, [])
533
Roy Williams002665a2017-05-22 22:24:17 -0700534 def test_pathlib_file(self):
535 t1 = None
536 try:
537 t1 = Path(writeTmp(1, ["Pathlib file."]))
538 with FileInput(t1) as fi:
539 line = fi.readline()
540 self.assertEqual(line, 'Pathlib file.')
541 self.assertEqual(fi.lineno(), 1)
542 self.assertEqual(fi.filelineno(), 1)
543 self.assertEqual(fi.filename(), os.fspath(t1))
544 finally:
545 remove_tempfiles(t1)
546
Zhiming Wang06de1ae2017-09-05 01:37:24 +0800547 def test_pathlib_file_inplace(self):
548 t1 = None
549 try:
550 t1 = Path(writeTmp(1, ['Pathlib file.']))
551 with FileInput(t1, inplace=True) as fi:
552 line = fi.readline()
553 self.assertEqual(line, 'Pathlib file.')
554 print('Modified %s' % line)
555 with open(t1) as f:
556 self.assertEqual(f.read(), 'Modified Pathlib file.\n')
557 finally:
558 remove_tempfiles(t1)
559
Roy Williams002665a2017-05-22 22:24:17 -0700560
briancurtin906f0c42011-03-15 10:29:41 -0400561class MockFileInput:
562 """A class that mocks out fileinput.FileInput for use during unit tests"""
563
564 def __init__(self, files=None, inplace=False, backup="", bufsize=0,
565 mode="r", openhook=None):
566 self.files = files
567 self.inplace = inplace
568 self.backup = backup
569 self.bufsize = bufsize
570 self.mode = mode
571 self.openhook = openhook
572 self._file = None
573 self.invocation_counts = collections.defaultdict(lambda: 0)
574 self.return_values = {}
575
576 def close(self):
577 self.invocation_counts["close"] += 1
578
579 def nextfile(self):
580 self.invocation_counts["nextfile"] += 1
581 return self.return_values["nextfile"]
582
583 def filename(self):
584 self.invocation_counts["filename"] += 1
585 return self.return_values["filename"]
586
587 def lineno(self):
588 self.invocation_counts["lineno"] += 1
589 return self.return_values["lineno"]
590
591 def filelineno(self):
592 self.invocation_counts["filelineno"] += 1
593 return self.return_values["filelineno"]
594
595 def fileno(self):
596 self.invocation_counts["fileno"] += 1
597 return self.return_values["fileno"]
598
599 def isfirstline(self):
600 self.invocation_counts["isfirstline"] += 1
601 return self.return_values["isfirstline"]
602
603 def isstdin(self):
604 self.invocation_counts["isstdin"] += 1
605 return self.return_values["isstdin"]
606
607class BaseFileInputGlobalMethodsTest(unittest.TestCase):
608 """Base class for unit tests for the global function of
609 the fileinput module."""
610
611 def setUp(self):
612 self._orig_state = fileinput._state
613 self._orig_FileInput = fileinput.FileInput
614 fileinput.FileInput = MockFileInput
615
616 def tearDown(self):
617 fileinput.FileInput = self._orig_FileInput
618 fileinput._state = self._orig_state
619
620 def assertExactlyOneInvocation(self, mock_file_input, method_name):
621 # assert that the method with the given name was invoked once
622 actual_count = mock_file_input.invocation_counts[method_name]
623 self.assertEqual(actual_count, 1, method_name)
624 # assert that no other unexpected methods were invoked
625 actual_total_count = len(mock_file_input.invocation_counts)
626 self.assertEqual(actual_total_count, 1)
627
628class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
629 """Unit tests for fileinput.input()"""
630
631 def test_state_is_not_None_and_state_file_is_not_None(self):
632 """Tests invoking fileinput.input() when fileinput._state is not None
633 and its _file attribute is also not None. Expect RuntimeError to
634 be raised with a meaningful error message and for fileinput._state
635 to *not* be modified."""
636 instance = MockFileInput()
637 instance._file = object()
638 fileinput._state = instance
639 with self.assertRaises(RuntimeError) as cm:
640 fileinput.input()
641 self.assertEqual(("input() already active",), cm.exception.args)
642 self.assertIs(instance, fileinput._state, "fileinput._state")
643
644 def test_state_is_not_None_and_state_file_is_None(self):
645 """Tests invoking fileinput.input() when fileinput._state is not None
646 but its _file attribute *is* None. Expect it to create and return
647 a new fileinput.FileInput object with all method parameters passed
648 explicitly to the __init__() method; also ensure that
649 fileinput._state is set to the returned instance."""
650 instance = MockFileInput()
651 instance._file = None
652 fileinput._state = instance
653 self.do_test_call_input()
654
655 def test_state_is_None(self):
656 """Tests invoking fileinput.input() when fileinput._state is None
657 Expect it to create and return a new fileinput.FileInput object
658 with all method parameters passed explicitly to the __init__()
659 method; also ensure that fileinput._state is set to the returned
660 instance."""
661 fileinput._state = None
662 self.do_test_call_input()
663
664 def do_test_call_input(self):
665 """Tests that fileinput.input() creates a new fileinput.FileInput
666 object, passing the given parameters unmodified to
667 fileinput.FileInput.__init__(). Note that this test depends on the
668 monkey patching of fileinput.FileInput done by setUp()."""
669 files = object()
670 inplace = object()
671 backup = object()
672 bufsize = object()
673 mode = object()
674 openhook = object()
675
676 # call fileinput.input() with different values for each argument
677 result = fileinput.input(files=files, inplace=inplace, backup=backup,
678 bufsize=bufsize,
679 mode=mode, openhook=openhook)
680
681 # ensure fileinput._state was set to the returned object
682 self.assertIs(result, fileinput._state, "fileinput._state")
683
684 # ensure the parameters to fileinput.input() were passed directly
685 # to FileInput.__init__()
686 self.assertIs(files, result.files, "files")
687 self.assertIs(inplace, result.inplace, "inplace")
688 self.assertIs(backup, result.backup, "backup")
689 self.assertIs(bufsize, result.bufsize, "bufsize")
690 self.assertIs(mode, result.mode, "mode")
691 self.assertIs(openhook, result.openhook, "openhook")
692
693class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
694 """Unit tests for fileinput.close()"""
695
696 def test_state_is_None(self):
697 """Tests that fileinput.close() does nothing if fileinput._state
698 is None"""
699 fileinput._state = None
700 fileinput.close()
701 self.assertIsNone(fileinput._state)
702
703 def test_state_is_not_None(self):
704 """Tests that fileinput.close() invokes close() on fileinput._state
705 and sets _state=None"""
706 instance = MockFileInput()
707 fileinput._state = instance
708 fileinput.close()
709 self.assertExactlyOneInvocation(instance, "close")
710 self.assertIsNone(fileinput._state)
711
712class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
713 """Unit tests for fileinput.nextfile()"""
714
715 def test_state_is_None(self):
716 """Tests fileinput.nextfile() when fileinput._state is None.
717 Ensure that it raises RuntimeError with a meaningful error message
718 and does not modify fileinput._state"""
719 fileinput._state = None
720 with self.assertRaises(RuntimeError) as cm:
721 fileinput.nextfile()
722 self.assertEqual(("no active input()",), cm.exception.args)
723 self.assertIsNone(fileinput._state)
724
725 def test_state_is_not_None(self):
726 """Tests fileinput.nextfile() when fileinput._state is not None.
727 Ensure that it invokes fileinput._state.nextfile() exactly once,
728 returns whatever it returns, and does not modify fileinput._state
729 to point to a different object."""
730 nextfile_retval = object()
731 instance = MockFileInput()
732 instance.return_values["nextfile"] = nextfile_retval
733 fileinput._state = instance
734 retval = fileinput.nextfile()
735 self.assertExactlyOneInvocation(instance, "nextfile")
736 self.assertIs(retval, nextfile_retval)
737 self.assertIs(fileinput._state, instance)
738
739class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
740 """Unit tests for fileinput.filename()"""
741
742 def test_state_is_None(self):
743 """Tests fileinput.filename() when fileinput._state is None.
744 Ensure that it raises RuntimeError with a meaningful error message
745 and does not modify fileinput._state"""
746 fileinput._state = None
747 with self.assertRaises(RuntimeError) as cm:
748 fileinput.filename()
749 self.assertEqual(("no active input()",), cm.exception.args)
750 self.assertIsNone(fileinput._state)
751
752 def test_state_is_not_None(self):
753 """Tests fileinput.filename() when fileinput._state is not None.
754 Ensure that it invokes fileinput._state.filename() exactly once,
755 returns whatever it returns, and does not modify fileinput._state
756 to point to a different object."""
757 filename_retval = object()
758 instance = MockFileInput()
759 instance.return_values["filename"] = filename_retval
760 fileinput._state = instance
761 retval = fileinput.filename()
762 self.assertExactlyOneInvocation(instance, "filename")
763 self.assertIs(retval, filename_retval)
764 self.assertIs(fileinput._state, instance)
765
766class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
767 """Unit tests for fileinput.lineno()"""
768
769 def test_state_is_None(self):
770 """Tests fileinput.lineno() when fileinput._state is None.
771 Ensure that it raises RuntimeError with a meaningful error message
772 and does not modify fileinput._state"""
773 fileinput._state = None
774 with self.assertRaises(RuntimeError) as cm:
775 fileinput.lineno()
776 self.assertEqual(("no active input()",), cm.exception.args)
777 self.assertIsNone(fileinput._state)
778
779 def test_state_is_not_None(self):
780 """Tests fileinput.lineno() when fileinput._state is not None.
781 Ensure that it invokes fileinput._state.lineno() exactly once,
782 returns whatever it returns, and does not modify fileinput._state
783 to point to a different object."""
784 lineno_retval = object()
785 instance = MockFileInput()
786 instance.return_values["lineno"] = lineno_retval
787 fileinput._state = instance
788 retval = fileinput.lineno()
789 self.assertExactlyOneInvocation(instance, "lineno")
790 self.assertIs(retval, lineno_retval)
791 self.assertIs(fileinput._state, instance)
792
793class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
794 """Unit tests for fileinput.filelineno()"""
795
796 def test_state_is_None(self):
797 """Tests fileinput.filelineno() when fileinput._state is None.
798 Ensure that it raises RuntimeError with a meaningful error message
799 and does not modify fileinput._state"""
800 fileinput._state = None
801 with self.assertRaises(RuntimeError) as cm:
802 fileinput.filelineno()
803 self.assertEqual(("no active input()",), cm.exception.args)
804 self.assertIsNone(fileinput._state)
805
806 def test_state_is_not_None(self):
807 """Tests fileinput.filelineno() when fileinput._state is not None.
808 Ensure that it invokes fileinput._state.filelineno() exactly once,
809 returns whatever it returns, and does not modify fileinput._state
810 to point to a different object."""
811 filelineno_retval = object()
812 instance = MockFileInput()
813 instance.return_values["filelineno"] = filelineno_retval
814 fileinput._state = instance
815 retval = fileinput.filelineno()
816 self.assertExactlyOneInvocation(instance, "filelineno")
817 self.assertIs(retval, filelineno_retval)
818 self.assertIs(fileinput._state, instance)
819
820class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
821 """Unit tests for fileinput.fileno()"""
822
823 def test_state_is_None(self):
824 """Tests fileinput.fileno() when fileinput._state is None.
825 Ensure that it raises RuntimeError with a meaningful error message
826 and does not modify fileinput._state"""
827 fileinput._state = None
828 with self.assertRaises(RuntimeError) as cm:
829 fileinput.fileno()
830 self.assertEqual(("no active input()",), cm.exception.args)
831 self.assertIsNone(fileinput._state)
832
833 def test_state_is_not_None(self):
834 """Tests fileinput.fileno() when fileinput._state is not None.
835 Ensure that it invokes fileinput._state.fileno() exactly once,
836 returns whatever it returns, and does not modify fileinput._state
837 to point to a different object."""
838 fileno_retval = object()
839 instance = MockFileInput()
840 instance.return_values["fileno"] = fileno_retval
841 instance.fileno_retval = fileno_retval
842 fileinput._state = instance
843 retval = fileinput.fileno()
844 self.assertExactlyOneInvocation(instance, "fileno")
845 self.assertIs(retval, fileno_retval)
846 self.assertIs(fileinput._state, instance)
847
848class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
849 """Unit tests for fileinput.isfirstline()"""
850
851 def test_state_is_None(self):
852 """Tests fileinput.isfirstline() when fileinput._state is None.
853 Ensure that it raises RuntimeError with a meaningful error message
854 and does not modify fileinput._state"""
855 fileinput._state = None
856 with self.assertRaises(RuntimeError) as cm:
857 fileinput.isfirstline()
858 self.assertEqual(("no active input()",), cm.exception.args)
859 self.assertIsNone(fileinput._state)
860
861 def test_state_is_not_None(self):
862 """Tests fileinput.isfirstline() when fileinput._state is not None.
863 Ensure that it invokes fileinput._state.isfirstline() exactly once,
864 returns whatever it returns, and does not modify fileinput._state
865 to point to a different object."""
866 isfirstline_retval = object()
867 instance = MockFileInput()
868 instance.return_values["isfirstline"] = isfirstline_retval
869 fileinput._state = instance
870 retval = fileinput.isfirstline()
871 self.assertExactlyOneInvocation(instance, "isfirstline")
872 self.assertIs(retval, isfirstline_retval)
873 self.assertIs(fileinput._state, instance)
874
875class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
876 """Unit tests for fileinput.isstdin()"""
877
878 def test_state_is_None(self):
879 """Tests fileinput.isstdin() when fileinput._state is None.
880 Ensure that it raises RuntimeError with a meaningful error message
881 and does not modify fileinput._state"""
882 fileinput._state = None
883 with self.assertRaises(RuntimeError) as cm:
884 fileinput.isstdin()
885 self.assertEqual(("no active input()",), cm.exception.args)
886 self.assertIsNone(fileinput._state)
887
888 def test_state_is_not_None(self):
889 """Tests fileinput.isstdin() when fileinput._state is not None.
890 Ensure that it invokes fileinput._state.isstdin() exactly once,
891 returns whatever it returns, and does not modify fileinput._state
892 to point to a different object."""
893 isstdin_retval = object()
894 instance = MockFileInput()
895 instance.return_values["isstdin"] = isstdin_retval
896 fileinput._state = instance
897 retval = fileinput.isstdin()
898 self.assertExactlyOneInvocation(instance, "isstdin")
899 self.assertIs(retval, isstdin_retval)
900 self.assertIs(fileinput._state, instance)
901
902class InvocationRecorder:
903 def __init__(self):
904 self.invocation_count = 0
905 def __call__(self, *args, **kwargs):
906 self.invocation_count += 1
907 self.last_invocation = (args, kwargs)
908
909class Test_hook_compressed(unittest.TestCase):
910 """Unit tests for fileinput.hook_compressed()"""
911
912 def setUp(self):
913 self.fake_open = InvocationRecorder()
914
915 def test_empty_string(self):
916 self.do_test_use_builtin_open("", 1)
917
918 def test_no_ext(self):
919 self.do_test_use_builtin_open("abcd", 2)
920
Ezio Melottic3afbb92011-05-14 10:10:53 +0300921 @unittest.skipUnless(gzip, "Requires gzip and zlib")
briancurtin5eb35912011-03-15 10:59:36 -0400922 def test_gz_ext_fake(self):
briancurtin906f0c42011-03-15 10:29:41 -0400923 original_open = gzip.open
924 gzip.open = self.fake_open
925 try:
926 result = fileinput.hook_compressed("test.gz", 3)
927 finally:
928 gzip.open = original_open
929
930 self.assertEqual(self.fake_open.invocation_count, 1)
931 self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {}))
932
briancurtinf84f3c32011-03-18 13:03:17 -0500933 @unittest.skipUnless(bz2, "Requires bz2")
briancurtin5eb35912011-03-15 10:59:36 -0400934 def test_bz2_ext_fake(self):
briancurtin906f0c42011-03-15 10:29:41 -0400935 original_open = bz2.BZ2File
936 bz2.BZ2File = self.fake_open
937 try:
938 result = fileinput.hook_compressed("test.bz2", 4)
939 finally:
940 bz2.BZ2File = original_open
941
942 self.assertEqual(self.fake_open.invocation_count, 1)
943 self.assertEqual(self.fake_open.last_invocation, (("test.bz2", 4), {}))
944
945 def test_blah_ext(self):
946 self.do_test_use_builtin_open("abcd.blah", 5)
947
briancurtin5eb35912011-03-15 10:59:36 -0400948 def test_gz_ext_builtin(self):
briancurtin906f0c42011-03-15 10:29:41 -0400949 self.do_test_use_builtin_open("abcd.Gz", 6)
950
briancurtin5eb35912011-03-15 10:59:36 -0400951 def test_bz2_ext_builtin(self):
briancurtin906f0c42011-03-15 10:29:41 -0400952 self.do_test_use_builtin_open("abcd.Bz2", 7)
953
954 def do_test_use_builtin_open(self, filename, mode):
955 original_open = self.replace_builtin_open(self.fake_open)
956 try:
957 result = fileinput.hook_compressed(filename, mode)
958 finally:
959 self.replace_builtin_open(original_open)
960
961 self.assertEqual(self.fake_open.invocation_count, 1)
962 self.assertEqual(self.fake_open.last_invocation,
963 ((filename, mode), {}))
964
965 @staticmethod
966 def replace_builtin_open(new_open_func):
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100967 original_open = builtins.open
968 builtins.open = new_open_func
briancurtin906f0c42011-03-15 10:29:41 -0400969 return original_open
970
971class Test_hook_encoded(unittest.TestCase):
972 """Unit tests for fileinput.hook_encoded()"""
973
974 def test(self):
975 encoding = object()
Serhiy Storchakab2752102016-04-27 23:13:46 +0300976 errors = object()
977 result = fileinput.hook_encoded(encoding, errors=errors)
briancurtin906f0c42011-03-15 10:29:41 -0400978
979 fake_open = InvocationRecorder()
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100980 original_open = builtins.open
981 builtins.open = fake_open
briancurtin906f0c42011-03-15 10:29:41 -0400982 try:
983 filename = object()
984 mode = object()
985 open_result = result(filename, mode)
986 finally:
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100987 builtins.open = original_open
briancurtin906f0c42011-03-15 10:29:41 -0400988
989 self.assertEqual(fake_open.invocation_count, 1)
990
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100991 args, kwargs = fake_open.last_invocation
briancurtin906f0c42011-03-15 10:29:41 -0400992 self.assertIs(args[0], filename)
993 self.assertIs(args[1], mode)
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100994 self.assertIs(kwargs.pop('encoding'), encoding)
Serhiy Storchakab2752102016-04-27 23:13:46 +0300995 self.assertIs(kwargs.pop('errors'), errors)
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100996 self.assertFalse(kwargs)
Georg Brandl6cb7b652010-07-31 20:08:15 +0000997
Serhiy Storchakab2752102016-04-27 23:13:46 +0300998 def test_errors(self):
999 with open(TESTFN, 'wb') as f:
1000 f.write(b'\x80abc')
1001 self.addCleanup(safe_unlink, TESTFN)
1002
1003 def check(errors, expected_lines):
1004 with FileInput(files=TESTFN, mode='r',
1005 openhook=hook_encoded('utf-8', errors=errors)) as fi:
1006 lines = list(fi)
1007 self.assertEqual(lines, expected_lines)
1008
1009 check('ignore', ['abc'])
1010 with self.assertRaises(UnicodeDecodeError):
1011 check('strict', ['abc'])
1012 check('replace', ['\ufffdabc'])
1013 check('backslashreplace', ['\\x80abc'])
1014
Serhiy Storchaka517b7472014-02-26 20:59:43 +02001015 def test_modes(self):
Serhiy Storchaka517b7472014-02-26 20:59:43 +02001016 with open(TESTFN, 'wb') as f:
Serhiy Storchaka682ea5f2014-03-03 21:17:17 +02001017 # UTF-7 is a convenient, seldom used encoding
Serhiy Storchaka517b7472014-02-26 20:59:43 +02001018 f.write(b'A\nB\r\nC\rD+IKw-')
1019 self.addCleanup(safe_unlink, TESTFN)
1020
1021 def check(mode, expected_lines):
1022 with FileInput(files=TESTFN, mode=mode,
1023 openhook=hook_encoded('utf-7')) as fi:
1024 lines = list(fi)
1025 self.assertEqual(lines, expected_lines)
1026
1027 check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
Serhiy Storchaka9fff8492014-02-26 21:03:19 +02001028 with self.assertWarns(DeprecationWarning):
1029 check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
1030 with self.assertWarns(DeprecationWarning):
1031 check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
Serhiy Storchaka517b7472014-02-26 20:59:43 +02001032 with self.assertRaises(ValueError):
1033 check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
1034
Guido van Rossumd8faa362007-04-27 19:54:29 +00001035
Martin Panter7978e102016-01-16 06:26:54 +00001036class MiscTest(unittest.TestCase):
1037
1038 def test_all(self):
Serhiy Storchaka674e2d02016-03-08 18:35:19 +02001039 support.check__all__(self, fileinput)
Martin Panter7978e102016-01-16 06:26:54 +00001040
1041
Guido van Rossumd8faa362007-04-27 19:54:29 +00001042if __name__ == "__main__":
Brett Cannon3e9a9ae2013-06-12 21:25:59 -04001043 unittest.main()