blob: 565633fcccd97c61a240387906a0c05abb6b1be2 [file] [log] [blame]
Tim Peters3230d5c2001-07-11 22:21:17 +00001'''
2Tests for fileinput module.
3Nick Mathewson
4'''
Benjamin Petersoneb462882011-03-15 09:50:18 -05005import os
6import sys
7import re
briancurtin906f0c42011-03-15 10:29:41 -04008import fileinput
9import collections
Florent Xiclunaa011e2b2011-11-07 19:43:07 +010010import builtins
Benjamin Petersoneb462882011-03-15 09:50:18 -050011import unittest
12
briancurtinf84f3c32011-03-18 13:03:17 -050013try:
14 import bz2
15except ImportError:
16 bz2 = None
Ezio Melottic3afbb92011-05-14 10:10:53 +030017try:
18 import gzip
19except ImportError:
20 gzip = None
briancurtinf84f3c32011-03-18 13:03:17 -050021
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030022from io import BytesIO, StringIO
Benjamin Petersoneb462882011-03-15 09:50:18 -050023from fileinput import FileInput, hook_encoded
24
Serhiy Storchaka597d15a2016-04-24 13:45:58 +030025from test.support import verbose, TESTFN, check_warnings
Benjamin Petersoneb462882011-03-15 09:50:18 -050026from test.support import unlink as safe_unlink
Martin Panter7978e102016-01-16 06:26:54 +000027from test import support
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030028from unittest import mock
Benjamin Petersoneb462882011-03-15 09:50:18 -050029
Tim Peters3230d5c2001-07-11 22:21:17 +000030
31# The fileinput module has 2 interfaces: the FileInput class which does
32# all the work, and a few functions (input, etc.) that use a global _state
briancurtin906f0c42011-03-15 10:29:41 -040033# variable.
Tim Peters3230d5c2001-07-11 22:21:17 +000034
def writeTmp(i, lines, mode='w'):  # opening in text mode is the default
    """Write *lines* to temp file number *i* and return the file's name.

    The file name is TESTFN with str(i) appended.  *lines* is an iterable
    of str (or of bytes when *mode* is a binary mode); each item is
    written as-is, no newline is added.
    """
    name = TESTFN + str(i)
    # The with-block closes the handle even if a write fails part-way.
    with open(name, mode) as f:
        f.writelines(lines)
    return name
44
def remove_tempfiles(*names):
    """Unlink every file named in *names*.

    Falsy entries (e.g. None placeholders for files that were never
    created) are skipped, so callers can pass all their name variables
    unconditionally from a finally block.
    """
    for name in names:
        if name:
            safe_unlink(name)
Tim Peters3230d5c2001-07-11 22:21:17 +000049
class LineReader:
    """File-like object that records every line handed out by readline().

    Its openhook() treats the "filename" it is given as the file's text,
    so tests can feed FileInput literal content and then inspect, via
    the linesread property, which reads were actually performed.
    """

    def __init__(self):
        self._linesread = []

    @property
    def linesread(self):
        """Return the lines read since the last access, then reset the log."""
        lines, self._linesread = self._linesread, []
        return lines

    def openhook(self, filename, mode):
        """Start serving the lines of the *filename* string; *mode* is unused."""
        self.it = iter(filename.splitlines(True))
        return self

    def readline(self, size=None):
        """Return the next line ('' once exhausted) and log it; *size* is unused."""
        line = next(self.it, '')
        self._linesread.append(line)
        return line

    def readlines(self, hint=-1):
        """Collect lines until input ends or their total length reaches *hint*.

        With the default hint of -1 the length check succeeds right after
        the first non-empty line, so at most one line is returned.
        """
        lines = []
        size = 0
        while True:
            line = self.readline()
            if not line:
                return lines
            lines.append(line)
            size += len(line)
            if size >= hint:
                return lines

    def close(self):
        """Do nothing; there is no underlying resource to release."""
        pass
85
class BufferSizesTests(unittest.TestCase):
    """Run the same FileInput scenario with bufsize 0 and with bufsize 30."""

    def test_buffer_sizes(self):
        # First, run the tests with default and teeny buffer size.
        for round_no, bs in (0, 0), (1, 30):
            t1 = t2 = t3 = t4 = None
            try:
                t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)])
                t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)])
                t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)])
                t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)])
                if bs:
                    # A non-zero bufsize is expected to emit a
                    # DeprecationWarning.
                    with self.assertWarns(DeprecationWarning):
                        self.buffer_size_test(t1, t2, t3, t4, bs, round_no)
                else:
                    self.buffer_size_test(t1, t2, t3, t4, bs, round_no)
            finally:
                remove_tempfiles(t1, t2, t3, t4)

    def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
        """Check iteration, status, nextfile, stdin and inplace behaviour.

        t1..t4 name files holding 15, 10, 5 and 1 lines of the form
        "Line N of file M".  *bs* is passed to FileInput as bufsize and
        *round* only offsets the step numbers printed in verbose mode.
        """
        pat = re.compile(r'LINE (\d+) OF FILE (\d+)')

        start = 1 + round*6
        if verbose:
            print('%s. Simple iteration (bs=%s)' % (start+0, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        lines = list(fi)
        fi.close()
        self.assertEqual(len(lines), 31)
        self.assertEqual(lines[4], 'Line 5 of file 1\n')
        self.assertEqual(lines[30], 'Line 1 of file 4\n')
        # The counters are checked after close().
        self.assertEqual(fi.lineno(), 31)
        self.assertEqual(fi.filename(), t4)

        if verbose:
            print('%s. Status variables (bs=%s)' % (start+1, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        try:
            s = "x"
            while s and s != 'Line 6 of file 2\n':
                s = fi.readline()
            self.assertEqual(fi.filename(), t2)
            self.assertEqual(fi.lineno(), 21)
            self.assertEqual(fi.filelineno(), 6)
            self.assertFalse(fi.isfirstline())
            self.assertFalse(fi.isstdin())

            if verbose:
                print('%s. Nextfile (bs=%s)' % (start+2, bs))
            fi.nextfile()
            self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
            self.assertEqual(fi.lineno(), 22)
        finally:
            # Close even when an assertion above fails.
            fi.close()

        if verbose:
            print('%s. Stdin (bs=%s)' % (start+3, bs))
        fi = FileInput(files=(t1, t2, t3, t4, '-'), bufsize=bs)
        savestdin = sys.stdin
        try:
            sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
            lines = list(fi)
            self.assertEqual(len(lines), 33)
            self.assertEqual(lines[32], 'Line 2 of stdin\n')
            self.assertEqual(fi.filename(), '<stdin>')
            fi.nextfile()
        finally:
            fi.close()
            sys.stdin = savestdin

        if verbose:
            print('%s. Boundary conditions (bs=%s)' % (start+4, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        try:
            # Nothing has been read yet, so there is no current file.
            self.assertEqual(fi.lineno(), 0)
            self.assertIsNone(fi.filename())
            fi.nextfile()
            self.assertEqual(fi.lineno(), 0)
            self.assertIsNone(fi.filename())
        finally:
            fi.close()

        if verbose:
            print('%s. Inplace (bs=%s)' % (start+5, bs))
        savestdout = sys.stdout
        try:
            # With inplace=1 the print() calls below rewrite the files
            # in upper case.
            with FileInput(files=(t1, t2, t3, t4), inplace=1,
                           bufsize=bs) as fi:
                for line in fi:
                    line = line[:-1].upper()
                    print(line)
        finally:
            sys.stdout = savestdout

        # Read the files back and verify the upper-cased content.
        with FileInput(files=(t1, t2, t3, t4), bufsize=bs) as fi:
            for line in fi:
                self.assertEqual(line[-1], '\n')
                m = pat.match(line[:-1])
                self.assertIsNotNone(m)
                self.assertEqual(int(m.group(1)), fi.filelineno())
Georg Brandle4662172006-02-19 09:51:27 +0000180
class UnconditionallyRaise:
    """Callable that records being called and then always raises.

    Used as a drop-in replacement for functions such as os.unlink so a
    test can both force a failure and verify the function was reached.
    """

    def __init__(self, exception_type):
        self.exception_type = exception_type
        self.invoked = False

    def __call__(self, *args, **kwargs):
        # Set the flag first so it is visible even though we raise.
        self.invoked = True
        raise self.exception_type()
188
class FileInputTests(unittest.TestCase):
    """Tests that drive the FileInput class directly."""

    def test_zero_byte_files(self):
        t1 = t2 = t3 = t4 = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            with FileInput(files=(t1, t2, t3, t4)) as fi:
                line = fi.readline()
                self.assertEqual(line, 'The only line there is.\n')
                self.assertEqual(fi.lineno(), 1)
                self.assertEqual(fi.filelineno(), 1)
                self.assertEqual(fi.filename(), t3)

                line = fi.readline()
                self.assertFalse(line)
                self.assertEqual(fi.lineno(), 1)
                self.assertEqual(fi.filelineno(), 0)
                self.assertEqual(fi.filename(), t4)
        finally:
            remove_tempfiles(t1, t2, t3, t4)

    def test_files_that_dont_end_with_newline(self):
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            fi = FileInput(files=(t1, t2))
            lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)
        finally:
            remove_tempfiles(t1, t2)

##    def test_unicode_filenames(self):
##        # XXX A unicode string is always returned by writeTmp.
##        #     So is this needed?
##        try:
##            t1 = writeTmp(1, ["A\nB"])
##            encoding = sys.getfilesystemencoding()
##            if encoding is None:
##                encoding = 'ascii'
##            fi = FileInput(files=str(t1, encoding))
##            lines = list(fi)
##            self.assertEqual(lines, ["A\n", "B"])
##        finally:
##            remove_tempfiles(t1)

    def test_fileno(self):
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB"])
            t2 = writeTmp(2, ["C\nD"])
            with FileInput(files=(t1, t2)) as fi:
                # fileno() is expected to be -1 unless a file is open.
                self.assertEqual(fi.fileno(), -1)
                next(fi)
                self.assertNotEqual(fi.fileno(), -1)
                fi.nextfile()
                self.assertEqual(fi.fileno(), -1)
                list(fi)
                self.assertEqual(fi.fileno(), -1)
        finally:
            remove_tempfiles(t1, t2)

    def test_opening_mode(self):
        # invalid mode, should raise ValueError
        with self.assertRaises(
                ValueError,
                msg="FileInput should reject invalid mode argument"):
            FileInput(mode="w")
        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            with check_warnings(('', DeprecationWarning)):
                fi = FileInput(files=t1, mode="U")
            with check_warnings(('', DeprecationWarning)):
                lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)

    def test_stdin_binary_mode(self):
        with mock.patch('sys.stdin') as m_stdin:
            m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])

    def test_detached_stdin_binary_mode(self):
        orig_stdin = sys.stdin
        try:
            # A bare BytesIO has no .buffer attribute, unlike a text stdin.
            sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
            self.assertFalse(hasattr(sys.stdin, 'buffer'))
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
        finally:
            sys.stdin = orig_stdin

    def test_file_opening_hook(self):
        # cannot use openhook and inplace mode
        with self.assertRaises(
                ValueError,
                msg="FileInput should raise if both inplace "
                    "and openhook arguments are given"):
            FileInput(inplace=1, openhook=lambda f, m: None)
        with self.assertRaises(
                ValueError,
                msg="FileInput should check openhook for being callable"):
            FileInput(openhook=1)

        class CustomOpenHook:
            def __init__(self):
                self.invoked = False
            def __call__(self, *args):
                self.invoked = True
                return open(*args)

        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        custom_open_hook = CustomOpenHook()
        with FileInput([t], openhook=custom_open_hook) as fi:
            fi.readline()
        self.assertTrue(custom_open_hook.invoked, "openhook not invoked")

    def test_readline(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write(b'123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write(b'\x80')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii')) as fi:
            try:
                self.assertEqual(fi.readline(), 'A\n')
                self.assertEqual(fi.readline(), 'B\n')
                self.assertEqual(fi.readline(), 'C\n')
            except UnicodeDecodeError:
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
            self.assertEqual(fi.readline(), '')
            self.assertEqual(fi.readline(), '')

    def test_readline_binary_mode(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\rD')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN, mode='rb') as fi:
            self.assertEqual(fi.readline(), b'A\n')
            self.assertEqual(fi.readline(), b'B\r\n')
            self.assertEqual(fi.readline(), b'C\rD')
            # Read to the end of file.
            self.assertEqual(fi.readline(), b'')
            self.assertEqual(fi.readline(), b'')

    def test_context_manager(self):
        # Bind the names first so the finally clause cannot hit a
        # NameError if writeTmp() itself fails.
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            with FileInput(files=(t1, t2)) as fi:
                lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)
            self.assertEqual(fi._files, ())
        finally:
            remove_tempfiles(t1, t2)

    def test_close_on_exception(self):
        t1 = writeTmp(1, [""])
        self.addCleanup(remove_tempfiles, t1)
        with self.assertRaises(OSError):
            with FileInput(files=t1) as fi:
                raise OSError
        # Leaving the with-block through an exception must still close.
        self.assertEqual(fi._files, ())

    def test_empty_files_list_specified_to_constructor(self):
        with FileInput(files=[]) as fi:
            self.assertEqual(fi._files, ('-',))

    def test__getitem__(self):
        """Tests invoking FileInput.__getitem__() with the current
           line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            retval1 = fi[0]
            self.assertEqual(retval1, "line1\n")
            retval2 = fi[1]
            self.assertEqual(retval2, "line2\n")

    def test__getitem__invalid_key(self):
        """Tests invoking FileInput.__getitem__() with an index unequal to
           the line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(RuntimeError) as cm:
                fi[1]
            self.assertEqual(cm.exception.args, ("accessing lines out of order",))

    def test__getitem__eof(self):
        """Tests invoking FileInput.__getitem__() with the line number but at
           end-of-input"""
        t = writeTmp(1, [])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(IndexError) as cm:
                fi[0]
            self.assertEqual(cm.exception.args, ("end of input reached",))

    def test_nextfile_oserror_deleting_backup(self):
        """Tests invoking FileInput.nextfile() when the attempt to delete
           the backup file would raise OSError.  This error is expected to be
           silently ignored"""

        os_unlink_orig = os.unlink
        os_unlink_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                next(fi)  # make sure the file is opened
                os.unlink = os_unlink_replacement
                fi.nextfile()
        finally:
            os.unlink = os_unlink_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_unlink_replacement.invoked,
                        "os.unlink() was not invoked")

    def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
           This exception should be silently discarded."""

        os_fstat_orig = os.fstat
        os_fstat_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.fstat = os_fstat_replacement
                fi.readline()
        finally:
            os.fstat = os_fstat_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_fstat_replacement.invoked,
                        "os.fstat() was not invoked")

    @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist")
    def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")

    def test_fileno_when_ValueError_raised(self):
        class FilenoRaisesValueError(UnconditionallyRaise):
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)
            def fileno(self):
                self.__call__()

        unconditionally_raise_ValueError = FilenoRaisesValueError()
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            file_backup = fi._file
            try:
                fi._file = unconditionally_raise_ValueError
                result = fi.fileno()
            finally:
                fi._file = file_backup  # make sure the file gets cleaned up

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(unconditionally_raise_ValueError.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(result, -1, "fileno() should return -1")

    def test_readline_buffering(self):
        # LineReader.openhook() serves each "file name" as file content;
        # src.linesread shows which underlying reads each call triggered.
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(fi.readline(), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(fi.readline(), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(fi.readline(), 'line3\n')
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [''])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [])

    def test_iteration_buffering(self):
        # Same scenario as test_readline_buffering, driven through next().
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(next(fi), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(next(fi), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(next(fi), 'line3\n')
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [''])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [])
532
class MockFileInput:
    """A class that mocks out fileinput.FileInput for use during unit tests

    The constructor stores its arguments unchanged.  Every other method
    counts its calls in invocation_counts; all except close() then return
    return_values[<method name>], raising KeyError if the test has not
    set that entry.
    """

    def __init__(self, files=None, inplace=False, backup="", bufsize=0,
                 mode="r", openhook=None):
        self.files = files
        self.inplace = inplace
        self.backup = backup
        self.bufsize = bufsize
        self.mode = mode
        self.openhook = openhook
        self._file = None
        # Call count per method name; a key exists only once it has been
        # incremented or looked up.
        self.invocation_counts = collections.defaultdict(int)
        # Canned results keyed by method name, filled in by each test.
        self.return_values = {}

    def _record(self, method_name):
        """Count one call of *method_name* and return its canned result."""
        self.invocation_counts[method_name] += 1
        return self.return_values[method_name]

    def close(self):
        # close() has no canned result; it is only counted.
        self.invocation_counts["close"] += 1

    def nextfile(self):
        return self._record("nextfile")

    def filename(self):
        return self._record("filename")

    def lineno(self):
        return self._record("lineno")

    def filelineno(self):
        return self._record("filelineno")

    def fileno(self):
        return self._record("fileno")

    def isfirstline(self):
        return self._record("isfirstline")

    def isstdin(self):
        return self._record("isstdin")
578
class BaseFileInputGlobalMethodsTest(unittest.TestCase):
    """Base class for unit tests for the global function of
       the fileinput module."""

    def setUp(self):
        # Save the module globals the tests overwrite, then substitute
        # the mock class for fileinput.FileInput.
        self._orig_state = fileinput._state
        self._orig_FileInput = fileinput.FileInput
        fileinput.FileInput = MockFileInput

    def tearDown(self):
        fileinput.FileInput = self._orig_FileInput
        fileinput._state = self._orig_state

    def assertExactlyOneInvocation(self, mock_file_input, method_name):
        """Assert *method_name* was the only method called, exactly once.

        *mock_file_input* must expose an invocation_counts mapping of
        method name to call count.
        """
        # assert that the method with the given name was invoked once
        actual_count = mock_file_input.invocation_counts[method_name]
        self.assertEqual(actual_count, 1, method_name)
        # assert that no other unexpected methods were invoked
        actual_total_count = len(mock_file_input.invocation_counts)
        self.assertEqual(actual_total_count, 1)
599
class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.input()"""

    def test_state_is_not_None_and_state_file_is_not_None(self):
        """Tests invoking fileinput.input() when fileinput._state is not None
           and its _file attribute is also not None.  Expect RuntimeError to
           be raised with a meaningful error message and for fileinput._state
           to *not* be modified."""
        instance = MockFileInput()
        instance._file = object()
        fileinput._state = instance
        with self.assertRaises(RuntimeError) as cm:
            fileinput.input()
        self.assertEqual(("input() already active",), cm.exception.args)
        self.assertIs(instance, fileinput._state, "fileinput._state")

    def test_state_is_not_None_and_state_file_is_None(self):
        """Tests invoking fileinput.input() when fileinput._state is not None
           but its _file attribute *is* None.  Expect it to create and return
           a new fileinput.FileInput object with all method parameters passed
           explicitly to the __init__() method; also ensure that
           fileinput._state is set to the returned instance."""
        instance = MockFileInput()
        instance._file = None
        fileinput._state = instance
        self.do_test_call_input()

    def test_state_is_None(self):
        """Tests invoking fileinput.input() when fileinput._state is None
           Expect it to create and return a new fileinput.FileInput object
           with all method parameters passed explicitly to the __init__()
           method; also ensure that fileinput._state is set to the returned
           instance."""
        fileinput._state = None
        self.do_test_call_input()

    def do_test_call_input(self):
        """Tests that fileinput.input() creates a new fileinput.FileInput
           object, passing the given parameters unmodified to
           fileinput.FileInput.__init__().  Note that this test depends on the
           monkey patching of fileinput.FileInput done by setUp()."""
        # Unique sentinels, so the identity checks below cannot pass by
        # accident.
        files = object()
        inplace = object()
        backup = object()
        bufsize = object()
        mode = object()
        openhook = object()

        # call fileinput.input() with different values for each argument
        result = fileinput.input(files=files, inplace=inplace, backup=backup,
                                 bufsize=bufsize,
                                 mode=mode, openhook=openhook)

        # ensure fileinput._state was set to the returned object
        self.assertIs(result, fileinput._state, "fileinput._state")

        # ensure the parameters to fileinput.input() were passed directly
        # to FileInput.__init__()
        self.assertIs(files, result.files, "files")
        self.assertIs(inplace, result.inplace, "inplace")
        self.assertIs(backup, result.backup, "backup")
        self.assertIs(bufsize, result.bufsize, "bufsize")
        self.assertIs(mode, result.mode, "mode")
        self.assertIs(openhook, result.openhook, "openhook")
664
class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.close()"""

    def test_state_is_None(self):
        """Tests that fileinput.close() does nothing if fileinput._state
           is None"""
        fileinput._state = None
        fileinput.close()
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests that fileinput.close() invokes close() on fileinput._state
           and sets _state=None"""
        instance = MockFileInput()
        fileinput._state = instance
        fileinput.close()
        self.assertExactlyOneInvocation(instance, "close")
        self.assertIsNone(fileinput._state)
683
class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.nextfile()"""

    def test_state_is_None(self):
        """Tests fileinput.nextfile() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.nextfile()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.nextfile() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.nextfile() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        nextfile_retval = object()
        instance = MockFileInput()
        instance.return_values["nextfile"] = nextfile_retval
        fileinput._state = instance
        retval = fileinput.nextfile()
        self.assertExactlyOneInvocation(instance, "nextfile")
        self.assertIs(retval, nextfile_retval)
        self.assertIs(fileinput._state, instance)
710
class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filename()"""

    def test_state_is_None(self):
        """Tests fileinput.filename() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.filename()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.filename() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.filename() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        filename_retval = object()
        instance = MockFileInput()
        instance.return_values["filename"] = filename_retval
        fileinput._state = instance
        retval = fileinput.filename()
        self.assertExactlyOneInvocation(instance, "filename")
        self.assertIs(retval, filename_retval)
        self.assertIs(fileinput._state, instance)
737
class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.lineno()"""

    def test_state_is_None(self):
        """Tests fileinput.lineno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.lineno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.lineno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.lineno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        lineno_retval = object()
        instance = MockFileInput()
        instance.return_values["lineno"] = lineno_retval
        fileinput._state = instance
        retval = fileinput.lineno()
        self.assertExactlyOneInvocation(instance, "lineno")
        self.assertIs(retval, lineno_retval)
        self.assertIs(fileinput._state, instance)
764
class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filelineno()"""

    def test_state_is_None(self):
        """Tests fileinput.filelineno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.filelineno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.filelineno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.filelineno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        filelineno_retval = object()
        instance = MockFileInput()
        instance.return_values["filelineno"] = filelineno_retval
        fileinput._state = instance
        retval = fileinput.filelineno()
        self.assertExactlyOneInvocation(instance, "filelineno")
        self.assertIs(retval, filelineno_retval)
        self.assertIs(fileinput._state, instance)
791
class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.fileno()"""

    def test_state_is_None(self):
        """Tests fileinput.fileno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.fileno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.fileno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.fileno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        fileno_retval = object()
        instance = MockFileInput()
        # MockFileInput.fileno() reads its result from return_values only.
        instance.return_values["fileno"] = fileno_retval
        fileinput._state = instance
        retval = fileinput.fileno()
        self.assertExactlyOneInvocation(instance, "fileno")
        self.assertIs(retval, fileno_retval)
        self.assertIs(fileinput._state, instance)
819
class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
    """Exercise the module-level fileinput.isfirstline() wrapper."""

    def test_state_is_None(self):
        """isfirstline() with no active global state.

        The call must raise RuntimeError whose args are exactly
        ("no active input()",), and fileinput._state must still be None
        afterwards.
        """
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isfirstline()
        self.assertEqual(caught.exception.args, ("no active input()",))
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """isfirstline() with an active global state.

        The call must invoke isfirstline() on the state object exactly
        once, hand back that method's return value unchanged, and leave
        fileinput._state bound to the very same object.
        """
        fake_state = MockFileInput()
        # A unique sentinel lets assertIs prove the value is passed through
        # untouched rather than copied or converted.
        sentinel = object()
        fake_state.return_values["isfirstline"] = sentinel
        fileinput._state = fake_state

        observed = fileinput.isfirstline()

        self.assertExactlyOneInvocation(fake_state, "isfirstline")
        self.assertIs(observed, sentinel)
        self.assertIs(fileinput._state, fake_state)
846
class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
    """Exercise the module-level fileinput.isstdin() wrapper."""

    def test_state_is_None(self):
        """isstdin() with no active global state.

        The call must raise RuntimeError whose args are exactly
        ("no active input()",), and fileinput._state must still be None
        afterwards.
        """
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isstdin()
        self.assertEqual(caught.exception.args, ("no active input()",))
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """isstdin() with an active global state.

        The call must invoke isstdin() on the state object exactly once,
        hand back that method's return value unchanged, and leave
        fileinput._state bound to the very same object.
        """
        fake_state = MockFileInput()
        # A unique sentinel lets assertIs prove the value is passed through
        # untouched rather than copied or converted.
        sentinel = object()
        fake_state.return_values["isstdin"] = sentinel
        fileinput._state = fake_state

        observed = fileinput.isstdin()

        self.assertExactlyOneInvocation(fake_state, "isstdin")
        self.assertIs(observed, sentinel)
        self.assertIs(fileinput._state, fake_state)
873
class InvocationRecorder:
    """Callable test double that records how it was invoked.

    Attributes:
        invocation_count: number of times the instance has been called.
        last_invocation: ``(args, kwargs)`` of the most recent call, or
            None while the instance has never been called.
    """

    def __init__(self):
        self.invocation_count = 0
        # Defined up front so that inspecting a never-called recorder gives
        # None (and a readable assertion failure) instead of AttributeError.
        self.last_invocation = None

    def __call__(self, *args, **kwargs):
        """Record this call's arguments; implicitly returns None."""
        self.invocation_count += 1
        self.last_invocation = (args, kwargs)
880
class Test_hook_compressed(unittest.TestCase):
    """Unit tests for fileinput.hook_compressed()

    Each test swaps one "open" callable for a recorder, calls
    hook_compressed(), and checks that the recorder was invoked exactly
    once with the filename and mode forwarded positionally.  The modes
    are distinct integers that the recorder simply stores, so each
    expected call can be matched exactly.
    """

    def setUp(self):
        # A fresh recorder per test so the invocation count starts at zero.
        self.fake_open = InvocationRecorder()

    def test_empty_string(self):
        """An empty filename is opened with the builtin open()."""
        self.do_test_use_builtin_open("", 1)

    def test_no_ext(self):
        """A filename without an extension uses the builtin open()."""
        self.do_test_use_builtin_open("abcd", 2)

    @unittest.skipUnless(gzip, "Requires gzip and zlib")
    def test_gz_ext_fake(self):
        """A ".gz" filename is opened through gzip.open()."""
        original_open = gzip.open
        gzip.open = self.fake_open
        try:
            fileinput.hook_compressed("test.gz", 3)
        finally:
            # Always restore the real function, even if the hook raised.
            gzip.open = original_open

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {}))

    @unittest.skipUnless(bz2, "Requires bz2")
    def test_bz2_ext_fake(self):
        """A ".bz2" filename is opened through bz2.BZ2File()."""
        original_open = bz2.BZ2File
        bz2.BZ2File = self.fake_open
        try:
            fileinput.hook_compressed("test.bz2", 4)
        finally:
            # Always restore the real class, even if the hook raised.
            bz2.BZ2File = original_open

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation, (("test.bz2", 4), {}))

    def test_blah_ext(self):
        """An unrecognised extension uses the builtin open()."""
        self.do_test_use_builtin_open("abcd.blah", 5)

    def test_gz_ext_builtin(self):
        """A mixed-case ".Gz" extension uses the builtin open()."""
        self.do_test_use_builtin_open("abcd.Gz", 6)

    def test_bz2_ext_builtin(self):
        """A mixed-case ".Bz2" extension uses the builtin open()."""
        self.do_test_use_builtin_open("abcd.Bz2", 7)

    def do_test_use_builtin_open(self, filename, mode):
        """Check that hook_compressed(filename, mode) calls builtins.open.

        builtins.open is replaced by the recorder for the duration of the
        call and restored afterwards; the recorder must then show exactly
        one call with ``(filename, mode)`` and no keyword arguments.
        """
        original_open = self.replace_builtin_open(self.fake_open)
        try:
            fileinput.hook_compressed(filename, mode)
        finally:
            self.replace_builtin_open(original_open)

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation,
                         ((filename, mode), {}))

    @staticmethod
    def replace_builtin_open(new_open_func):
        """Install new_open_func as builtins.open; return the previous one."""
        original_open = builtins.open
        builtins.open = new_open_func
        return original_open
942
class Test_hook_encoded(unittest.TestCase):
    """Unit tests for fileinput.hook_encoded()"""

    def test(self):
        """The returned hook forwards everything to builtins.open().

        filename and mode must arrive positionally, encoding and errors as
        keyword arguments, and no other keyword may be passed.  Opaque
        object() sentinels are used so identity checks prove the values
        are forwarded untouched.
        """
        encoding = object()
        errors = object()
        hook = fileinput.hook_encoded(encoding, errors=errors)

        fake_open = InvocationRecorder()
        original_open = builtins.open
        builtins.open = fake_open
        try:
            filename = object()
            mode = object()
            hook(filename, mode)
        finally:
            # Always restore the real open(), even if the hook raised.
            builtins.open = original_open

        self.assertEqual(fake_open.invocation_count, 1)

        args, kwargs = fake_open.last_invocation
        self.assertIs(args[0], filename)
        self.assertIs(args[1], mode)
        self.assertIs(kwargs.pop('encoding'), encoding)
        self.assertIs(kwargs.pop('errors'), errors)
        # Both expected keywords were popped; anything left is unexpected.
        self.assertFalse(kwargs)

    def test_errors(self):
        """The errors= handler is honoured when decoding invalid input."""
        with open(TESTFN, 'wb') as f:
            # 0x80 is not valid as the first byte of a UTF-8 sequence.
            f.write(b'\x80abc')
        self.addCleanup(safe_unlink, TESTFN)

        def check(errors, expected_lines):
            with FileInput(files=TESTFN, mode='r',
                           openhook=hook_encoded('utf-8', errors=errors)) as fi:
                lines = list(fi)
            self.assertEqual(lines, expected_lines)

        check('ignore', ['abc'])
        with self.assertRaises(UnicodeDecodeError):
            # Reading raises before the comparison, so the expected value
            # is never actually compared.
            check('strict', ['abc'])
        check('replace', ['\ufffdabc'])
        check('backslashreplace', ['\\x80abc'])

    def test_modes(self):
        """Text modes decode and translate newlines; 'rb' is rejected."""
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write(b'A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            with FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7')) as fi:
                lines = list(fi)
            self.assertEqual(lines, expected_lines)

        check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertRaises(ValueError):
            check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
1006
Guido van Rossumd8faa362007-04-27 19:54:29 +00001007
class MiscTest(unittest.TestCase):
    """Checks on the fileinput module as a whole."""

    def test_all(self):
        """Run test.support's check__all__ helper against fileinput."""
        module_under_test = fileinput
        support.check__all__(self, module_under_test)
Martin Panter7978e102016-01-16 06:26:54 +00001012
1013
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()