blob: 784bc92d23c80d6dfc8b550228fcbdfb8e5cbd7d [file] [log] [blame]
Tim Peters3230d5c2001-07-11 22:21:17 +00001'''
2Tests for fileinput module.
3Nick Mathewson
4'''
Benjamin Petersoneb462882011-03-15 09:50:18 -05005import os
6import sys
7import re
briancurtin906f0c42011-03-15 10:29:41 -04008import fileinput
9import collections
Florent Xiclunaa011e2b2011-11-07 19:43:07 +010010import builtins
Benjamin Petersoneb462882011-03-15 09:50:18 -050011import unittest
12
briancurtinf84f3c32011-03-18 13:03:17 -050013try:
14 import bz2
15except ImportError:
16 bz2 = None
Ezio Melottic3afbb92011-05-14 10:10:53 +030017try:
18 import gzip
19except ImportError:
20 gzip = None
briancurtinf84f3c32011-03-18 13:03:17 -050021
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030022from io import BytesIO, StringIO
Benjamin Petersoneb462882011-03-15 09:50:18 -050023from fileinput import FileInput, hook_encoded
24
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +020025from test.support import verbose, TESTFN, run_unittest, check_warnings
Benjamin Petersoneb462882011-03-15 09:50:18 -050026from test.support import unlink as safe_unlink
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030027from unittest import mock
Benjamin Petersoneb462882011-03-15 09:50:18 -050028
Tim Peters3230d5c2001-07-11 22:21:17 +000029
30# The fileinput module has 2 interfaces: the FileInput class which does
31# all the work, and a few functions (input, etc.) that use a global _state
briancurtin906f0c42011-03-15 10:29:41 -040032# variable.
Tim Peters3230d5c2001-07-11 22:21:17 +000033
34# Write lines (a list of lines) to temp file number i, and return the
35# temp file's name.
Tim Peters4d7cad12006-02-19 21:22:10 +000036def writeTmp(i, lines, mode='w'): # opening in text mode is the default
Tim Peters3230d5c2001-07-11 22:21:17 +000037 name = TESTFN + str(i)
Tim Peters4d7cad12006-02-19 21:22:10 +000038 f = open(name, mode)
Guido van Rossumc43e79f2007-06-18 18:26:36 +000039 for line in lines:
40 f.write(line)
Tim Peters3230d5c2001-07-11 22:21:17 +000041 f.close()
42 return name
43
Tim Peters3230d5c2001-07-11 22:21:17 +000044def remove_tempfiles(*names):
45 for name in names:
Guido van Rossume22905a2007-08-27 23:09:25 +000046 if name:
47 safe_unlink(name)
Tim Peters3230d5c2001-07-11 22:21:17 +000048
Serhiy Storchakacc2dbc52016-03-08 18:28:36 +020049class LineReader:
50
51 def __init__(self):
52 self._linesread = []
53
54 @property
55 def linesread(self):
56 try:
57 return self._linesread[:]
58 finally:
59 self._linesread = []
60
61 def openhook(self, filename, mode):
62 self.it = iter(filename.splitlines(True))
63 return self
64
65 def readline(self, size=None):
66 line = next(self.it, '')
67 self._linesread.append(line)
68 return line
69
70 def readlines(self, hint=-1):
71 lines = []
72 size = 0
73 while True:
74 line = self.readline()
75 if not line:
76 return lines
77 lines.append(line)
78 size += len(line)
79 if size >= hint:
80 return lines
81
82 def close(self):
83 pass
84
Guido van Rossumd8faa362007-04-27 19:54:29 +000085class BufferSizesTests(unittest.TestCase):
86 def test_buffer_sizes(self):
87 # First, run the tests with default and teeny buffer size.
88 for round, bs in (0, 0), (1, 30):
Neal Norwitz2595e762008-03-24 06:10:13 +000089 t1 = t2 = t3 = t4 = None
Guido van Rossumd8faa362007-04-27 19:54:29 +000090 try:
91 t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)])
92 t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)])
93 t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)])
94 t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)])
95 self.buffer_size_test(t1, t2, t3, t4, bs, round)
96 finally:
97 remove_tempfiles(t1, t2, t3, t4)
Tim Peters3230d5c2001-07-11 22:21:17 +000098
Guido van Rossumd8faa362007-04-27 19:54:29 +000099 def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
100 pat = re.compile(r'LINE (\d+) OF FILE (\d+)')
Tim Peters3230d5c2001-07-11 22:21:17 +0000101
Guido van Rossumd8faa362007-04-27 19:54:29 +0000102 start = 1 + round*6
103 if verbose:
104 print('%s. Simple iteration (bs=%s)' % (start+0, bs))
105 fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
Tim Peters3230d5c2001-07-11 22:21:17 +0000106 lines = list(fi)
Tim Peters3230d5c2001-07-11 22:21:17 +0000107 fi.close()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000108 self.assertEqual(len(lines), 31)
109 self.assertEqual(lines[4], 'Line 5 of file 1\n')
110 self.assertEqual(lines[30], 'Line 1 of file 4\n')
111 self.assertEqual(fi.lineno(), 31)
112 self.assertEqual(fi.filename(), t4)
Tim Peters3230d5c2001-07-11 22:21:17 +0000113
Guido van Rossumd8faa362007-04-27 19:54:29 +0000114 if verbose:
115 print('%s. Status variables (bs=%s)' % (start+1, bs))
116 fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
117 s = "x"
118 while s and s != 'Line 6 of file 2\n':
119 s = fi.readline()
120 self.assertEqual(fi.filename(), t2)
121 self.assertEqual(fi.lineno(), 21)
122 self.assertEqual(fi.filelineno(), 6)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000123 self.assertFalse(fi.isfirstline())
124 self.assertFalse(fi.isstdin())
Tim Peters3230d5c2001-07-11 22:21:17 +0000125
Guido van Rossumd8faa362007-04-27 19:54:29 +0000126 if verbose:
127 print('%s. Nextfile (bs=%s)' % (start+2, bs))
128 fi.nextfile()
129 self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
130 self.assertEqual(fi.lineno(), 22)
131 fi.close()
Tim Peters3230d5c2001-07-11 22:21:17 +0000132
Guido van Rossumd8faa362007-04-27 19:54:29 +0000133 if verbose:
134 print('%s. Stdin (bs=%s)' % (start+3, bs))
135 fi = FileInput(files=(t1, t2, t3, t4, '-'), bufsize=bs)
136 savestdin = sys.stdin
137 try:
138 sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
139 lines = list(fi)
140 self.assertEqual(len(lines), 33)
141 self.assertEqual(lines[32], 'Line 2 of stdin\n')
142 self.assertEqual(fi.filename(), '<stdin>')
143 fi.nextfile()
144 finally:
145 sys.stdin = savestdin
Tim Peters3230d5c2001-07-11 22:21:17 +0000146
Guido van Rossumd8faa362007-04-27 19:54:29 +0000147 if verbose:
148 print('%s. Boundary conditions (bs=%s)' % (start+4, bs))
149 fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
150 self.assertEqual(fi.lineno(), 0)
151 self.assertEqual(fi.filename(), None)
152 fi.nextfile()
153 self.assertEqual(fi.lineno(), 0)
154 self.assertEqual(fi.filename(), None)
Tim Peters3230d5c2001-07-11 22:21:17 +0000155
Guido van Rossumd8faa362007-04-27 19:54:29 +0000156 if verbose:
157 print('%s. Inplace (bs=%s)' % (start+5, bs))
158 savestdout = sys.stdout
159 try:
160 fi = FileInput(files=(t1, t2, t3, t4), inplace=1, bufsize=bs)
161 for line in fi:
162 line = line[:-1].upper()
163 print(line)
164 fi.close()
165 finally:
166 sys.stdout = savestdout
Tim Peters3230d5c2001-07-11 22:21:17 +0000167
Guido van Rossumd8faa362007-04-27 19:54:29 +0000168 fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
169 for line in fi:
170 self.assertEqual(line[-1], '\n')
171 m = pat.match(line[:-1])
172 self.assertNotEqual(m, None)
173 self.assertEqual(int(m.group(1)), fi.filelineno())
174 fi.close()
Georg Brandle4662172006-02-19 09:51:27 +0000175
briancurtin906f0c42011-03-15 10:29:41 -0400176class UnconditionallyRaise:
177 def __init__(self, exception_type):
178 self.exception_type = exception_type
179 self.invoked = False
180 def __call__(self, *args, **kwargs):
181 self.invoked = True
182 raise self.exception_type()
183
Guido van Rossumd8faa362007-04-27 19:54:29 +0000184class FileInputTests(unittest.TestCase):
briancurtin906f0c42011-03-15 10:29:41 -0400185
Guido van Rossumd8faa362007-04-27 19:54:29 +0000186 def test_zero_byte_files(self):
Neal Norwitz2595e762008-03-24 06:10:13 +0000187 t1 = t2 = t3 = t4 = None
Guido van Rossumd8faa362007-04-27 19:54:29 +0000188 try:
189 t1 = writeTmp(1, [""])
190 t2 = writeTmp(2, [""])
191 t3 = writeTmp(3, ["The only line there is.\n"])
192 t4 = writeTmp(4, [""])
193 fi = FileInput(files=(t1, t2, t3, t4))
Georg Brandl67e9fb92006-02-19 13:56:17 +0000194
Guido van Rossumd8faa362007-04-27 19:54:29 +0000195 line = fi.readline()
196 self.assertEqual(line, 'The only line there is.\n')
197 self.assertEqual(fi.lineno(), 1)
198 self.assertEqual(fi.filelineno(), 1)
199 self.assertEqual(fi.filename(), t3)
Georg Brandlc029f872006-02-19 14:12:34 +0000200
Guido van Rossumd8faa362007-04-27 19:54:29 +0000201 line = fi.readline()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000202 self.assertFalse(line)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000203 self.assertEqual(fi.lineno(), 1)
204 self.assertEqual(fi.filelineno(), 0)
205 self.assertEqual(fi.filename(), t4)
206 fi.close()
207 finally:
208 remove_tempfiles(t1, t2, t3, t4)
Georg Brandlc98eeed2006-02-19 14:57:47 +0000209
Guido van Rossumd8faa362007-04-27 19:54:29 +0000210 def test_files_that_dont_end_with_newline(self):
Neal Norwitz2595e762008-03-24 06:10:13 +0000211 t1 = t2 = None
Guido van Rossumd8faa362007-04-27 19:54:29 +0000212 try:
213 t1 = writeTmp(1, ["A\nB\nC"])
214 t2 = writeTmp(2, ["D\nE\nF"])
215 fi = FileInput(files=(t1, t2))
216 lines = list(fi)
217 self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
218 self.assertEqual(fi.filelineno(), 3)
219 self.assertEqual(fi.lineno(), 6)
220 finally:
221 remove_tempfiles(t1, t2)
222
Guido van Rossumc43e79f2007-06-18 18:26:36 +0000223## def test_unicode_filenames(self):
224## # XXX A unicode string is always returned by writeTmp.
225## # So is this needed?
226## try:
227## t1 = writeTmp(1, ["A\nB"])
228## encoding = sys.getfilesystemencoding()
229## if encoding is None:
230## encoding = 'ascii'
231## fi = FileInput(files=str(t1, encoding))
232## lines = list(fi)
233## self.assertEqual(lines, ["A\n", "B"])
234## finally:
235## remove_tempfiles(t1)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000236
237 def test_fileno(self):
Neal Norwitz2595e762008-03-24 06:10:13 +0000238 t1 = t2 = None
Guido van Rossumd8faa362007-04-27 19:54:29 +0000239 try:
240 t1 = writeTmp(1, ["A\nB"])
241 t2 = writeTmp(2, ["C\nD"])
242 fi = FileInput(files=(t1, t2))
243 self.assertEqual(fi.fileno(), -1)
244 line =next( fi)
245 self.assertNotEqual(fi.fileno(), -1)
246 fi.nextfile()
247 self.assertEqual(fi.fileno(), -1)
248 line = list(fi)
249 self.assertEqual(fi.fileno(), -1)
250 finally:
251 remove_tempfiles(t1, t2)
252
253 def test_opening_mode(self):
254 try:
255 # invalid mode, should raise ValueError
256 fi = FileInput(mode="w")
257 self.fail("FileInput should reject invalid mode argument")
258 except ValueError:
259 pass
Guido van Rossume22905a2007-08-27 23:09:25 +0000260 t1 = None
Guido van Rossumd8faa362007-04-27 19:54:29 +0000261 try:
262 # try opening in universal newline mode
Guido van Rossume22905a2007-08-27 23:09:25 +0000263 t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +0200264 with check_warnings(('', DeprecationWarning)):
265 fi = FileInput(files=t1, mode="U")
266 with check_warnings(('', DeprecationWarning)):
267 lines = list(fi)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000268 self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
269 finally:
270 remove_tempfiles(t1)
271
Serhiy Storchaka946cfc32014-05-14 21:08:33 +0300272 def test_stdin_binary_mode(self):
273 with mock.patch('sys.stdin') as m_stdin:
274 m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
275 fi = FileInput(files=['-'], mode='rb')
276 lines = list(fi)
277 self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
278
R David Murray830207e2016-01-02 15:41:41 -0500279 def test_detached_stdin_binary_mode(self):
280 orig_stdin = sys.stdin
281 try:
282 sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
283 self.assertFalse(hasattr(sys.stdin, 'buffer'))
284 fi = FileInput(files=['-'], mode='rb')
285 lines = list(fi)
286 self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
287 finally:
288 sys.stdin = orig_stdin
289
Guido van Rossume22905a2007-08-27 23:09:25 +0000290 def test_file_opening_hook(self):
291 try:
292 # cannot use openhook and inplace mode
293 fi = FileInput(inplace=1, openhook=lambda f, m: None)
294 self.fail("FileInput should raise if both inplace "
295 "and openhook arguments are given")
296 except ValueError:
297 pass
298 try:
299 fi = FileInput(openhook=1)
300 self.fail("FileInput should check openhook for being callable")
301 except ValueError:
302 pass
briancurtin906f0c42011-03-15 10:29:41 -0400303
304 class CustomOpenHook:
305 def __init__(self):
306 self.invoked = False
307 def __call__(self, *args):
308 self.invoked = True
309 return open(*args)
310
311 t = writeTmp(1, ["\n"])
312 self.addCleanup(remove_tempfiles, t)
313 custom_open_hook = CustomOpenHook()
314 with FileInput([t], openhook=custom_open_hook) as fi:
315 fi.readline()
316 self.assertTrue(custom_open_hook.invoked, "openhook not invoked")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000317
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200318 def test_readline(self):
319 with open(TESTFN, 'wb') as f:
320 f.write(b'A\nB\r\nC\r')
321 # Fill TextIOWrapper buffer.
322 f.write(b'123456789\n' * 1000)
323 # Issue #20501: readline() shouldn't read whole file.
324 f.write(b'\x80')
325 self.addCleanup(safe_unlink, TESTFN)
326
327 with FileInput(files=TESTFN,
Serhiy Storchakacc2dbc52016-03-08 18:28:36 +0200328 openhook=hook_encoded('ascii')) as fi:
Serhiy Storchaka682ea5f2014-03-03 21:17:17 +0200329 try:
330 self.assertEqual(fi.readline(), 'A\n')
331 self.assertEqual(fi.readline(), 'B\n')
332 self.assertEqual(fi.readline(), 'C\n')
333 except UnicodeDecodeError:
334 self.fail('Read to end of file')
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200335 with self.assertRaises(UnicodeDecodeError):
336 # Read to the end of file.
337 list(fi)
Serhiy Storchaka314464d2015-11-01 16:43:58 +0200338 self.assertEqual(fi.readline(), '')
339 self.assertEqual(fi.readline(), '')
340
341 def test_readline_binary_mode(self):
342 with open(TESTFN, 'wb') as f:
343 f.write(b'A\nB\r\nC\rD')
344 self.addCleanup(safe_unlink, TESTFN)
345
346 with FileInput(files=TESTFN, mode='rb') as fi:
347 self.assertEqual(fi.readline(), b'A\n')
348 self.assertEqual(fi.readline(), b'B\r\n')
349 self.assertEqual(fi.readline(), b'C\rD')
350 # Read to the end of file.
351 self.assertEqual(fi.readline(), b'')
352 self.assertEqual(fi.readline(), b'')
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200353
Georg Brandl6cb7b652010-07-31 20:08:15 +0000354 def test_context_manager(self):
355 try:
356 t1 = writeTmp(1, ["A\nB\nC"])
357 t2 = writeTmp(2, ["D\nE\nF"])
358 with FileInput(files=(t1, t2)) as fi:
359 lines = list(fi)
360 self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
361 self.assertEqual(fi.filelineno(), 3)
362 self.assertEqual(fi.lineno(), 6)
363 self.assertEqual(fi._files, ())
364 finally:
365 remove_tempfiles(t1, t2)
366
367 def test_close_on_exception(self):
368 try:
369 t1 = writeTmp(1, [""])
370 with FileInput(files=t1) as fi:
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200371 raise OSError
372 except OSError:
Georg Brandl6cb7b652010-07-31 20:08:15 +0000373 self.assertEqual(fi._files, ())
374 finally:
375 remove_tempfiles(t1)
376
briancurtin906f0c42011-03-15 10:29:41 -0400377 def test_empty_files_list_specified_to_constructor(self):
378 with FileInput(files=[]) as fi:
Brett Cannond47af532011-03-15 15:55:12 -0400379 self.assertEqual(fi._files, ('-',))
briancurtin906f0c42011-03-15 10:29:41 -0400380
381 def test__getitem__(self):
382 """Tests invoking FileInput.__getitem__() with the current
383 line number"""
384 t = writeTmp(1, ["line1\n", "line2\n"])
385 self.addCleanup(remove_tempfiles, t)
386 with FileInput(files=[t]) as fi:
387 retval1 = fi[0]
388 self.assertEqual(retval1, "line1\n")
389 retval2 = fi[1]
390 self.assertEqual(retval2, "line2\n")
391
392 def test__getitem__invalid_key(self):
393 """Tests invoking FileInput.__getitem__() with an index unequal to
394 the line number"""
395 t = writeTmp(1, ["line1\n", "line2\n"])
396 self.addCleanup(remove_tempfiles, t)
397 with FileInput(files=[t]) as fi:
398 with self.assertRaises(RuntimeError) as cm:
399 fi[1]
Brett Cannond47af532011-03-15 15:55:12 -0400400 self.assertEqual(cm.exception.args, ("accessing lines out of order",))
briancurtin906f0c42011-03-15 10:29:41 -0400401
402 def test__getitem__eof(self):
403 """Tests invoking FileInput.__getitem__() with the line number but at
404 end-of-input"""
405 t = writeTmp(1, [])
406 self.addCleanup(remove_tempfiles, t)
407 with FileInput(files=[t]) as fi:
408 with self.assertRaises(IndexError) as cm:
409 fi[0]
Brett Cannond47af532011-03-15 15:55:12 -0400410 self.assertEqual(cm.exception.args, ("end of input reached",))
briancurtin906f0c42011-03-15 10:29:41 -0400411
412 def test_nextfile_oserror_deleting_backup(self):
413 """Tests invoking FileInput.nextfile() when the attempt to delete
414 the backup file would raise OSError. This error is expected to be
415 silently ignored"""
416
417 os_unlink_orig = os.unlink
418 os_unlink_replacement = UnconditionallyRaise(OSError)
419 try:
420 t = writeTmp(1, ["\n"])
421 self.addCleanup(remove_tempfiles, t)
422 with FileInput(files=[t], inplace=True) as fi:
423 next(fi) # make sure the file is opened
424 os.unlink = os_unlink_replacement
425 fi.nextfile()
426 finally:
427 os.unlink = os_unlink_orig
428
429 # sanity check to make sure that our test scenario was actually hit
430 self.assertTrue(os_unlink_replacement.invoked,
431 "os.unlink() was not invoked")
432
433 def test_readline_os_fstat_raises_OSError(self):
434 """Tests invoking FileInput.readline() when os.fstat() raises OSError.
435 This exception should be silently discarded."""
436
437 os_fstat_orig = os.fstat
438 os_fstat_replacement = UnconditionallyRaise(OSError)
439 try:
440 t = writeTmp(1, ["\n"])
441 self.addCleanup(remove_tempfiles, t)
442 with FileInput(files=[t], inplace=True) as fi:
443 os.fstat = os_fstat_replacement
444 fi.readline()
445 finally:
446 os.fstat = os_fstat_orig
447
448 # sanity check to make sure that our test scenario was actually hit
449 self.assertTrue(os_fstat_replacement.invoked,
450 "os.fstat() was not invoked")
451
452 @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist")
453 def test_readline_os_chmod_raises_OSError(self):
454 """Tests invoking FileInput.readline() when os.chmod() raises OSError.
455 This exception should be silently discarded."""
456
457 os_chmod_orig = os.chmod
458 os_chmod_replacement = UnconditionallyRaise(OSError)
459 try:
460 t = writeTmp(1, ["\n"])
461 self.addCleanup(remove_tempfiles, t)
462 with FileInput(files=[t], inplace=True) as fi:
463 os.chmod = os_chmod_replacement
464 fi.readline()
465 finally:
466 os.chmod = os_chmod_orig
467
468 # sanity check to make sure that our test scenario was actually hit
469 self.assertTrue(os_chmod_replacement.invoked,
470 "os.fstat() was not invoked")
471
472 def test_fileno_when_ValueError_raised(self):
473 class FilenoRaisesValueError(UnconditionallyRaise):
474 def __init__(self):
475 UnconditionallyRaise.__init__(self, ValueError)
476 def fileno(self):
477 self.__call__()
478
479 unconditionally_raise_ValueError = FilenoRaisesValueError()
480 t = writeTmp(1, ["\n"])
481 self.addCleanup(remove_tempfiles, t)
482 with FileInput(files=[t]) as fi:
483 file_backup = fi._file
484 try:
485 fi._file = unconditionally_raise_ValueError
486 result = fi.fileno()
487 finally:
488 fi._file = file_backup # make sure the file gets cleaned up
489
490 # sanity check to make sure that our test scenario was actually hit
491 self.assertTrue(unconditionally_raise_ValueError.invoked,
492 "_file.fileno() was not invoked")
493
494 self.assertEqual(result, -1, "fileno() should return -1")
495
Serhiy Storchakacc2dbc52016-03-08 18:28:36 +0200496 def test_readline_buffering(self):
497 src = LineReader()
498 with FileInput(files=['line1\nline2', 'line3\n'],
499 openhook=src.openhook) as fi:
500 self.assertEqual(src.linesread, [])
501 self.assertEqual(fi.readline(), 'line1\n')
502 self.assertEqual(src.linesread, ['line1\n'])
503 self.assertEqual(fi.readline(), 'line2')
504 self.assertEqual(src.linesread, ['line2'])
505 self.assertEqual(fi.readline(), 'line3\n')
506 self.assertEqual(src.linesread, ['', 'line3\n'])
507 self.assertEqual(fi.readline(), '')
508 self.assertEqual(src.linesread, [''])
509 self.assertEqual(fi.readline(), '')
510 self.assertEqual(src.linesread, [])
511
512 def test_iteration_buffering(self):
513 src = LineReader()
514 with FileInput(files=['line1\nline2', 'line3\n'],
515 openhook=src.openhook) as fi:
516 self.assertEqual(src.linesread, [])
517 self.assertEqual(next(fi), 'line1\n')
518 self.assertEqual(src.linesread, ['line1\n'])
519 self.assertEqual(next(fi), 'line2')
520 self.assertEqual(src.linesread, ['line2'])
521 self.assertEqual(next(fi), 'line3\n')
522 self.assertEqual(src.linesread, ['', 'line3\n'])
523 self.assertRaises(StopIteration, next, fi)
524 self.assertEqual(src.linesread, [''])
525 self.assertRaises(StopIteration, next, fi)
526 self.assertEqual(src.linesread, [])
527
briancurtin906f0c42011-03-15 10:29:41 -0400528class MockFileInput:
529 """A class that mocks out fileinput.FileInput for use during unit tests"""
530
531 def __init__(self, files=None, inplace=False, backup="", bufsize=0,
532 mode="r", openhook=None):
533 self.files = files
534 self.inplace = inplace
535 self.backup = backup
536 self.bufsize = bufsize
537 self.mode = mode
538 self.openhook = openhook
539 self._file = None
540 self.invocation_counts = collections.defaultdict(lambda: 0)
541 self.return_values = {}
542
543 def close(self):
544 self.invocation_counts["close"] += 1
545
546 def nextfile(self):
547 self.invocation_counts["nextfile"] += 1
548 return self.return_values["nextfile"]
549
550 def filename(self):
551 self.invocation_counts["filename"] += 1
552 return self.return_values["filename"]
553
554 def lineno(self):
555 self.invocation_counts["lineno"] += 1
556 return self.return_values["lineno"]
557
558 def filelineno(self):
559 self.invocation_counts["filelineno"] += 1
560 return self.return_values["filelineno"]
561
562 def fileno(self):
563 self.invocation_counts["fileno"] += 1
564 return self.return_values["fileno"]
565
566 def isfirstline(self):
567 self.invocation_counts["isfirstline"] += 1
568 return self.return_values["isfirstline"]
569
570 def isstdin(self):
571 self.invocation_counts["isstdin"] += 1
572 return self.return_values["isstdin"]
573
574class BaseFileInputGlobalMethodsTest(unittest.TestCase):
575 """Base class for unit tests for the global function of
576 the fileinput module."""
577
578 def setUp(self):
579 self._orig_state = fileinput._state
580 self._orig_FileInput = fileinput.FileInput
581 fileinput.FileInput = MockFileInput
582
583 def tearDown(self):
584 fileinput.FileInput = self._orig_FileInput
585 fileinput._state = self._orig_state
586
587 def assertExactlyOneInvocation(self, mock_file_input, method_name):
588 # assert that the method with the given name was invoked once
589 actual_count = mock_file_input.invocation_counts[method_name]
590 self.assertEqual(actual_count, 1, method_name)
591 # assert that no other unexpected methods were invoked
592 actual_total_count = len(mock_file_input.invocation_counts)
593 self.assertEqual(actual_total_count, 1)
594
595class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
596 """Unit tests for fileinput.input()"""
597
598 def test_state_is_not_None_and_state_file_is_not_None(self):
599 """Tests invoking fileinput.input() when fileinput._state is not None
600 and its _file attribute is also not None. Expect RuntimeError to
601 be raised with a meaningful error message and for fileinput._state
602 to *not* be modified."""
603 instance = MockFileInput()
604 instance._file = object()
605 fileinput._state = instance
606 with self.assertRaises(RuntimeError) as cm:
607 fileinput.input()
608 self.assertEqual(("input() already active",), cm.exception.args)
609 self.assertIs(instance, fileinput._state, "fileinput._state")
610
611 def test_state_is_not_None_and_state_file_is_None(self):
612 """Tests invoking fileinput.input() when fileinput._state is not None
613 but its _file attribute *is* None. Expect it to create and return
614 a new fileinput.FileInput object with all method parameters passed
615 explicitly to the __init__() method; also ensure that
616 fileinput._state is set to the returned instance."""
617 instance = MockFileInput()
618 instance._file = None
619 fileinput._state = instance
620 self.do_test_call_input()
621
622 def test_state_is_None(self):
623 """Tests invoking fileinput.input() when fileinput._state is None
624 Expect it to create and return a new fileinput.FileInput object
625 with all method parameters passed explicitly to the __init__()
626 method; also ensure that fileinput._state is set to the returned
627 instance."""
628 fileinput._state = None
629 self.do_test_call_input()
630
631 def do_test_call_input(self):
632 """Tests that fileinput.input() creates a new fileinput.FileInput
633 object, passing the given parameters unmodified to
634 fileinput.FileInput.__init__(). Note that this test depends on the
635 monkey patching of fileinput.FileInput done by setUp()."""
636 files = object()
637 inplace = object()
638 backup = object()
639 bufsize = object()
640 mode = object()
641 openhook = object()
642
643 # call fileinput.input() with different values for each argument
644 result = fileinput.input(files=files, inplace=inplace, backup=backup,
645 bufsize=bufsize,
646 mode=mode, openhook=openhook)
647
648 # ensure fileinput._state was set to the returned object
649 self.assertIs(result, fileinput._state, "fileinput._state")
650
651 # ensure the parameters to fileinput.input() were passed directly
652 # to FileInput.__init__()
653 self.assertIs(files, result.files, "files")
654 self.assertIs(inplace, result.inplace, "inplace")
655 self.assertIs(backup, result.backup, "backup")
656 self.assertIs(bufsize, result.bufsize, "bufsize")
657 self.assertIs(mode, result.mode, "mode")
658 self.assertIs(openhook, result.openhook, "openhook")
659
660class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
661 """Unit tests for fileinput.close()"""
662
663 def test_state_is_None(self):
664 """Tests that fileinput.close() does nothing if fileinput._state
665 is None"""
666 fileinput._state = None
667 fileinput.close()
668 self.assertIsNone(fileinput._state)
669
670 def test_state_is_not_None(self):
671 """Tests that fileinput.close() invokes close() on fileinput._state
672 and sets _state=None"""
673 instance = MockFileInput()
674 fileinput._state = instance
675 fileinput.close()
676 self.assertExactlyOneInvocation(instance, "close")
677 self.assertIsNone(fileinput._state)
678
679class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
680 """Unit tests for fileinput.nextfile()"""
681
682 def test_state_is_None(self):
683 """Tests fileinput.nextfile() when fileinput._state is None.
684 Ensure that it raises RuntimeError with a meaningful error message
685 and does not modify fileinput._state"""
686 fileinput._state = None
687 with self.assertRaises(RuntimeError) as cm:
688 fileinput.nextfile()
689 self.assertEqual(("no active input()",), cm.exception.args)
690 self.assertIsNone(fileinput._state)
691
692 def test_state_is_not_None(self):
693 """Tests fileinput.nextfile() when fileinput._state is not None.
694 Ensure that it invokes fileinput._state.nextfile() exactly once,
695 returns whatever it returns, and does not modify fileinput._state
696 to point to a different object."""
697 nextfile_retval = object()
698 instance = MockFileInput()
699 instance.return_values["nextfile"] = nextfile_retval
700 fileinput._state = instance
701 retval = fileinput.nextfile()
702 self.assertExactlyOneInvocation(instance, "nextfile")
703 self.assertIs(retval, nextfile_retval)
704 self.assertIs(fileinput._state, instance)
705
706class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
707 """Unit tests for fileinput.filename()"""
708
709 def test_state_is_None(self):
710 """Tests fileinput.filename() when fileinput._state is None.
711 Ensure that it raises RuntimeError with a meaningful error message
712 and does not modify fileinput._state"""
713 fileinput._state = None
714 with self.assertRaises(RuntimeError) as cm:
715 fileinput.filename()
716 self.assertEqual(("no active input()",), cm.exception.args)
717 self.assertIsNone(fileinput._state)
718
719 def test_state_is_not_None(self):
720 """Tests fileinput.filename() when fileinput._state is not None.
721 Ensure that it invokes fileinput._state.filename() exactly once,
722 returns whatever it returns, and does not modify fileinput._state
723 to point to a different object."""
724 filename_retval = object()
725 instance = MockFileInput()
726 instance.return_values["filename"] = filename_retval
727 fileinput._state = instance
728 retval = fileinput.filename()
729 self.assertExactlyOneInvocation(instance, "filename")
730 self.assertIs(retval, filename_retval)
731 self.assertIs(fileinput._state, instance)
732
733class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
734 """Unit tests for fileinput.lineno()"""
735
736 def test_state_is_None(self):
737 """Tests fileinput.lineno() when fileinput._state is None.
738 Ensure that it raises RuntimeError with a meaningful error message
739 and does not modify fileinput._state"""
740 fileinput._state = None
741 with self.assertRaises(RuntimeError) as cm:
742 fileinput.lineno()
743 self.assertEqual(("no active input()",), cm.exception.args)
744 self.assertIsNone(fileinput._state)
745
746 def test_state_is_not_None(self):
747 """Tests fileinput.lineno() when fileinput._state is not None.
748 Ensure that it invokes fileinput._state.lineno() exactly once,
749 returns whatever it returns, and does not modify fileinput._state
750 to point to a different object."""
751 lineno_retval = object()
752 instance = MockFileInput()
753 instance.return_values["lineno"] = lineno_retval
754 fileinput._state = instance
755 retval = fileinput.lineno()
756 self.assertExactlyOneInvocation(instance, "lineno")
757 self.assertIs(retval, lineno_retval)
758 self.assertIs(fileinput._state, instance)
759
760class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
761 """Unit tests for fileinput.filelineno()"""
762
763 def test_state_is_None(self):
764 """Tests fileinput.filelineno() when fileinput._state is None.
765 Ensure that it raises RuntimeError with a meaningful error message
766 and does not modify fileinput._state"""
767 fileinput._state = None
768 with self.assertRaises(RuntimeError) as cm:
769 fileinput.filelineno()
770 self.assertEqual(("no active input()",), cm.exception.args)
771 self.assertIsNone(fileinput._state)
772
773 def test_state_is_not_None(self):
774 """Tests fileinput.filelineno() when fileinput._state is not None.
775 Ensure that it invokes fileinput._state.filelineno() exactly once,
776 returns whatever it returns, and does not modify fileinput._state
777 to point to a different object."""
778 filelineno_retval = object()
779 instance = MockFileInput()
780 instance.return_values["filelineno"] = filelineno_retval
781 fileinput._state = instance
782 retval = fileinput.filelineno()
783 self.assertExactlyOneInvocation(instance, "filelineno")
784 self.assertIs(retval, filelineno_retval)
785 self.assertIs(fileinput._state, instance)
786
787class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
788 """Unit tests for fileinput.fileno()"""
789
790 def test_state_is_None(self):
791 """Tests fileinput.fileno() when fileinput._state is None.
792 Ensure that it raises RuntimeError with a meaningful error message
793 and does not modify fileinput._state"""
794 fileinput._state = None
795 with self.assertRaises(RuntimeError) as cm:
796 fileinput.fileno()
797 self.assertEqual(("no active input()",), cm.exception.args)
798 self.assertIsNone(fileinput._state)
799
800 def test_state_is_not_None(self):
801 """Tests fileinput.fileno() when fileinput._state is not None.
802 Ensure that it invokes fileinput._state.fileno() exactly once,
803 returns whatever it returns, and does not modify fileinput._state
804 to point to a different object."""
805 fileno_retval = object()
806 instance = MockFileInput()
807 instance.return_values["fileno"] = fileno_retval
808 instance.fileno_retval = fileno_retval
809 fileinput._state = instance
810 retval = fileinput.fileno()
811 self.assertExactlyOneInvocation(instance, "fileno")
812 self.assertIs(retval, fileno_retval)
813 self.assertIs(fileinput._state, instance)
814
815class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
816 """Unit tests for fileinput.isfirstline()"""
817
818 def test_state_is_None(self):
819 """Tests fileinput.isfirstline() when fileinput._state is None.
820 Ensure that it raises RuntimeError with a meaningful error message
821 and does not modify fileinput._state"""
822 fileinput._state = None
823 with self.assertRaises(RuntimeError) as cm:
824 fileinput.isfirstline()
825 self.assertEqual(("no active input()",), cm.exception.args)
826 self.assertIsNone(fileinput._state)
827
828 def test_state_is_not_None(self):
829 """Tests fileinput.isfirstline() when fileinput._state is not None.
830 Ensure that it invokes fileinput._state.isfirstline() exactly once,
831 returns whatever it returns, and does not modify fileinput._state
832 to point to a different object."""
833 isfirstline_retval = object()
834 instance = MockFileInput()
835 instance.return_values["isfirstline"] = isfirstline_retval
836 fileinput._state = instance
837 retval = fileinput.isfirstline()
838 self.assertExactlyOneInvocation(instance, "isfirstline")
839 self.assertIs(retval, isfirstline_retval)
840 self.assertIs(fileinput._state, instance)
841
842class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
843 """Unit tests for fileinput.isstdin()"""
844
845 def test_state_is_None(self):
846 """Tests fileinput.isstdin() when fileinput._state is None.
847 Ensure that it raises RuntimeError with a meaningful error message
848 and does not modify fileinput._state"""
849 fileinput._state = None
850 with self.assertRaises(RuntimeError) as cm:
851 fileinput.isstdin()
852 self.assertEqual(("no active input()",), cm.exception.args)
853 self.assertIsNone(fileinput._state)
854
855 def test_state_is_not_None(self):
856 """Tests fileinput.isstdin() when fileinput._state is not None.
857 Ensure that it invokes fileinput._state.isstdin() exactly once,
858 returns whatever it returns, and does not modify fileinput._state
859 to point to a different object."""
860 isstdin_retval = object()
861 instance = MockFileInput()
862 instance.return_values["isstdin"] = isstdin_retval
863 fileinput._state = instance
864 retval = fileinput.isstdin()
865 self.assertExactlyOneInvocation(instance, "isstdin")
866 self.assertIs(retval, isstdin_retval)
867 self.assertIs(fileinput._state, instance)
868
869class InvocationRecorder:
870 def __init__(self):
871 self.invocation_count = 0
872 def __call__(self, *args, **kwargs):
873 self.invocation_count += 1
874 self.last_invocation = (args, kwargs)
875
876class Test_hook_compressed(unittest.TestCase):
877 """Unit tests for fileinput.hook_compressed()"""
878
879 def setUp(self):
880 self.fake_open = InvocationRecorder()
881
882 def test_empty_string(self):
883 self.do_test_use_builtin_open("", 1)
884
885 def test_no_ext(self):
886 self.do_test_use_builtin_open("abcd", 2)
887
Ezio Melottic3afbb92011-05-14 10:10:53 +0300888 @unittest.skipUnless(gzip, "Requires gzip and zlib")
briancurtin5eb35912011-03-15 10:59:36 -0400889 def test_gz_ext_fake(self):
briancurtin906f0c42011-03-15 10:29:41 -0400890 original_open = gzip.open
891 gzip.open = self.fake_open
892 try:
893 result = fileinput.hook_compressed("test.gz", 3)
894 finally:
895 gzip.open = original_open
896
897 self.assertEqual(self.fake_open.invocation_count, 1)
898 self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {}))
899
briancurtinf84f3c32011-03-18 13:03:17 -0500900 @unittest.skipUnless(bz2, "Requires bz2")
briancurtin5eb35912011-03-15 10:59:36 -0400901 def test_bz2_ext_fake(self):
briancurtin906f0c42011-03-15 10:29:41 -0400902 original_open = bz2.BZ2File
903 bz2.BZ2File = self.fake_open
904 try:
905 result = fileinput.hook_compressed("test.bz2", 4)
906 finally:
907 bz2.BZ2File = original_open
908
909 self.assertEqual(self.fake_open.invocation_count, 1)
910 self.assertEqual(self.fake_open.last_invocation, (("test.bz2", 4), {}))
911
912 def test_blah_ext(self):
913 self.do_test_use_builtin_open("abcd.blah", 5)
914
briancurtin5eb35912011-03-15 10:59:36 -0400915 def test_gz_ext_builtin(self):
briancurtin906f0c42011-03-15 10:29:41 -0400916 self.do_test_use_builtin_open("abcd.Gz", 6)
917
briancurtin5eb35912011-03-15 10:59:36 -0400918 def test_bz2_ext_builtin(self):
briancurtin906f0c42011-03-15 10:29:41 -0400919 self.do_test_use_builtin_open("abcd.Bz2", 7)
920
921 def do_test_use_builtin_open(self, filename, mode):
922 original_open = self.replace_builtin_open(self.fake_open)
923 try:
924 result = fileinput.hook_compressed(filename, mode)
925 finally:
926 self.replace_builtin_open(original_open)
927
928 self.assertEqual(self.fake_open.invocation_count, 1)
929 self.assertEqual(self.fake_open.last_invocation,
930 ((filename, mode), {}))
931
932 @staticmethod
933 def replace_builtin_open(new_open_func):
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100934 original_open = builtins.open
935 builtins.open = new_open_func
briancurtin906f0c42011-03-15 10:29:41 -0400936 return original_open
937
938class Test_hook_encoded(unittest.TestCase):
939 """Unit tests for fileinput.hook_encoded()"""
940
941 def test(self):
942 encoding = object()
943 result = fileinput.hook_encoded(encoding)
944
945 fake_open = InvocationRecorder()
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100946 original_open = builtins.open
947 builtins.open = fake_open
briancurtin906f0c42011-03-15 10:29:41 -0400948 try:
949 filename = object()
950 mode = object()
951 open_result = result(filename, mode)
952 finally:
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100953 builtins.open = original_open
briancurtin906f0c42011-03-15 10:29:41 -0400954
955 self.assertEqual(fake_open.invocation_count, 1)
956
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100957 args, kwargs = fake_open.last_invocation
briancurtin906f0c42011-03-15 10:29:41 -0400958 self.assertIs(args[0], filename)
959 self.assertIs(args[1], mode)
Florent Xiclunaa011e2b2011-11-07 19:43:07 +0100960 self.assertIs(kwargs.pop('encoding'), encoding)
961 self.assertFalse(kwargs)
Georg Brandl6cb7b652010-07-31 20:08:15 +0000962
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200963 def test_modes(self):
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200964 with open(TESTFN, 'wb') as f:
Serhiy Storchaka682ea5f2014-03-03 21:17:17 +0200965 # UTF-7 is a convenient, seldom used encoding
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200966 f.write(b'A\nB\r\nC\rD+IKw-')
967 self.addCleanup(safe_unlink, TESTFN)
968
969 def check(mode, expected_lines):
970 with FileInput(files=TESTFN, mode=mode,
971 openhook=hook_encoded('utf-7')) as fi:
972 lines = list(fi)
973 self.assertEqual(lines, expected_lines)
974
975 check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
Serhiy Storchaka9fff8492014-02-26 21:03:19 +0200976 with self.assertWarns(DeprecationWarning):
977 check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
978 with self.assertWarns(DeprecationWarning):
979 check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
Serhiy Storchaka517b7472014-02-26 20:59:43 +0200980 with self.assertRaises(ValueError):
981 check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
982
Guido van Rossumd8faa362007-04-27 19:54:29 +0000983
984if __name__ == "__main__":
Brett Cannon3e9a9ae2013-06-12 21:25:59 -0400985 unittest.main()