blob: ad8130468b56ab8a7e275b15d7579929646d50e1 [file] [log] [blame]
'''
Tests for fileinput module.
Nick Mathewson
'''
Benjamin Petersoneb462882011-03-15 09:50:18 -05005import os
6import sys
7import re
briancurtin906f0c42011-03-15 10:29:41 -04008import fileinput
9import collections
Florent Xiclunaa011e2b2011-11-07 19:43:07 +010010import builtins
Benjamin Petersoneb462882011-03-15 09:50:18 -050011import unittest
12
briancurtinf84f3c32011-03-18 13:03:17 -050013try:
14 import bz2
15except ImportError:
16 bz2 = None
Ezio Melottic3afbb92011-05-14 10:10:53 +030017try:
18 import gzip
19except ImportError:
20 gzip = None
briancurtinf84f3c32011-03-18 13:03:17 -050021
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030022from io import BytesIO, StringIO
Benjamin Petersoneb462882011-03-15 09:50:18 -050023from fileinput import FileInput, hook_encoded
24
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +020025from test.support import verbose, TESTFN, run_unittest, check_warnings
Benjamin Petersoneb462882011-03-15 09:50:18 -050026from test.support import unlink as safe_unlink
Martin Panter7978e102016-01-16 06:26:54 +000027from test import support
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030028from unittest import mock
Benjamin Petersoneb462882011-03-15 09:50:18 -050029
Tim Peters3230d5c2001-07-11 22:21:17 +000030
# The fileinput module has 2 interfaces: the FileInput class which does
# all the work, and a few functions (input, etc.) that use a global _state
# variable.
Tim Peters3230d5c2001-07-11 22:21:17 +000034
# Write lines (a list of lines) to temp file number i, and return the
# temp file's name.
def writeTmp(i, lines, mode='w'):  # opening in text mode is the default
    """Write *lines* to temp file number *i* and return the file's name.

    The file name is ``TESTFN + str(i)``.  Each item of *lines* is written
    verbatim (no newline is appended), so the items must be str for a text
    *mode* and bytes for a binary one.  *mode* is passed straight to open().
    """
    name = TESTFN + str(i)
    # The with block closes the file even if a write raises, so a failing
    # test does not leak an open handle.
    with open(name, mode) as f:
        f.writelines(lines)
    return name
44
def remove_tempfiles(*names):
    """Delete every temp file named in *names*, skipping falsy entries.

    Falsy entries (for example None for a file that was never created) are
    ignored; every other name is handed to safe_unlink().
    """
    for path in filter(None, names):
        safe_unlink(path)
Tim Peters3230d5c2001-07-11 22:21:17 +000049
class BufferSizesTests(unittest.TestCase):
    """Run the same FileInput scenario with two different bufsize values."""

    def test_buffer_sizes(self):
        # First, run the tests with default and teeny buffer size.
        for round, bs in (0, 0), (1, 30):
            # Entries stay None until the file exists, so the cleanup in
            # the finally clause skips anything that was never created.
            names = [None, None, None, None]
            try:
                # Files 1..4 hold 15, 10, 5 and 1 lines respectively.
                for index, count in enumerate((15, 10, 5, 1)):
                    number = index + 1
                    names[index] = writeTmp(
                        number,
                        ["Line %s of file %s\n" % (i+1, number)
                         for i in range(count)])
                t1, t2, t3, t4 = names
                self.buffer_size_test(t1, t2, t3, t4, bs, round)
            finally:
                remove_tempfiles(*names)

    def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
        """Run the six numbered FileInput checks against the four files.

        *bs* is forwarded as FileInput's bufsize; *round* only offsets the
        step numbers printed in verbose mode.
        """
        files = (t1, t2, t3, t4)
        pat = re.compile(r'LINE (\d+) OF FILE (\d+)')
        start = 1 + round*6

        if verbose:
            print('%s. Simple iteration (bs=%s)' % (start+0, bs))
        fi = FileInput(files=files, bufsize=bs)
        lines = list(fi)
        fi.close()
        self.assertEqual(len(lines), 31)
        self.assertEqual(lines[4], 'Line 5 of file 1\n')
        self.assertEqual(lines[30], 'Line 1 of file 4\n')
        # The counters are still queried after close().
        self.assertEqual(fi.lineno(), 31)
        self.assertEqual(fi.filename(), t4)

        if verbose:
            print('%s. Status variables (bs=%s)' % (start+1, bs))
        fi = FileInput(files=files, bufsize=bs)
        s = "x"
        while s and s != 'Line 6 of file 2\n':
            s = fi.readline()
        self.assertEqual(fi.filename(), t2)
        self.assertEqual(fi.lineno(), 21)
        self.assertEqual(fi.filelineno(), 6)
        self.assertFalse(fi.isfirstline())
        self.assertFalse(fi.isstdin())

        if verbose:
            print('%s. Nextfile (bs=%s)' % (start+2, bs))
        # Continues with the FileInput left positioned in file 2 above.
        fi.nextfile()
        self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
        self.assertEqual(fi.lineno(), 22)
        fi.close()

        if verbose:
            print('%s. Stdin (bs=%s)' % (start+3, bs))
        fi = FileInput(files=files + ('-',), bufsize=bs)
        savestdin = sys.stdin
        try:
            # '-' makes FileInput read sys.stdin, so substitute a fake one.
            sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
            lines = list(fi)
            self.assertEqual(len(lines), 33)
            self.assertEqual(lines[32], 'Line 2 of stdin\n')
            self.assertEqual(fi.filename(), '<stdin>')
            fi.nextfile()
        finally:
            sys.stdin = savestdin

        if verbose:
            print('%s. Boundary conditions (bs=%s)' % (start+4, bs))
        # Nothing has been read yet, so the counters report "no file".
        fi = FileInput(files=files, bufsize=bs)
        self.assertEqual(fi.lineno(), 0)
        self.assertIsNone(fi.filename())
        fi.nextfile()
        self.assertEqual(fi.lineno(), 0)
        self.assertIsNone(fi.filename())

        if verbose:
            print('%s. Inplace (bs=%s)' % (start+5, bs))
        savestdout = sys.stdout
        try:
            # With inplace=1 the print() output lands in the file being
            # read, upper-casing every line of the four files.
            fi = FileInput(files=files, inplace=1, bufsize=bs)
            for line in fi:
                print(line[:-1].upper())
            fi.close()
        finally:
            sys.stdout = savestdout

        # Re-read the rewritten files and check the upper-cased content.
        fi = FileInput(files=files, bufsize=bs)
        for line in fi:
            self.assertEqual(line[-1], '\n')
            m = pat.match(line[:-1])
            self.assertIsNotNone(m)
            self.assertEqual(int(m.group(1)), fi.filelineno())
        fi.close()
Georg Brandle4662172006-02-19 09:51:27 +0000140
class UnconditionallyRaise:
    """Callable stub that notes it was called and then always raises.

    *exception_type* is instantiated without arguments and raised on every
    call; ``invoked`` flips to True the first time the stub is called.
    """

    def __init__(self, exception_type):
        self.invoked = False
        self.exception_type = exception_type

    def __call__(self, *args, **kwargs):
        # Record the call before raising so a test can verify it happened.
        self.invoked = True
        error = self.exception_type()
        raise error
148
class FileInputTests(unittest.TestCase):
    """Tests that drive the FileInput class directly."""

    def test_zero_byte_files(self):
        t1 = t2 = t3 = t4 = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            # The two leading empty files are skipped transparently.
            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            # Reading on moves into the trailing empty file and hits EOF.
            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
            fi.close()
        finally:
            remove_tempfiles(t1, t2, t3, t4)

    def test_files_that_dont_end_with_newline(self):
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            fi = FileInput(files=(t1, t2))
            lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)
        finally:
            remove_tempfiles(t1, t2)

##     def test_unicode_filenames(self):
##         # XXX A unicode string is always returned by writeTmp.
##         #     So is this needed?
##         try:
##             t1 = writeTmp(1, ["A\nB"])
##             encoding = sys.getfilesystemencoding()
##             if encoding is None:
##                 encoding = 'ascii'
##             fi = FileInput(files=str(t1, encoding))
##             lines = list(fi)
##             self.assertEqual(lines, ["A\n", "B"])
##         finally:
##             remove_tempfiles(t1)

    def test_fileno(self):
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB"])
            t2 = writeTmp(2, ["C\nD"])
            fi = FileInput(files=(t1, t2))
            # fileno() is -1 whenever no file is currently open.
            self.assertEqual(fi.fileno(), -1)
            next(fi)
            self.assertNotEqual(fi.fileno(), -1)
            fi.nextfile()
            self.assertEqual(fi.fileno(), -1)
            list(fi)
            self.assertEqual(fi.fileno(), -1)
        finally:
            remove_tempfiles(t1, t2)

    def test_opening_mode(self):
        # invalid mode, should raise ValueError
        with self.assertRaises(
                ValueError,
                msg="FileInput should reject invalid mode argument"):
            FileInput(mode="w")
        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            with check_warnings(('', DeprecationWarning)):
                fi = FileInput(files=t1, mode="U")
            with check_warnings(('', DeprecationWarning)):
                lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)

    def test_stdin_binary_mode(self):
        with mock.patch('sys.stdin') as m_stdin:
            m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])

    def test_detached_stdin_binary_mode(self):
        orig_stdin = sys.stdin
        try:
            # A bare BytesIO has no .buffer attribute, unlike a text stdin.
            sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
            self.assertFalse(hasattr(sys.stdin, 'buffer'))
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
        finally:
            sys.stdin = orig_stdin

    def test_file_opening_hook(self):
        # cannot use openhook and inplace mode
        with self.assertRaises(
                ValueError,
                msg="FileInput should raise if both inplace "
                    "and openhook arguments are given"):
            FileInput(inplace=1, openhook=lambda f, m: None)
        with self.assertRaises(
                ValueError,
                msg="FileInput should check openhook for being callable"):
            FileInput(openhook=1)

        class CustomOpenHook:
            def __init__(self):
                self.invoked = False
            def __call__(self, *args):
                self.invoked = True
                return open(*args)

        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        custom_open_hook = CustomOpenHook()
        with FileInput([t], openhook=custom_open_hook) as fi:
            fi.readline()
        self.assertTrue(custom_open_hook.invoked, "openhook not invoked")

    def test_readline(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write(b'123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write(b'\x80')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii'), bufsize=8) as fi:
            try:
                self.assertEqual(fi.readline(), 'A\n')
                self.assertEqual(fi.readline(), 'B\n')
                self.assertEqual(fi.readline(), 'C\n')
            except UnicodeDecodeError:
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
            self.assertEqual(fi.readline(), '')
            self.assertEqual(fi.readline(), '')

    def test_readline_binary_mode(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\rD')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN, mode='rb') as fi:
            self.assertEqual(fi.readline(), b'A\n')
            self.assertEqual(fi.readline(), b'B\r\n')
            self.assertEqual(fi.readline(), b'C\rD')
            # Read to the end of file.
            self.assertEqual(fi.readline(), b'')
            self.assertEqual(fi.readline(), b'')

    def test_context_manager(self):
        # Start from None so the finally clause is safe even when creating
        # a temp file fails part-way through.
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            with FileInput(files=(t1, t2)) as fi:
                lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)
            self.assertEqual(fi._files, ())
        finally:
            remove_tempfiles(t1, t2)

    def test_close_on_exception(self):
        # Create the file outside the try block: an OSError from writeTmp
        # must not be mistaken for the one raised on purpose below.
        t1 = writeTmp(1, [""])
        self.addCleanup(remove_tempfiles, t1)
        try:
            with FileInput(files=t1) as fi:
                raise OSError
        except OSError:
            self.assertEqual(fi._files, ())

    def test_empty_files_list_specified_to_constructor(self):
        with FileInput(files=[]) as fi:
            self.assertEqual(fi._files, ('-',))

    def test__getitem__(self):
        """Tests invoking FileInput.__getitem__() with the current
           line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            retval1 = fi[0]
            self.assertEqual(retval1, "line1\n")
            retval2 = fi[1]
            self.assertEqual(retval2, "line2\n")

    def test__getitem__invalid_key(self):
        """Tests invoking FileInput.__getitem__() with an index unequal to
           the line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(RuntimeError) as cm:
                fi[1]
        self.assertEqual(cm.exception.args, ("accessing lines out of order",))

    def test__getitem__eof(self):
        """Tests invoking FileInput.__getitem__() with the line number but at
           end-of-input"""
        t = writeTmp(1, [])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(IndexError) as cm:
                fi[0]
        self.assertEqual(cm.exception.args, ("end of input reached",))

    def test_nextfile_oserror_deleting_backup(self):
        """Tests invoking FileInput.nextfile() when the attempt to delete
           the backup file would raise OSError.  This error is expected to be
           silently ignored"""

        os_unlink_orig = os.unlink
        os_unlink_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                next(fi)  # make sure the file is opened
                os.unlink = os_unlink_replacement
                fi.nextfile()
        finally:
            os.unlink = os_unlink_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_unlink_replacement.invoked,
                        "os.unlink() was not invoked")

    def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
           This exception should be silently discarded."""

        os_fstat_orig = os.fstat
        os_fstat_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.fstat = os_fstat_replacement
                fi.readline()
        finally:
            os.fstat = os_fstat_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_fstat_replacement.invoked,
                        "os.fstat() was not invoked")

    @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist")
    def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")

    def test_fileno_when_ValueError_raised(self):
        """Tests that FileInput.fileno() returns -1 when the underlying
           file object's fileno() raises ValueError."""
        class FilenoRaisesValueError(UnconditionallyRaise):
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)
            def fileno(self):
                self.__call__()

        unconditionally_raise_ValueError = FilenoRaisesValueError()
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            file_backup = fi._file
            try:
                fi._file = unconditionally_raise_ValueError
                result = fi.fileno()
            finally:
                fi._file = file_backup  # make sure the file gets cleaned up

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(unconditionally_raise_ValueError.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(result, -1, "fileno() should return -1")
460
class MockFileInput:
    """A class that mocks out fileinput.FileInput for use during unit tests

    The constructor stores its arguments unchanged.  Every mocked method
    bumps its own entry in ``invocation_counts``; all of them except
    close() then return the value stored under their name in
    ``return_values`` (KeyError if the test did not provide one).
    """

    def __init__(self, files=None, inplace=False, backup="", bufsize=0,
                 mode="r", openhook=None):
        self.files = files
        self.inplace = inplace
        self.backup = backup
        self.bufsize = bufsize
        self.mode = mode
        self.openhook = openhook
        self._file = None
        # Missing keys start at 0, so merely reading a count creates it.
        self.invocation_counts = collections.defaultdict(int)
        self.return_values = {}

    def _record_call(self, method_name):
        # Count first, then look up the canned result, so the call is
        # recorded even when no return value was configured.
        self.invocation_counts[method_name] += 1
        return self.return_values[method_name]

    def close(self):
        self.invocation_counts["close"] += 1

    def nextfile(self):
        return self._record_call("nextfile")

    def filename(self):
        return self._record_call("filename")

    def lineno(self):
        return self._record_call("lineno")

    def filelineno(self):
        return self._record_call("filelineno")

    def fileno(self):
        return self._record_call("fileno")

    def isfirstline(self):
        return self._record_call("isfirstline")

    def isstdin(self):
        return self._record_call("isstdin")
506
class BaseFileInputGlobalMethodsTest(unittest.TestCase):
    """Base class for unit tests for the global function of
       the fileinput module."""

    def setUp(self):
        # Remember the real class and module state, then swap in the mock
        # so the module-level functions can be observed in isolation.
        self._orig_FileInput = fileinput.FileInput
        self._orig_state = fileinput._state
        fileinput.FileInput = MockFileInput

    def tearDown(self):
        # Put back whatever setUp() replaced.
        fileinput._state = self._orig_state
        fileinput.FileInput = self._orig_FileInput

    def assertExactlyOneInvocation(self, mock_file_input, method_name):
        """Assert *method_name* was called once and nothing else was."""
        counts = mock_file_input.invocation_counts
        # assert that the method with the given name was invoked once
        self.assertEqual(counts[method_name], 1, method_name)
        # assert that no other unexpected methods were invoked
        self.assertEqual(len(counts), 1)
527
class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.input()"""

    def test_state_is_not_None_and_state_file_is_not_None(self):
        """fileinput.input() while a previous state still has a file open
           (its _file is not None) must raise RuntimeError with the message
           "input() already active" and must leave fileinput._state alone."""
        active = MockFileInput()
        active._file = object()
        fileinput._state = active
        with self.assertRaises(RuntimeError) as caught:
            fileinput.input()
        self.assertEqual(("input() already active",), caught.exception.args)
        self.assertIs(active, fileinput._state, "fileinput._state")

    def test_state_is_not_None_and_state_file_is_None(self):
        """fileinput.input() while a previous state exists but has no open
           file (its _file is None) must build and return a fresh
           FileInput, forwarding every argument to __init__(), and store
           the new object in fileinput._state."""
        idle = MockFileInput()
        idle._file = None
        fileinput._state = idle
        self.do_test_call_input()

    def test_state_is_None(self):
        """fileinput.input() with no previous state must build and return
           a fresh FileInput, forwarding every argument to __init__(), and
           store the new object in fileinput._state."""
        fileinput._state = None
        self.do_test_call_input()

    def do_test_call_input(self):
        """Check that fileinput.input() hands its arguments, unmodified, to
           fileinput.FileInput.__init__().  This relies on setUp() having
           replaced fileinput.FileInput with the mock class."""
        arg_names = ("files", "inplace", "backup", "bufsize", "mode",
                     "openhook")
        # a distinct sentinel for each argument so mix-ups are detectable
        sentinels = {name: object() for name in arg_names}

        result = fileinput.input(**sentinels)

        # ensure fileinput._state was set to the returned object
        self.assertIs(result, fileinput._state, "fileinput._state")

        # ensure the parameters to fileinput.input() were passed directly
        # to FileInput.__init__()
        for name in arg_names:
            self.assertIs(sentinels[name], getattr(result, name), name)
592
class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.close()"""

    def test_state_is_None(self):
        """fileinput.close() with no active state is a no-op: _state
           stays None."""
        fileinput._state = None
        fileinput.close()
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.close() with an active state must call close() on it
           exactly once and reset fileinput._state to None."""
        state = MockFileInput()
        fileinput._state = state
        fileinput.close()
        self.assertExactlyOneInvocation(state, "close")
        self.assertIsNone(fileinput._state)
611
class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.nextfile()"""

    def test_state_is_None(self):
        """With no active input, fileinput.nextfile() must raise
           RuntimeError ("no active input()") and leave fileinput._state
           as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.nextfile()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With an active input, fileinput.nextfile() must call the
           state's nextfile() exactly once, hand back its return value,
           and keep fileinput._state bound to the same object."""
        expected = object()
        state = MockFileInput()
        state.return_values["nextfile"] = expected
        fileinput._state = state
        actual = fileinput.nextfile()
        self.assertExactlyOneInvocation(state, "nextfile")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, state)
638
class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filename()"""

    def test_state_is_None(self):
        """With no active input, fileinput.filename() must raise
           RuntimeError ("no active input()") and leave fileinput._state
           as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.filename()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With an active input, fileinput.filename() must call the
           state's filename() exactly once, hand back its return value,
           and keep fileinput._state bound to the same object."""
        expected = object()
        state = MockFileInput()
        state.return_values["filename"] = expected
        fileinput._state = state
        actual = fileinput.filename()
        self.assertExactlyOneInvocation(state, "filename")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, state)
665
class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.lineno()"""

    def test_state_is_None(self):
        """With no active input, fileinput.lineno() must raise
           RuntimeError ("no active input()") and leave fileinput._state
           as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.lineno()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With an active input, fileinput.lineno() must call the state's
           lineno() exactly once, hand back its return value, and keep
           fileinput._state bound to the same object."""
        expected = object()
        state = MockFileInput()
        state.return_values["lineno"] = expected
        fileinput._state = state
        actual = fileinput.lineno()
        self.assertExactlyOneInvocation(state, "lineno")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, state)
692
class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filelineno()"""

    def test_state_is_None(self):
        """With no active input, fileinput.filelineno() must raise
           RuntimeError ("no active input()") and leave fileinput._state
           as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.filelineno()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With an active input, fileinput.filelineno() must call the
           state's filelineno() exactly once, hand back its return value,
           and keep fileinput._state bound to the same object."""
        expected = object()
        state = MockFileInput()
        state.return_values["filelineno"] = expected
        fileinput._state = state
        actual = fileinput.filelineno()
        self.assertExactlyOneInvocation(state, "filelineno")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, state)
719
class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.fileno()"""

    def test_state_is_None(self):
        """Tests fileinput.fileno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.fileno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.fileno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.fileno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        fileno_retval = object()
        instance = MockFileInput()
        # The mock returns whatever is registered under the method's name;
        # no extra attribute on the instance is needed.
        instance.return_values["fileno"] = fileno_retval
        fileinput._state = instance
        retval = fileinput.fileno()
        self.assertExactlyOneInvocation(instance, "fileno")
        self.assertIs(retval, fileno_retval)
        self.assertIs(fileinput._state, instance)
747
class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.isfirstline()"""

    def test_state_is_None(self):
        """With no active input, fileinput.isfirstline() must raise
           RuntimeError ("no active input()") and leave fileinput._state
           as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isfirstline()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With an active input, fileinput.isfirstline() must call the
           state's isfirstline() exactly once, hand back its return value,
           and keep fileinput._state bound to the same object."""
        expected = object()
        state = MockFileInput()
        state.return_values["isfirstline"] = expected
        fileinput._state = state
        actual = fileinput.isfirstline()
        self.assertExactlyOneInvocation(state, "isfirstline")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, state)
774
class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.isstdin()"""

    def test_state_is_None(self):
        """With no active input, fileinput.isstdin() must raise
           RuntimeError ("no active input()") and leave fileinput._state
           as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isstdin()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With an active input, fileinput.isstdin() must call the state's
           isstdin() exactly once, hand back its return value, and keep
           fileinput._state bound to the same object."""
        expected = object()
        state = MockFileInput()
        state.return_values["isstdin"] = expected
        fileinput._state = state
        actual = fileinput.isstdin()
        self.assertExactlyOneInvocation(state, "isstdin")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, state)
801
class InvocationRecorder:
    """Callable stand-in that records how it has been invoked.

    Instances accept any positional and keyword arguments, return None,
    and remember how many times they were called and with what.
    """

    def __init__(self):
        # Number of times the instance has been called so far.
        self.invocation_count = 0
        # (args, kwargs) of the most recent call.  Initialised to None so
        # that reading it before any call yields a clean assertion
        # failure in callers rather than an AttributeError.
        self.last_invocation = None

    def __call__(self, *args, **kwargs):
        """Record one invocation together with its arguments."""
        self.invocation_count += 1
        self.last_invocation = (args, kwargs)
808
class Test_hook_compressed(unittest.TestCase):
    """Unit tests for fileinput.hook_compressed().

    Every test replaces one opener (gzip.open, bz2.BZ2File or
    builtins.open) with a recording stub and then verifies that
    hook_compressed() called that stub exactly once with the filename
    and mode it was given.
    """

    def setUp(self):
        # A new recorder per test keeps the invocation counts independent.
        self.fake_open = InvocationRecorder()

    def test_empty_string(self):
        self.do_test_use_builtin_open("", 1)

    def test_no_ext(self):
        self.do_test_use_builtin_open("abcd", 2)

    @unittest.skipUnless(gzip, "Requires gzip and zlib")
    def test_gz_ext_fake(self):
        self._check_patched_opener(gzip, "open", "test.gz", 3)

    @unittest.skipUnless(bz2, "Requires bz2")
    def test_bz2_ext_fake(self):
        self._check_patched_opener(bz2, "BZ2File", "test.bz2", 4)

    def test_blah_ext(self):
        self.do_test_use_builtin_open("abcd.blah", 5)

    def test_gz_ext_builtin(self):
        # A mixed-case extension is expected to reach the builtin open().
        self.do_test_use_builtin_open("abcd.Gz", 6)

    def test_bz2_ext_builtin(self):
        # A mixed-case extension is expected to reach the builtin open().
        self.do_test_use_builtin_open("abcd.Bz2", 7)

    def do_test_use_builtin_open(self, filename, mode):
        # Swap builtins.open for the recorder only for the duration of
        # the call, then verify what the recorder saw.
        saved_open = self.replace_builtin_open(self.fake_open)
        try:
            fileinput.hook_compressed(filename, mode)
        finally:
            self.replace_builtin_open(saved_open)
        self._assert_forwarded_once(filename, mode)

    def _check_patched_opener(self, owner, attr_name, filename, mode):
        # Temporarily bind owner.<attr_name> to the recorder, run
        # hook_compressed(), always restore the real attribute, and then
        # verify what the recorder saw.
        saved = getattr(owner, attr_name)
        setattr(owner, attr_name, self.fake_open)
        try:
            fileinput.hook_compressed(filename, mode)
        finally:
            setattr(owner, attr_name, saved)
        self._assert_forwarded_once(filename, mode)

    def _assert_forwarded_once(self, filename, mode):
        # The recorder must have been called once, positionally, with
        # exactly (filename, mode) and no keyword arguments.
        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation,
                         ((filename, mode), {}))

    @staticmethod
    def replace_builtin_open(new_open_func):
        # Install new_open_func as builtins.open and hand back whatever
        # was installed before so that the caller can restore it.
        previous, builtins.open = builtins.open, new_open_func
        return previous
870
class Test_hook_encoded(unittest.TestCase):
    """Unit tests for fileinput.hook_encoded()"""

    def test(self):
        # Unique sentinels let the identity assertions below prove that
        # the hook passes its arguments through untouched.
        encoding = object()
        filename = object()
        mode = object()
        opener = fileinput.hook_encoded(encoding)

        fake_open = InvocationRecorder()
        real_open = builtins.open
        builtins.open = fake_open
        try:
            opener(filename, mode)
        finally:
            builtins.open = real_open

        self.assertEqual(fake_open.invocation_count, 1)

        positional, keywords = fake_open.last_invocation
        self.assertIs(positional[0], filename)
        self.assertIs(positional[1], mode)
        self.assertIs(keywords.pop('encoding'), encoding)
        # Nothing besides 'encoding' may have been passed by keyword.
        self.assertFalse(keywords)

    def test_modes(self):
        with open(TESTFN, 'wb') as stream:
            # UTF-7 is a convenient, seldom used encoding
            stream.write(b'A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            # Read the whole file through FileInput with the UTF-7 hook
            # and compare the decoded lines with the expectation.
            reader = FileInput(files=TESTFN, mode=mode,
                               openhook=hook_encoded('utf-7'))
            with reader:
                actual = list(reader)
            self.assertEqual(actual, expected_lines)

        translated = ['A\n', 'B\n', 'C\n', 'D\u20ac']
        check('r', translated)
        # Both spellings of universal-newlines mode must emit a
        # DeprecationWarning while still producing the same lines.
        for deprecated_mode in ('rU', 'U'):
            with self.assertWarns(DeprecationWarning):
                check(deprecated_mode, translated)
        # Binary mode together with an encoding hook must raise ValueError.
        with self.assertRaises(ValueError):
            check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000916
class MiscTest(unittest.TestCase):
    """Miscellaneous module-level checks for fileinput."""

    def test_all(self):
        # DEFAULT_BUFSIZE is passed as the only name excluded from the
        # __all__ consistency check.
        support.check__all__(self, fileinput,
                             blacklist={'DEFAULT_BUFSIZE'})
922
923
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()