blob: 91c11668bd6dbeb296c4a1d0b46a04dfbbeeca16 [file] [log] [blame]
Tim Peters3230d5c2001-07-11 22:21:17 +00001'''
2Tests for fileinput module.
3Nick Mathewson
4'''
Benjamin Petersoneb462882011-03-15 09:50:18 -05005import os
6import sys
7import re
briancurtin906f0c42011-03-15 10:29:41 -04008import fileinput
9import collections
Florent Xiclunaa011e2b2011-11-07 19:43:07 +010010import builtins
Benjamin Petersoneb462882011-03-15 09:50:18 -050011import unittest
12
briancurtinf84f3c32011-03-18 13:03:17 -050013try:
14 import bz2
15except ImportError:
16 bz2 = None
Ezio Melottic3afbb92011-05-14 10:10:53 +030017try:
18 import gzip
19except ImportError:
20 gzip = None
briancurtinf84f3c32011-03-18 13:03:17 -050021
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030022from io import BytesIO, StringIO
Benjamin Petersoneb462882011-03-15 09:50:18 -050023from fileinput import FileInput, hook_encoded
24
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +020025from test.support import verbose, TESTFN, run_unittest, check_warnings
Benjamin Petersoneb462882011-03-15 09:50:18 -050026from test.support import unlink as safe_unlink
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030027from unittest import mock
Benjamin Petersoneb462882011-03-15 09:50:18 -050028
Tim Peters3230d5c2001-07-11 22:21:17 +000029
30# The fileinput module has 2 interfaces: the FileInput class which does
31# all the work, and a few functions (input, etc.) that use a global _state
briancurtin906f0c42011-03-15 10:29:41 -040032# variable.
Tim Peters3230d5c2001-07-11 22:21:17 +000033
34# Write lines (a list of lines) to temp file number i, and return the
35# temp file's name.
def writeTmp(i, lines, mode='w'):  # opening in text mode is the default
    """Write *lines* to temp file number *i* and return the file's name.

    The file name is ``TESTFN + str(i)``.  *lines* is an iterable of
    strings (or of bytes objects when *mode* is a binary mode such as
    ``'wb'``); each item is written as-is, no newline is added.
    """
    name = TESTFN + str(i)
    # The context manager guarantees the handle is closed even when a
    # write fails, so a broken test cannot leak an open temp file.
    with open(name, mode) as f:
        f.writelines(lines)
    return name
43
def remove_tempfiles(*names):
    """Unlink every temp file named in *names*.

    Falsy entries (``None`` or an empty string, e.g. a name that was never
    assigned because file creation failed) are skipped.  Removal goes
    through ``safe_unlink``, the alias this file uses for
    ``test.support.unlink``.
    """
    for path in filter(None, names):
        safe_unlink(path)
Tim Peters3230d5c2001-07-11 22:21:17 +000048
class BufferSizesTests(unittest.TestCase):
    """Exercise FileInput over four temp files with two ``bufsize`` values."""

    def test_buffer_sizes(self):
        """Run buffer_size_test() once with bufsize 0 and once with 30."""
        # First, run the tests with default and teeny buffer size.
        # (The loop variable is not called ``round`` to avoid shadowing
        # the builtin.)
        for round_no, bs in (0, 0), (1, 30):
            t1 = t2 = t3 = t4 = None
            try:
                t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)])
                t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)])
                t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)])
                t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)])
                self.buffer_size_test(t1, t2, t3, t4, bs, round_no)
            finally:
                remove_tempfiles(t1, t2, t3, t4)

    def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
        """Check iteration, status methods, stdin and inplace handling.

        t1..t4 are the names of temp files holding 15, 10, 5 and 1 lines
        (31 in total).  *bs* is passed to FileInput as ``bufsize``;
        *round* only offsets the step numbers printed in verbose mode.

        Every FileInput is used as a context manager so it is closed even
        when an assertion fails part-way through a step.
        """
        pat = re.compile(r'LINE (\d+) OF FILE (\d+)')

        start = 1 + round*6
        if verbose:
            print('%s. Simple iteration (bs=%s)' % (start+0, bs))
        with FileInput(files=(t1, t2, t3, t4), bufsize=bs) as fi:
            lines = list(fi)
        # The counters must survive close().
        self.assertEqual(len(lines), 31)
        self.assertEqual(lines[4], 'Line 5 of file 1\n')
        self.assertEqual(lines[30], 'Line 1 of file 4\n')
        self.assertEqual(fi.lineno(), 31)
        self.assertEqual(fi.filename(), t4)

        if verbose:
            print('%s. Status variables (bs=%s)' % (start+1, bs))
        with FileInput(files=(t1, t2, t3, t4), bufsize=bs) as fi:
            s = "x"
            while s and s != 'Line 6 of file 2\n':
                s = fi.readline()
            self.assertEqual(fi.filename(), t2)
            self.assertEqual(fi.lineno(), 21)
            self.assertEqual(fi.filelineno(), 6)
            self.assertFalse(fi.isfirstline())
            self.assertFalse(fi.isstdin())

            if verbose:
                print('%s. Nextfile (bs=%s)' % (start+2, bs))
            fi.nextfile()
            self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
            self.assertEqual(fi.lineno(), 22)

        if verbose:
            print('%s. Stdin (bs=%s)' % (start+3, bs))
        savestdin = sys.stdin
        try:
            sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
            # '-' makes FileInput read from the replaced sys.stdin.
            with FileInput(files=(t1, t2, t3, t4, '-'), bufsize=bs) as fi:
                lines = list(fi)
                self.assertEqual(len(lines), 33)
                self.assertEqual(lines[32], 'Line 2 of stdin\n')
                self.assertEqual(fi.filename(), '<stdin>')
                fi.nextfile()
        finally:
            sys.stdin = savestdin

        if verbose:
            print('%s. Boundary conditions (bs=%s)' % (start+4, bs))
        with FileInput(files=(t1, t2, t3, t4), bufsize=bs) as fi:
            # Nothing has been read yet, so there is no current file.
            self.assertEqual(fi.lineno(), 0)
            self.assertEqual(fi.filename(), None)
            fi.nextfile()
            self.assertEqual(fi.lineno(), 0)
            self.assertEqual(fi.filename(), None)

        if verbose:
            print('%s. Inplace (bs=%s)' % (start+5, bs))
        savestdout = sys.stdout
        try:
            with FileInput(files=(t1, t2, t3, t4), inplace=1, bufsize=bs) as fi:
                for line in fi:
                    # In inplace mode print() writes back into the file.
                    line = line[:-1].upper()
                    print(line)
        finally:
            sys.stdout = savestdout

        # Re-read the files: every line must now be upper-cased.
        with FileInput(files=(t1, t2, t3, t4), bufsize=bs) as fi:
            for line in fi:
                self.assertEqual(line[-1], '\n')
                m = pat.match(line[:-1])
                self.assertIsNotNone(m)
                self.assertEqual(int(m.group(1)), fi.filelineno())
Georg Brandle4662172006-02-19 09:51:27 +0000139
class UnconditionallyRaise:
    """Callable stub that notes it was called and then always raises.

    *exception_type* is instantiated without arguments and raised on every
    call; ``invoked`` flips to True the first time the object is called.
    """

    def __init__(self, exception_type):
        self.invoked = False
        self.exception_type = exception_type

    def __call__(self, *args, **kwargs):
        # Record the call before raising so callers can verify it happened.
        self.invoked = True
        error = self.exception_type()
        raise error
146 raise self.exception_type()
147
class FileInputTests(unittest.TestCase):
    """Tests that drive the FileInput class directly."""

    def test_zero_byte_files(self):
        """Empty files are passed over while reading lines."""
        t1 = t2 = t3 = t4 = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            with FileInput(files=(t1, t2, t3, t4)) as fi:
                line = fi.readline()
                self.assertEqual(line, 'The only line there is.\n')
                self.assertEqual(fi.lineno(), 1)
                self.assertEqual(fi.filelineno(), 1)
                self.assertEqual(fi.filename(), t3)

                line = fi.readline()
                self.assertFalse(line)
                self.assertEqual(fi.lineno(), 1)
                self.assertEqual(fi.filelineno(), 0)
                self.assertEqual(fi.filename(), t4)
        finally:
            remove_tempfiles(t1, t2, t3, t4)

    def test_files_that_dont_end_with_newline(self):
        """A missing final newline does not merge lines across files."""
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            # Use the context manager so the object is closed afterwards.
            with FileInput(files=(t1, t2)) as fi:
                lines = list(fi)
                self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
                self.assertEqual(fi.filelineno(), 3)
                self.assertEqual(fi.lineno(), 6)
        finally:
            remove_tempfiles(t1, t2)

##     def test_unicode_filenames(self):
##         # XXX A unicode string is always returned by writeTmp.
##         #     So is this needed?
##         try:
##             t1 = writeTmp(1, ["A\nB"])
##             encoding = sys.getfilesystemencoding()
##             if encoding is None:
##                 encoding = 'ascii'
##             fi = FileInput(files=str(t1, encoding))
##             lines = list(fi)
##             self.assertEqual(lines, ["A\n", "B"])
##         finally:
##             remove_tempfiles(t1)

    def test_fileno(self):
        """fileno() is -1 unless a file is currently open."""
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB"])
            t2 = writeTmp(2, ["C\nD"])
            with FileInput(files=(t1, t2)) as fi:
                self.assertEqual(fi.fileno(), -1)
                next(fi)  # opens the first file
                self.assertNotEqual(fi.fileno(), -1)
                fi.nextfile()
                self.assertEqual(fi.fileno(), -1)
                list(fi)  # read everything that is left
                self.assertEqual(fi.fileno(), -1)
        finally:
            remove_tempfiles(t1, t2)

    def test_opening_mode(self):
        """Invalid modes are rejected; mode "U" translates newlines."""
        # invalid mode, should raise ValueError
        with self.assertRaises(
                ValueError,
                msg="FileInput should reject invalid mode argument"):
            FileInput(mode="w")
        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            with check_warnings(('', DeprecationWarning)):
                fi = FileInput(files=t1, mode="U")
            with fi:
                with check_warnings(('', DeprecationWarning)):
                    lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)

    def test_stdin_binary_mode(self):
        """In mode 'rb', '-' reads bytes from sys.stdin.buffer."""
        with mock.patch('sys.stdin') as m_stdin:
            m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
            with FileInput(files=['-'], mode='rb') as fi:
                lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])

    def test_detached_stdin_binary_mode(self):
        """In mode 'rb', a sys.stdin without .buffer is read directly."""
        orig_stdin = sys.stdin
        try:
            sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
            self.assertFalse(hasattr(sys.stdin, 'buffer'))
            with FileInput(files=['-'], mode='rb') as fi:
                lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
        finally:
            sys.stdin = orig_stdin

    def test_file_opening_hook(self):
        """openhook is validated by the constructor and used to open files."""
        # cannot use openhook and inplace mode
        with self.assertRaises(
                ValueError,
                msg="FileInput should raise if both inplace "
                    "and openhook arguments are given"):
            FileInput(inplace=1, openhook=lambda f, m: None)
        with self.assertRaises(
                ValueError,
                msg="FileInput should check openhook for being callable"):
            FileInput(openhook=1)

        class CustomOpenHook:
            def __init__(self):
                self.invoked = False
            def __call__(self, *args):
                self.invoked = True
                return open(*args)

        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        custom_open_hook = CustomOpenHook()
        with FileInput([t], openhook=custom_open_hook) as fi:
            fi.readline()
        self.assertTrue(custom_open_hook.invoked, "openhook not invoked")

    def test_readline(self):
        """readline() reads lazily, so a late decode error surfaces late."""
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write(b'123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write(b'\x80')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii'), bufsize=8) as fi:
            try:
                self.assertEqual(fi.readline(), 'A\n')
                self.assertEqual(fi.readline(), 'B\n')
                self.assertEqual(fi.readline(), 'C\n')
            except UnicodeDecodeError:
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
            self.assertEqual(fi.readline(), '')
            self.assertEqual(fi.readline(), '')

    def test_readline_binary_mode(self):
        """In mode 'rb' readline() returns bytes with line endings intact."""
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\rD')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN, mode='rb') as fi:
            self.assertEqual(fi.readline(), b'A\n')
            self.assertEqual(fi.readline(), b'B\r\n')
            self.assertEqual(fi.readline(), b'C\rD')
            # Read to the end of file.
            self.assertEqual(fi.readline(), b'')
            self.assertEqual(fi.readline(), b'')

    def test_context_manager(self):
        """Leaving the with block closes the FileInput (empties _files)."""
        # Pre-bind the names so the finally clause cannot fail with an
        # unbound local if creating a temp file raises.
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            with FileInput(files=(t1, t2)) as fi:
                lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)
            self.assertEqual(fi._files, ())
        finally:
            remove_tempfiles(t1, t2)

    def test_close_on_exception(self):
        """An exception in the with block still closes the FileInput."""
        t1 = writeTmp(1, [""])
        self.addCleanup(remove_tempfiles, t1)
        # assertRaises also verifies that the exception propagates out of
        # the with block instead of being swallowed by __exit__().
        with self.assertRaises(OSError):
            with FileInput(files=t1) as fi:
                raise OSError
        self.assertEqual(fi._files, ())

    def test_empty_files_list_specified_to_constructor(self):
        """An empty files list falls back to reading '-'."""
        with FileInput(files=[]) as fi:
            self.assertEqual(fi._files, ('-',))

    def test__getitem__(self):
        """Tests invoking FileInput.__getitem__() with the current
           line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            retval1 = fi[0]
            self.assertEqual(retval1, "line1\n")
            retval2 = fi[1]
            self.assertEqual(retval2, "line2\n")

    def test__getitem__invalid_key(self):
        """Tests invoking FileInput.__getitem__() with an index unequal to
           the line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(RuntimeError) as cm:
                fi[1]
        self.assertEqual(cm.exception.args, ("accessing lines out of order",))

    def test__getitem__eof(self):
        """Tests invoking FileInput.__getitem__() with the line number but at
           end-of-input"""
        t = writeTmp(1, [])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(IndexError) as cm:
                fi[0]
        self.assertEqual(cm.exception.args, ("end of input reached",))

    def test_nextfile_oserror_deleting_backup(self):
        """Tests invoking FileInput.nextfile() when the attempt to delete
           the backup file would raise OSError.  This error is expected to be
           silently ignored"""

        os_unlink_orig = os.unlink
        os_unlink_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                next(fi) # make sure the file is opened
                os.unlink = os_unlink_replacement
                fi.nextfile()
        finally:
            os.unlink = os_unlink_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_unlink_replacement.invoked,
                        "os.unlink() was not invoked")

    def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
           This exception should be silently discarded."""

        os_fstat_orig = os.fstat
        os_fstat_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.fstat = os_fstat_replacement
                fi.readline()
        finally:
            os.fstat = os_fstat_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_fstat_replacement.invoked,
                        "os.fstat() was not invoked")

    @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist")
    def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        # (the message names os.chmod, the function replaced above)
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")

    def test_fileno_when_ValueError_raised(self):
        """fileno() returns -1 when the file's own fileno() raises
           ValueError."""
        class FilenoRaisesValueError(UnconditionallyRaise):
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)
            def fileno(self):
                self.__call__()

        unconditionally_raise_ValueError = FilenoRaisesValueError()
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            file_backup = fi._file
            try:
                fi._file = unconditionally_raise_ValueError
                result = fi.fileno()
            finally:
                fi._file = file_backup # make sure the file gets cleaned up

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(unconditionally_raise_ValueError.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(result, -1, "fileno() should return -1")
459
class MockFileInput:
    """Stand-in for fileinput.FileInput that only records how it is used.

    The constructor stores its arguments unchanged as attributes.  Each
    public method bumps its own entry in ``invocation_counts``; every
    method except close() then returns ``return_values[<method name>]``,
    raising KeyError when the test has not configured a value.
    """

    def __init__(self, files=None, inplace=False, backup="", bufsize=0,
                 mode="r", openhook=None):
        # Keep the arguments untouched so tests can compare by identity.
        self.files, self.inplace, self.backup = files, inplace, backup
        self.bufsize, self.mode, self.openhook = bufsize, mode, openhook
        self._file = None
        # Counters spring into existence at 0 on first access.
        self.invocation_counts = collections.defaultdict(int)
        self.return_values = {}

    def _record(self, method_name):
        # Count the call first, then hand back the configured value.
        self.invocation_counts[method_name] += 1
        return self.return_values[method_name]

    def close(self):
        self.invocation_counts["close"] += 1

    def nextfile(self):
        return self._record("nextfile")

    def filename(self):
        return self._record("filename")

    def lineno(self):
        return self._record("lineno")

    def filelineno(self):
        return self._record("filelineno")

    def fileno(self):
        return self._record("fileno")

    def isfirstline(self):
        return self._record("isfirstline")

    def isstdin(self):
        return self._record("isstdin")
505
class BaseFileInputGlobalMethodsTest(unittest.TestCase):
    """Shared fixture for the tests of fileinput's module-level functions.

    While a test runs, fileinput.FileInput is replaced by MockFileInput;
    both it and fileinput._state are put back afterwards."""

    def setUp(self):
        # Remember the real objects so tearDown() can restore them.
        self._orig_state, self._orig_FileInput = (
            fileinput._state, fileinput.FileInput)
        fileinput.FileInput = MockFileInput

    def tearDown(self):
        fileinput.FileInput = self._orig_FileInput
        fileinput._state = self._orig_state

    def assertExactlyOneInvocation(self, mock_file_input, method_name):
        """Assert that *method_name* was the only method called on
        *mock_file_input* and that it was called exactly once."""
        counts = mock_file_input.invocation_counts
        # the named method must have been recorded exactly once ...
        self.assertEqual(counts[method_name], 1, method_name)
        # ... and no other method may have been recorded at all
        self.assertEqual(len(counts), 1)
526
class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.input()"""

    def test_state_is_not_None_and_state_file_is_not_None(self):
        """input() while a previous input() is still active (the global
           _state exists and its _file is set) must raise RuntimeError
           with a meaningful message and leave fileinput._state alone."""
        active = MockFileInput()
        active._file = object()
        fileinput._state = active
        with self.assertRaises(RuntimeError) as caught:
            fileinput.input()
        self.assertEqual(("input() already active",), caught.exception.args)
        self.assertIs(active, fileinput._state, "fileinput._state")

    def test_state_is_not_None_and_state_file_is_None(self):
        """input() when the global _state exists but its _file is None
           must build and return a new FileInput, forwarding every
           argument to __init__(), and store the new object in
           fileinput._state."""
        idle = MockFileInput()
        idle._file = None
        fileinput._state = idle
        self.do_test_call_input()

    def test_state_is_None(self):
        """input() when fileinput._state is None must build and return a
           new FileInput, forwarding every argument to __init__(), and
           store the new object in fileinput._state."""
        fileinput._state = None
        self.do_test_call_input()

    def do_test_call_input(self):
        """Call fileinput.input() with a distinct sentinel for every
           parameter and check that each one reaches the new object
           unmodified.  Relies on setUp() having swapped
           fileinput.FileInput for MockFileInput."""
        names = ("files", "inplace", "backup", "bufsize", "mode", "openhook")
        sentinels = {name: object() for name in names}

        # call fileinput.input() with different values for each argument
        result = fileinput.input(**sentinels)

        # ensure fileinput._state was set to the returned object
        self.assertIs(result, fileinput._state, "fileinput._state")

        # ensure the parameters to fileinput.input() were passed directly
        # to FileInput.__init__()
        for name in names:
            self.assertIs(sentinels[name], getattr(result, name), name)
591
class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.close()"""

    def test_state_is_None(self):
        """close() is a no-op when there is no global state."""
        fileinput._state = None
        fileinput.close()
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """close() calls close() on the global state object exactly once
           and then resets fileinput._state to None."""
        mock_state = MockFileInput()
        fileinput._state = mock_state
        fileinput.close()
        self.assertExactlyOneInvocation(mock_state, "close")
        self.assertIsNone(fileinput._state)
610
class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.nextfile()"""

    def test_state_is_None(self):
        """Without a global state, nextfile() raises RuntimeError with a
           meaningful message and leaves fileinput._state as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.nextfile()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With a global state, nextfile() delegates to its nextfile()
           exactly once, passes the result through unchanged, and keeps
           fileinput._state pointing at the same object."""
        expected = object()
        mock_state = MockFileInput()
        mock_state.return_values["nextfile"] = expected
        fileinput._state = mock_state
        actual = fileinput.nextfile()
        self.assertExactlyOneInvocation(mock_state, "nextfile")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, mock_state)
637
class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filename()"""

    def test_state_is_None(self):
        """Without a global state, filename() raises RuntimeError with a
           meaningful message and leaves fileinput._state as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.filename()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With a global state, filename() delegates to its filename()
           exactly once, passes the result through unchanged, and keeps
           fileinput._state pointing at the same object."""
        expected = object()
        mock_state = MockFileInput()
        mock_state.return_values["filename"] = expected
        fileinput._state = mock_state
        actual = fileinput.filename()
        self.assertExactlyOneInvocation(mock_state, "filename")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, mock_state)
664
class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.lineno()"""

    def test_state_is_None(self):
        """Without a global state, lineno() raises RuntimeError with a
           meaningful message and leaves fileinput._state as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.lineno()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With a global state, lineno() delegates to its lineno()
           exactly once, passes the result through unchanged, and keeps
           fileinput._state pointing at the same object."""
        expected = object()
        mock_state = MockFileInput()
        mock_state.return_values["lineno"] = expected
        fileinput._state = mock_state
        actual = fileinput.lineno()
        self.assertExactlyOneInvocation(mock_state, "lineno")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, mock_state)
691
class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filelineno()"""

    def test_state_is_None(self):
        """Without a global state, filelineno() raises RuntimeError with a
           meaningful message and leaves fileinput._state as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.filelineno()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With a global state, filelineno() delegates to its filelineno()
           exactly once, passes the result through unchanged, and keeps
           fileinput._state pointing at the same object."""
        expected = object()
        mock_state = MockFileInput()
        mock_state.return_values["filelineno"] = expected
        fileinput._state = mock_state
        actual = fileinput.filelineno()
        self.assertExactlyOneInvocation(mock_state, "filelineno")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, mock_state)
718
class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.fileno()"""

    def test_state_is_None(self):
        """Tests fileinput.fileno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.fileno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.fileno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.fileno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        fileno_retval = object()
        instance = MockFileInput()
        # MockFileInput.fileno() answers from return_values only, so no
        # extra attribute needs to be set on the instance.
        instance.return_values["fileno"] = fileno_retval
        fileinput._state = instance
        retval = fileinput.fileno()
        self.assertExactlyOneInvocation(instance, "fileno")
        self.assertIs(retval, fileno_retval)
        self.assertIs(fileinput._state, instance)
746
class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.isfirstline()"""

    def test_state_is_None(self):
        """Without a global state, isfirstline() raises RuntimeError with
           a meaningful message and leaves fileinput._state as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isfirstline()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With a global state, isfirstline() delegates to its
           isfirstline() exactly once, passes the result through
           unchanged, and keeps fileinput._state pointing at the same
           object."""
        expected = object()
        mock_state = MockFileInput()
        mock_state.return_values["isfirstline"] = expected
        fileinput._state = mock_state
        actual = fileinput.isfirstline()
        self.assertExactlyOneInvocation(mock_state, "isfirstline")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, mock_state)
773
class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.isstdin()"""

    def test_state_is_None(self):
        """Without a global state, isstdin() raises RuntimeError with a
           meaningful message and leaves fileinput._state as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isstdin()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With a global state, isstdin() delegates to its isstdin()
           exactly once, passes the result through unchanged, and keeps
           fileinput._state pointing at the same object."""
        expected = object()
        mock_state = MockFileInput()
        mock_state.return_values["isstdin"] = expected
        fileinput._state = mock_state
        actual = fileinput.isstdin()
        self.assertExactlyOneInvocation(mock_state, "isstdin")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, mock_state)
800
class InvocationRecorder:
    """Callable test double that counts its calls and remembers the last one.

    Attributes:
        invocation_count: how many times the instance has been called.
        last_invocation: ``(args, kwargs)`` of the most recent call, or
            ``None`` while the instance has never been called.
    """

    def __init__(self):
        self.invocation_count = 0
        # Created up front so that inspecting a recorder that was never
        # called yields None (a clean assertion failure in a test) rather
        # than an AttributeError.
        self.last_invocation = None

    def __call__(self, *args, **kwargs):
        """Record the call's positional and keyword arguments.

        Returns None implicitly, whatever arguments are supplied.
        """
        self.invocation_count += 1
        self.last_invocation = (args, kwargs)
807
class Test_hook_compressed(unittest.TestCase):
    """Unit tests for fileinput.hook_compressed()

    Each test swaps one opener (gzip.open, bz2.BZ2File or builtins.open)
    for a recorder and checks that hook_compressed() called it exactly
    once with the filename and mode it was given.  The integer "mode"
    values are only distinct markers: the tests check that they are
    forwarded unchanged, nothing is actually opened.
    """

    def setUp(self):
        # A fresh recorder per test, standing in for whichever opener the
        # test expects hook_compressed() to dispatch to.
        self.fake_open = InvocationRecorder()

    def test_empty_string(self):
        self.do_test_use_builtin_open("", 1)

    def test_no_ext(self):
        self.do_test_use_builtin_open("abcd", 2)

    @unittest.skipUnless(gzip, "Requires gzip and zlib")
    def test_gz_ext_fake(self):
        # patch.object restores gzip.open on exit, even if the call raises.
        with mock.patch.object(gzip, "open", self.fake_open):
            fileinput.hook_compressed("test.gz", 3)

        self._assert_opened_once("test.gz", 3)

    @unittest.skipUnless(bz2, "Requires bz2")
    def test_bz2_ext_fake(self):
        # patch.object restores bz2.BZ2File on exit, even if the call raises.
        with mock.patch.object(bz2, "BZ2File", self.fake_open):
            fileinput.hook_compressed("test.bz2", 4)

        self._assert_opened_once("test.bz2", 4)

    def test_blah_ext(self):
        self.do_test_use_builtin_open("abcd.blah", 5)

    def test_gz_ext_builtin(self):
        # Mixed-case extension: expected to fall through to builtins.open.
        self.do_test_use_builtin_open("abcd.Gz", 6)

    def test_bz2_ext_builtin(self):
        # Mixed-case extension: expected to fall through to builtins.open.
        self.do_test_use_builtin_open("abcd.Bz2", 7)

    def do_test_use_builtin_open(self, filename, mode):
        """Check that hook_compressed(filename, mode) uses builtins.open.

        builtins.open is replaced by the recorder for the duration of the
        call only and is restored afterwards, whether or not the call
        raises.
        """
        with mock.patch.object(builtins, "open", self.fake_open):
            fileinput.hook_compressed(filename, mode)

        self._assert_opened_once(filename, mode)

    def _assert_opened_once(self, filename, mode):
        """Assert the recorder saw exactly one call: (filename, mode), no kwargs."""
        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation,
                         ((filename, mode), {}))

    @staticmethod
    def replace_builtin_open(new_open_func):
        """Install new_open_func as builtins.open and return the previous one.

        The caller is responsible for restoring the returned function.
        Kept so the class's interface is unchanged; the tests in this
        class patch through mock.patch.object instead.
        """
        original_open = builtins.open
        builtins.open = new_open_func
        return original_open
869
class Test_hook_encoded(unittest.TestCase):
    """Unit tests for fileinput.hook_encoded()"""

    def test(self):
        """The hook forwards filename and mode to open(), adding only encoding=.

        Unique object() instances are used for the encoding, filename and
        mode so that the identity assertions prove each value is passed
        through untouched.
        """
        encoding = object()
        hook = fileinput.hook_encoded(encoding)

        fake_open = InvocationRecorder()
        filename = object()
        mode = object()
        # patch.object restores builtins.open on exit, even if the hook raises.
        with mock.patch.object(builtins, "open", fake_open):
            hook(filename, mode)

        self.assertEqual(fake_open.invocation_count, 1)

        args, kwargs = fake_open.last_invocation
        self.assertIs(args[0], filename)
        self.assertIs(args[1], mode)
        self.assertIs(kwargs.pop('encoding'), encoding)
        # Nothing but 'encoding' may have been passed by keyword.
        self.assertFalse(kwargs)

    def test_modes(self):
        """Read a UTF-7 file through hook_encoded() in each supported mode.

        'r', 'rU' and 'U' are expected to yield the same decoded lines with
        every line ending reported as '\\n' (the latter two while emitting
        DeprecationWarning); 'rb' is expected to raise ValueError.
        """
        # Registered before the file is created so it is removed even if
        # the write below fails; test.support.unlink is expected to
        # tolerate a file that does not exist.
        self.addCleanup(safe_unlink, TESTFN)
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write(b'A\nB\r\nC\rD+IKw-')

        def check(mode, expected_lines):
            with FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7')) as fi:
                lines = list(fi)
            self.assertEqual(lines, expected_lines)

        check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertRaises(ValueError):
            check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
914
Guido van Rossumd8faa362007-04-27 19:54:29 +0000915
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()