blob: c97bb4cb3b0265ff471da27d81d9f15787513498 [file] [log] [blame]
'''
Tests for fileinput module.
Nick Mathewson
'''
Benjamin Petersoneb462882011-03-15 09:50:18 -05005import os
6import sys
7import re
briancurtin906f0c42011-03-15 10:29:41 -04008import fileinput
9import collections
Florent Xiclunaa011e2b2011-11-07 19:43:07 +010010import builtins
Miss Islington (bot)fff25cc2018-06-05 03:13:28 -070011import tempfile
Benjamin Petersoneb462882011-03-15 09:50:18 -050012import unittest
13
briancurtinf84f3c32011-03-18 13:03:17 -050014try:
15 import bz2
16except ImportError:
17 bz2 = None
Ezio Melottic3afbb92011-05-14 10:10:53 +030018try:
19 import gzip
20except ImportError:
21 gzip = None
briancurtinf84f3c32011-03-18 13:03:17 -050022
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030023from io import BytesIO, StringIO
Benjamin Petersoneb462882011-03-15 09:50:18 -050024from fileinput import FileInput, hook_encoded
Roy Williams002665a2017-05-22 22:24:17 -070025from pathlib import Path
Benjamin Petersoneb462882011-03-15 09:50:18 -050026
Serhiy Storchaka597d15a2016-04-24 13:45:58 +030027from test.support import verbose, TESTFN, check_warnings
Benjamin Petersoneb462882011-03-15 09:50:18 -050028from test.support import unlink as safe_unlink
Martin Panter7978e102016-01-16 06:26:54 +000029from test import support
Serhiy Storchaka946cfc32014-05-14 21:08:33 +030030from unittest import mock
Benjamin Petersoneb462882011-03-15 09:50:18 -050031
Tim Peters3230d5c2001-07-11 22:21:17 +000032
# The fileinput module has 2 interfaces: the FileInput class which does
# all the work, and a few functions (input, etc.) that use a global _state
# variable.
Tim Peters3230d5c2001-07-11 22:21:17 +000036
class BaseTests:
    """Mixin with temp-file helpers shared by the FileInput test cases.

    It relies on the addCleanup() method of the TestCase it is mixed into.
    """

    def writeTmp(self, content, *, mode='w'):
        """Write *content* to a fresh temp file and return the file's name.

        content -- str (default text mode) or bytes (pass mode='wb').
        mode    -- mode used to open the temp file for writing;
                   opening in text mode is the default.

        Removal of the file is registered with addCleanup() before anything
        is written, so the file is scheduled for removal even if the write
        fails.
        """
        fd, name = tempfile.mkstemp()
        self.addCleanup(support.unlink, name)
        # open() takes ownership of the descriptor and closes it on exit.
        with open(fd, mode) as f:
            f.write(content)
        return name
Tim Peters3230d5c2001-07-11 22:21:17 +000046
class LineReader:
    """Minimal file-like object that records every line it hands out.

    An instance doubles as its own open hook: openhook() treats the
    "filename" it is given as the content of the file to read.
    """

    def __init__(self):
        self._linesread = []

    @property
    def linesread(self):
        """Lines returned by readline() since the last access.

        Reading this property resets the record, so two consecutive reads
        with no readline() call in between yield a list and then [].
        The end-of-file marker '' is recorded like any other line.
        """
        lines, self._linesread = self._linesread, []
        return lines

    def openhook(self, filename, mode):
        """Start reading *filename* (used as content) and return self.

        *mode* is accepted for open-hook compatibility and ignored.
        """
        self.it = iter(filename.splitlines(True))
        return self

    def readline(self, size=None):
        """Return the next line, or '' at end of input; *size* is ignored."""
        line = next(self.it, '')
        self._linesread.append(line)
        return line

    def readlines(self, hint=-1):
        """Return a list of the following lines.

        Reading stops once the total length of the lines read so far
        reaches *hint*.  A *hint* of None, 0 or less means "no limit",
        matching io.IOBase.readlines(); previously the default of -1
        stopped after a single line because ``size >= -1`` always held.
        """
        limited = hint is not None and hint > 0
        lines = []
        size = 0
        while True:
            line = self.readline()
            if not line:
                return lines
            lines.append(line)
            size += len(line)
            if limited and size >= hint:
                return lines

    def close(self):
        """Nothing to release; present only to satisfy the file protocol."""
        pass
82
class BufferSizesTests(BaseTests, unittest.TestCase):
    """Runs the core FileInput scenarios with bufsize 0 and bufsize 30."""

    def test_buffer_sizes(self):
        """Run buffer_size_test() once per (round, bufsize) pair."""
        # First, run the tests with default and teeny buffer size.
        for round_no, bs in (0, 0), (1, 30):
            # Four files holding 15, 10, 5 and 1 numbered lines (31 total).
            t1, t2, t3, t4 = (
                self.writeTmp(''.join(
                    "Line %s of file %s\n" % (line_no, file_no)
                    for line_no in range(1, count + 1)))
                for file_no, count in ((1, 15), (2, 10), (3, 5), (4, 1))
            )
            if bs:
                # A non-zero bufsize is expected to emit DeprecationWarning.
                with self.assertWarns(DeprecationWarning):
                    self.buffer_size_test(t1, t2, t3, t4, bs, round_no)
            else:
                self.buffer_size_test(t1, t2, t3, t4, bs, round_no)

    def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
        """Check iteration, status, nextfile, stdin and inplace behaviour.

        t1..t4 -- names of files holding 15, 10, 5 and 1 lines of the form
                  "Line <n> of file <k>\\n".
        bs     -- bufsize passed to every FileInput created here.
        round  -- only used to number the sections printed in verbose mode.
        """
        pat = re.compile(r'LINE (\d+) OF FILE (\d+)')

        start = 1 + round*6
        if verbose:
            print('%s. Simple iteration (bs=%s)' % (start+0, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        lines = list(fi)
        fi.close()
        self.assertEqual(len(lines), 31)
        self.assertEqual(lines[4], 'Line 5 of file 1\n')
        self.assertEqual(lines[30], 'Line 1 of file 4\n')
        self.assertEqual(fi.lineno(), 31)
        self.assertEqual(fi.filename(), t4)

        if verbose:
            print('%s. Status variables (bs=%s)' % (start+1, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        s = "x"
        while s and s != 'Line 6 of file 2\n':
            s = fi.readline()
        self.assertEqual(fi.filename(), t2)
        self.assertEqual(fi.lineno(), 21)
        self.assertEqual(fi.filelineno(), 6)
        self.assertFalse(fi.isfirstline())
        self.assertFalse(fi.isstdin())

        if verbose:
            print('%s. Nextfile (bs=%s)' % (start+2, bs))
        # Continues with the FileInput from the previous section.
        fi.nextfile()
        self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
        self.assertEqual(fi.lineno(), 22)
        fi.close()

        if verbose:
            print('%s. Stdin (bs=%s)' % (start+3, bs))
        fi = FileInput(files=(t1, t2, t3, t4, '-'), bufsize=bs)
        savestdin = sys.stdin
        try:
            sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
            lines = list(fi)
            self.assertEqual(len(lines), 33)
            self.assertEqual(lines[32], 'Line 2 of stdin\n')
            self.assertEqual(fi.filename(), '<stdin>')
            fi.nextfile()
            fi.close()
        finally:
            sys.stdin = savestdin

        if verbose:
            print('%s. Boundary conditions (bs=%s)' % (start+4, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        self.assertEqual(fi.lineno(), 0)
        self.assertIsNone(fi.filename())
        fi.nextfile()
        self.assertEqual(fi.lineno(), 0)
        self.assertIsNone(fi.filename())
        fi.close()

        if verbose:
            print('%s. Inplace (bs=%s)' % (start+5, bs))
        savestdout = sys.stdout
        try:
            fi = FileInput(files=(t1, t2, t3, t4), inplace=1, bufsize=bs)
            # While iterating in inplace mode, print() output replaces the
            # content of the file currently being read.
            for line in fi:
                line = line[:-1].upper()
                print(line)
            fi.close()
        finally:
            sys.stdout = savestdout

        # Re-read the rewritten files: every line must now be upper case.
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        for line in fi:
            self.assertEqual(line[-1], '\n')
            m = pat.match(line[:-1])
            self.assertIsNotNone(m)
            self.assertEqual(int(m.group(1)), fi.filelineno())
        fi.close()
Georg Brandle4662172006-02-19 09:51:27 +0000173
class UnconditionallyRaise:
    """Callable that records being called and then always raises.

    exception_type -- the exception class instantiated (with no arguments)
                      and raised on every call.
    """

    def __init__(self, exception_type):
        self.exception_type = exception_type
        self.invoked = False

    def __call__(self, *args, **kwargs):
        """Set ``invoked`` and raise; all arguments are ignored."""
        self.invoked = True
        raise self.exception_type()
181
class FileInputTests(BaseTests, unittest.TestCase):
    """Tests that exercise the FileInput class directly."""

    def test_zero_byte_files(self):
        """Empty files are skipped transparently while reading."""
        t1 = self.writeTmp("")
        t2 = self.writeTmp("")
        t3 = self.writeTmp("The only line there is.\n")
        t4 = self.writeTmp("")
        with FileInput(files=(t1, t2, t3, t4)) as fi:
            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)

    def test_files_that_dont_end_with_newline(self):
        """A missing trailing newline does not merge lines across files."""
        t1 = self.writeTmp("A\nB\nC")
        t2 = self.writeTmp("D\nE\nF")
        fi = FileInput(files=(t1, t2))
        self.addCleanup(fi.close)
        lines = list(fi)
        self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
        self.assertEqual(fi.filelineno(), 3)
        self.assertEqual(fi.lineno(), 6)

##     def test_unicode_filenames(self):
##         # XXX A unicode string is always returned by writeTmp.
##         #     So is this needed?
##         t1 = self.writeTmp("A\nB")
##         encoding = sys.getfilesystemencoding()
##         if encoding is None:
##             encoding = 'ascii'
##         fi = FileInput(files=str(t1, encoding))
##         lines = list(fi)
##         self.assertEqual(lines, ["A\n", "B"])

    def test_fileno(self):
        """fileno() is -1 unless a file is currently open."""
        t1 = self.writeTmp("A\nB")
        t2 = self.writeTmp("C\nD")
        fi = FileInput(files=(t1, t2))
        self.addCleanup(fi.close)
        self.assertEqual(fi.fileno(), -1)
        next(fi)
        self.assertNotEqual(fi.fileno(), -1)
        fi.nextfile()
        self.assertEqual(fi.fileno(), -1)
        list(fi)
        self.assertEqual(fi.fileno(), -1)

    def test_opening_mode(self):
        """Invalid modes are rejected; mode "U" translates newlines."""
        # invalid mode, should raise ValueError
        with self.assertRaises(
                ValueError,
                msg="FileInput should reject invalid mode argument"):
            FileInput(mode="w")
        # try opening in universal newline mode
        t1 = self.writeTmp(b"A\nB\r\nC\rD", mode="wb")
        with check_warnings(('', DeprecationWarning)):
            fi = FileInput(files=t1, mode="U")
        self.addCleanup(fi.close)
        with check_warnings(('', DeprecationWarning)):
            lines = list(fi)
        self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])

    def test_stdin_binary_mode(self):
        """In mode 'rb', '-' reads bytes from sys.stdin.buffer."""
        with mock.patch('sys.stdin') as m_stdin:
            m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])

    def test_detached_stdin_binary_mode(self):
        """Mode 'rb' also works when sys.stdin has no 'buffer' attribute."""
        orig_stdin = sys.stdin
        try:
            sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
            self.assertFalse(hasattr(sys.stdin, 'buffer'))
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
        finally:
            sys.stdin = orig_stdin

    def test_file_opening_hook(self):
        """openhook is validated and, when valid, used to open files."""
        # cannot use openhook and inplace mode
        with self.assertRaises(
                ValueError,
                msg="FileInput should raise if both inplace "
                    "and openhook arguments are given"):
            FileInput(inplace=1, openhook=lambda f, m: None)
        with self.assertRaises(
                ValueError,
                msg="FileInput should check openhook for being callable"):
            FileInput(openhook=1)

        class CustomOpenHook:
            def __init__(self):
                self.invoked = False

            def __call__(self, *args):
                self.invoked = True
                return open(*args)

        t = self.writeTmp("\n")
        custom_open_hook = CustomOpenHook()
        with FileInput([t], openhook=custom_open_hook) as fi:
            fi.readline()
        self.assertTrue(custom_open_hook.invoked, "openhook not invoked")

    def test_readline(self):
        """readline() decodes lazily and returns '' repeatedly at EOF."""
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write(b'123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write(b'\x80')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii')) as fi:
            try:
                self.assertEqual(fi.readline(), 'A\n')
                self.assertEqual(fi.readline(), 'B\n')
                self.assertEqual(fi.readline(), 'C\n')
            except UnicodeDecodeError:
                # Hitting the undecodable final byte this early means
                # readline() consumed the whole file.
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
            self.assertEqual(fi.readline(), '')
            self.assertEqual(fi.readline(), '')

    def test_readline_binary_mode(self):
        """In mode 'rb', readline() returns bytes with newlines untouched."""
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\rD')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN, mode='rb') as fi:
            self.assertEqual(fi.readline(), b'A\n')
            self.assertEqual(fi.readline(), b'B\r\n')
            self.assertEqual(fi.readline(), b'C\rD')
            # Read to the end of file.
            self.assertEqual(fi.readline(), b'')
            self.assertEqual(fi.readline(), b'')

    def test_context_manager(self):
        """Leaving the with block closes the FileInput."""
        t1 = self.writeTmp("A\nB\nC")
        t2 = self.writeTmp("D\nE\nF")
        with FileInput(files=(t1, t2)) as fi:
            lines = list(fi)
        self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
        self.assertEqual(fi.filelineno(), 3)
        self.assertEqual(fi.lineno(), 6)
        self.assertEqual(fi._files, ())

    def test_close_on_exception(self):
        """The FileInput is closed when the with body raises."""
        t1 = self.writeTmp("")
        try:
            with FileInput(files=t1) as fi:
                raise OSError
        except OSError:
            self.assertEqual(fi._files, ())

    def test_empty_files_list_specified_to_constructor(self):
        """An empty files list falls back to reading from '-'."""
        with FileInput(files=[]) as fi:
            self.assertEqual(fi._files, ('-',))

    def test__getitem__(self):
        """Tests invoking FileInput.__getitem__() with the current
           line number"""
        t = self.writeTmp("line1\nline2\n")
        with FileInput(files=[t]) as fi:
            retval1 = fi[0]
            self.assertEqual(retval1, "line1\n")
            retval2 = fi[1]
            self.assertEqual(retval2, "line2\n")

    def test__getitem__invalid_key(self):
        """Tests invoking FileInput.__getitem__() with an index unequal to
           the line number"""
        t = self.writeTmp("line1\nline2\n")
        with FileInput(files=[t]) as fi:
            with self.assertRaises(RuntimeError) as cm:
                fi[1]
        self.assertEqual(cm.exception.args, ("accessing lines out of order",))

    def test__getitem__eof(self):
        """Tests invoking FileInput.__getitem__() with the line number but at
           end-of-input"""
        t = self.writeTmp('')
        with FileInput(files=[t]) as fi:
            with self.assertRaises(IndexError) as cm:
                fi[0]
        self.assertEqual(cm.exception.args, ("end of input reached",))

    def test_nextfile_oserror_deleting_backup(self):
        """Tests invoking FileInput.nextfile() when the attempt to delete
           the backup file would raise OSError.  This error is expected to be
           silently ignored"""
        os_unlink_replacement = UnconditionallyRaise(OSError)
        t = self.writeTmp("\n")
        # The failing unlink leaves the backup file behind; remove it later.
        self.addCleanup(support.unlink, t + '.bak')
        with FileInput(files=[t], inplace=True) as fi:
            next(fi)  # make sure the file is opened
            with mock.patch.object(os, 'unlink', os_unlink_replacement):
                fi.nextfile()

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_unlink_replacement.invoked,
                        "os.unlink() was not invoked")

    def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
           This exception should be silently discarded."""
        os_fstat_replacement = UnconditionallyRaise(OSError)
        t = self.writeTmp("\n")
        with FileInput(files=[t], inplace=True) as fi:
            with mock.patch.object(os, 'fstat', os_fstat_replacement):
                fi.readline()

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_fstat_replacement.invoked,
                        "os.fstat() was not invoked")

    @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist")
    def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""
        os_chmod_replacement = UnconditionallyRaise(OSError)
        t = self.writeTmp("\n")
        with FileInput(files=[t], inplace=True) as fi:
            with mock.patch.object(os, 'chmod', os_chmod_replacement):
                fi.readline()

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")

    def test_fileno_when_ValueError_raised(self):
        """fileno() returns -1 when the underlying file raises ValueError."""
        class FilenoRaisesValueError(UnconditionallyRaise):
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)

            def fileno(self):
                self.__call__()

        unconditionally_raise_ValueError = FilenoRaisesValueError()
        t = self.writeTmp("\n")
        with FileInput(files=[t]) as fi:
            file_backup = fi._file
            try:
                fi._file = unconditionally_raise_ValueError
                result = fi.fileno()
            finally:
                fi._file = file_backup  # make sure the file gets cleaned up

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(unconditionally_raise_ValueError.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(result, -1, "fileno() should return -1")

    def test_readline_buffering(self):
        """readline() pulls no more lines from the source than it returns."""
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(fi.readline(), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(fi.readline(), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(fi.readline(), 'line3\n')
            # The leading '' is the EOF read on the first source.
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [''])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [])

    def test_iteration_buffering(self):
        """Iteration pulls no more lines from the source than it yields."""
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(next(fi), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(next(fi), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(next(fi), 'line3\n')
            # The leading '' is the EOF read on the first source.
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [''])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [])

    def test_pathlib_file(self):
        """A pathlib.Path is accepted as the file to read."""
        t1 = Path(self.writeTmp("Pathlib file."))
        with FileInput(t1) as fi:
            line = fi.readline()
            self.assertEqual(line, 'Pathlib file.')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), os.fspath(t1))

    def test_pathlib_file_inplace(self):
        """A pathlib.Path is accepted in inplace mode as well."""
        t1 = Path(self.writeTmp('Pathlib file.'))
        with FileInput(t1, inplace=True) as fi:
            line = fi.readline()
            self.assertEqual(line, 'Pathlib file.')
            # In inplace mode this output becomes the file's new content.
            print('Modified %s' % line)
        with open(t1) as f:
            self.assertEqual(f.read(), 'Modified Pathlib file.\n')
Zhiming Wang06de1ae2017-09-05 01:37:24 +0800512
Roy Williams002665a2017-05-22 22:24:17 -0700513
class MockFileInput:
    """A class that mocks out fileinput.FileInput for use during unit tests

    The constructor arguments are stored unchanged as attributes of the
    same name.  Each mocked method increments its entry in
    ``invocation_counts``; every method except close() then returns the
    value stored under its own name in ``return_values`` (KeyError if the
    test did not provide one).
    """

    def __init__(self, files=None, inplace=False, backup="", bufsize=0,
                 mode="r", openhook=None):
        self.files = files
        self.inplace = inplace
        self.backup = backup
        self.bufsize = bufsize
        self.mode = mode
        self.openhook = openhook
        self._file = None
        # Method name -> number of calls; missing names count as 0.
        self.invocation_counts = collections.defaultdict(int)
        # Method name -> canned value that method returns.
        self.return_values = {}

    def _record_call(self, name):
        """Count one call of *name* and return its canned value.

        The count is incremented before the lookup, so a call is recorded
        even when no return value was configured.
        """
        self.invocation_counts[name] += 1
        return self.return_values[name]

    def close(self):
        self.invocation_counts["close"] += 1

    def nextfile(self):
        return self._record_call("nextfile")

    def filename(self):
        return self._record_call("filename")

    def lineno(self):
        return self._record_call("lineno")

    def filelineno(self):
        return self._record_call("filelineno")

    def fileno(self):
        return self._record_call("fileno")

    def isfirstline(self):
        return self._record_call("isfirstline")

    def isstdin(self):
        return self._record_call("isstdin")
559
class BaseFileInputGlobalMethodsTest(unittest.TestCase):
    """Base class for unit tests for the global function of
       the fileinput module."""

    def setUp(self):
        """Save fileinput's global state and install MockFileInput."""
        self._orig_state = fileinput._state
        self._orig_FileInput = fileinput.FileInput
        fileinput.FileInput = MockFileInput

    def tearDown(self):
        """Restore what setUp() replaced."""
        fileinput.FileInput = self._orig_FileInput
        fileinput._state = self._orig_state

    def assertExactlyOneInvocation(self, mock_file_input, method_name):
        """Fail unless *method_name* is the only method that was invoked,
        and was invoked exactly once.

        mock_file_input -- object exposing an ``invocation_counts`` mapping
                           of method name to call count.
        """
        # assert that the method with the given name was invoked once
        actual_count = mock_file_input.invocation_counts[method_name]
        self.assertEqual(actual_count, 1, method_name)
        # assert that no other unexpected methods were invoked
        actual_total_count = len(mock_file_input.invocation_counts)
        self.assertEqual(actual_total_count, 1)
580
class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.input()"""

    def test_state_is_not_None_and_state_file_is_not_None(self):
        """Tests invoking fileinput.input() when fileinput._state is not None
           and its _file attribute is also not None.  Expect RuntimeError to
           be raised with a meaningful error message and for fileinput._state
           to *not* be modified."""
        instance = MockFileInput()
        instance._file = object()
        fileinput._state = instance
        with self.assertRaises(RuntimeError) as cm:
            fileinput.input()
        self.assertEqual(("input() already active",), cm.exception.args)
        self.assertIs(instance, fileinput._state, "fileinput._state")

    def test_state_is_not_None_and_state_file_is_None(self):
        """Tests invoking fileinput.input() when fileinput._state is not None
           but its _file attribute *is* None.  Expect it to create and return
           a new fileinput.FileInput object with all method parameters passed
           explicitly to the __init__() method; also ensure that
           fileinput._state is set to the returned instance."""
        instance = MockFileInput()
        instance._file = None
        fileinput._state = instance
        self.do_test_call_input()

    def test_state_is_None(self):
        """Tests invoking fileinput.input() when fileinput._state is None.
           Expect it to create and return a new fileinput.FileInput object
           with all method parameters passed explicitly to the __init__()
           method; also ensure that fileinput._state is set to the returned
           instance."""
        fileinput._state = None
        self.do_test_call_input()

    def do_test_call_input(self):
        """Tests that fileinput.input() creates a new fileinput.FileInput
           object, passing the given parameters unmodified to
           fileinput.FileInput.__init__().  Note that this test depends on the
           monkey patching of fileinput.FileInput done by setUp()."""
        # Unique sentinels let assertIs() prove each argument was forwarded
        # to the matching parameter, untouched.
        files = object()
        inplace = object()
        backup = object()
        bufsize = object()
        mode = object()
        openhook = object()

        # call fileinput.input() with different values for each argument
        result = fileinput.input(files=files, inplace=inplace, backup=backup,
                                 bufsize=bufsize,
                                 mode=mode, openhook=openhook)

        # ensure fileinput._state was set to the returned object
        self.assertIs(result, fileinput._state, "fileinput._state")

        # ensure the parameters to fileinput.input() were passed directly
        # to FileInput.__init__()
        self.assertIs(files, result.files, "files")
        self.assertIs(inplace, result.inplace, "inplace")
        self.assertIs(backup, result.backup, "backup")
        self.assertIs(bufsize, result.bufsize, "bufsize")
        self.assertIs(mode, result.mode, "mode")
        self.assertIs(openhook, result.openhook, "openhook")
645
class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.close()"""

    def test_state_is_None(self):
        """Tests that fileinput.close() does nothing if fileinput._state
           is None"""
        fileinput._state = None
        fileinput.close()
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests that fileinput.close() invokes close() on fileinput._state
           and sets _state=None"""
        instance = MockFileInput()
        fileinput._state = instance
        fileinput.close()
        self.assertExactlyOneInvocation(instance, "close")
        self.assertIsNone(fileinput._state)
664
class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.nextfile()"""

    def test_state_is_None(self):
        """Tests fileinput.nextfile() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.nextfile()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.nextfile() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.nextfile() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        nextfile_retval = object()
        instance = MockFileInput()
        instance.return_values["nextfile"] = nextfile_retval
        fileinput._state = instance
        retval = fileinput.nextfile()
        self.assertExactlyOneInvocation(instance, "nextfile")
        self.assertIs(retval, nextfile_retval)
        self.assertIs(fileinput._state, instance)
691
class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filename()"""

    def test_state_is_None(self):
        """Tests fileinput.filename() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.filename()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.filename() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.filename() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        filename_retval = object()
        instance = MockFileInput()
        instance.return_values["filename"] = filename_retval
        fileinput._state = instance
        retval = fileinput.filename()
        self.assertExactlyOneInvocation(instance, "filename")
        self.assertIs(retval, filename_retval)
        self.assertIs(fileinput._state, instance)
718
class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.lineno()"""

    def test_state_is_None(self):
        """Tests fileinput.lineno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.lineno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.lineno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.lineno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        lineno_retval = object()
        instance = MockFileInput()
        instance.return_values["lineno"] = lineno_retval
        fileinput._state = instance
        retval = fileinput.lineno()
        self.assertExactlyOneInvocation(instance, "lineno")
        self.assertIs(retval, lineno_retval)
        self.assertIs(fileinput._state, instance)
745
class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filelineno()"""

    def test_state_is_None(self):
        """Tests fileinput.filelineno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.filelineno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.filelineno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.filelineno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        filelineno_retval = object()
        instance = MockFileInput()
        instance.return_values["filelineno"] = filelineno_retval
        fileinput._state = instance
        retval = fileinput.filelineno()
        self.assertExactlyOneInvocation(instance, "filelineno")
        self.assertIs(retval, filelineno_retval)
        self.assertIs(fileinput._state, instance)
772
class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
    """Checks for the module-level fileinput.fileno() function."""

    def test_state_is_None(self):
        """fileno() without global state raises RuntimeError.

        The exception args must be ("no active input()",) and
        fileinput._state must still be None afterwards.
        """
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.fileno()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileno() delegates to the object stored in fileinput._state.

        The stored object's fileno() must be invoked exactly once, its
        return value must be passed through unchanged, and
        fileinput._state must keep referring to the same object.
        """
        # A unique sentinel lets the identity check prove pass-through.
        sentinel = object()
        fake_state = MockFileInput()
        fake_state.return_values["fileno"] = sentinel
        # NOTE(review): this extra attribute looks redundant with the
        # return_values entry above -- confirm against MockFileInput
        # before removing it.
        fake_state.fileno_retval = sentinel
        fileinput._state = fake_state
        result = fileinput.fileno()
        self.assertExactlyOneInvocation(fake_state, "fileno")
        self.assertIs(result, sentinel)
        self.assertIs(fileinput._state, fake_state)
800
class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
    """Checks for the module-level fileinput.isfirstline() function."""

    def test_state_is_None(self):
        """isfirstline() without global state raises RuntimeError.

        The exception args must be ("no active input()",) and
        fileinput._state must still be None afterwards.
        """
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isfirstline()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """isfirstline() delegates to the object stored in fileinput._state.

        The stored object's isfirstline() must be invoked exactly once,
        its return value must be passed through unchanged, and
        fileinput._state must keep referring to the same object.
        """
        # A unique sentinel lets the identity check prove pass-through.
        sentinel = object()
        fake_state = MockFileInput()
        fake_state.return_values["isfirstline"] = sentinel
        fileinput._state = fake_state
        result = fileinput.isfirstline()
        self.assertExactlyOneInvocation(fake_state, "isfirstline")
        self.assertIs(result, sentinel)
        self.assertIs(fileinput._state, fake_state)
827
class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
    """Checks for the module-level fileinput.isstdin() function."""

    def test_state_is_None(self):
        """isstdin() without global state raises RuntimeError.

        The exception args must be ("no active input()",) and
        fileinput._state must still be None afterwards.
        """
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isstdin()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """isstdin() delegates to the object stored in fileinput._state.

        The stored object's isstdin() must be invoked exactly once, its
        return value must be passed through unchanged, and
        fileinput._state must keep referring to the same object.
        """
        # A unique sentinel lets the identity check prove pass-through.
        sentinel = object()
        fake_state = MockFileInput()
        fake_state.return_values["isstdin"] = sentinel
        fileinput._state = fake_state
        result = fileinput.isstdin()
        self.assertExactlyOneInvocation(fake_state, "isstdin")
        self.assertIs(result, sentinel)
        self.assertIs(fileinput._state, fake_state)
854
class InvocationRecorder:
    """Callable test double that records how it has been invoked.

    Instances accept any positional and keyword arguments, count the
    calls, and remember the arguments of the most recent one.
    """

    def __init__(self):
        # Number of times the instance has been called so far.
        self.invocation_count = 0
        # (args, kwargs) of the most recent call.  Starts as None so that
        # inspecting it before any call yields a clean assertion failure
        # rather than an AttributeError.
        self.last_invocation = None

    def __call__(self, *args, **kwargs):
        """Record one invocation and its arguments; returns None."""
        self.invocation_count += 1
        self.last_invocation = (args, kwargs)
861
class Test_hook_compressed(unittest.TestCase):
    """Unit tests for fileinput.hook_compressed()"""

    def setUp(self):
        # Fresh recorder per test; it stands in for whichever opener
        # hook_compressed() is expected to dispatch to.
        self.fake_open = InvocationRecorder()

    def test_empty_string(self):
        """An empty filename is handed to the builtin open()."""
        self.do_test_use_builtin_open("", 1)

    def test_no_ext(self):
        """A filename without extension is handed to the builtin open()."""
        self.do_test_use_builtin_open("abcd", 2)

    @unittest.skipUnless(gzip, "Requires gzip and zlib")
    def test_gz_ext_fake(self):
        """A ".gz" filename is opened through gzip.open()."""
        # patch.object restores gzip.open on exit, even if the call raises.
        with mock.patch.object(gzip, "open", self.fake_open):
            fileinput.hook_compressed("test.gz", 3)

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {}))

    @unittest.skipUnless(bz2, "Requires bz2")
    def test_bz2_ext_fake(self):
        """A ".bz2" filename is opened through bz2.BZ2File()."""
        # patch.object restores bz2.BZ2File on exit, even if the call raises.
        with mock.patch.object(bz2, "BZ2File", self.fake_open):
            fileinput.hook_compressed("test.bz2", 4)

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation, (("test.bz2", 4), {}))

    def test_blah_ext(self):
        """An unrelated extension is handed to the builtin open()."""
        self.do_test_use_builtin_open("abcd.blah", 5)

    def test_gz_ext_builtin(self):
        """A mixed-case ".Gz" extension is handed to the builtin open()."""
        self.do_test_use_builtin_open("abcd.Gz", 6)

    def test_bz2_ext_builtin(self):
        """A mixed-case ".Bz2" extension is handed to the builtin open()."""
        self.do_test_use_builtin_open("abcd.Bz2", 7)

    def do_test_use_builtin_open(self, filename, mode):
        """Assert hook_compressed(filename, mode) calls builtins.open once.

        builtins.open is swapped for the recorder only for the duration
        of the call and is restored in all cases.
        """
        original_open = self.replace_builtin_open(self.fake_open)
        try:
            fileinput.hook_compressed(filename, mode)
        finally:
            self.replace_builtin_open(original_open)

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation,
                         ((filename, mode), {}))

    @staticmethod
    def replace_builtin_open(new_open_func):
        """Install new_open_func as builtins.open; return the previous one."""
        original_open = builtins.open
        builtins.open = new_open_func
        return original_open
923
class Test_hook_encoded(unittest.TestCase):
    """Unit tests for fileinput.hook_encoded()"""

    def test(self):
        """The hook forwards filename, mode, encoding and errors to open().

        Unique sentinel objects are used for every value so the identity
        assertions prove each one is passed through untouched.
        """
        encoding = object()
        errors = object()
        hook = fileinput.hook_encoded(encoding, errors=errors)

        fake_open = InvocationRecorder()
        filename = object()
        mode = object()
        # patch.object restores builtins.open on exit, even on failure.
        with mock.patch.object(builtins, "open", fake_open):
            hook(filename, mode)

        self.assertEqual(fake_open.invocation_count, 1)

        args, kwargs = fake_open.last_invocation
        self.assertIs(args[0], filename)
        self.assertIs(args[1], mode)
        self.assertIs(kwargs.pop('encoding'), encoding)
        self.assertIs(kwargs.pop('errors'), errors)
        # Nothing besides encoding/errors may be passed by keyword.
        self.assertFalse(kwargs)

    def test_errors(self):
        """The errors argument controls how undecodable bytes are handled."""
        with open(TESTFN, 'wb') as f:
            # 0x80 is not valid as the first byte of a UTF-8 sequence.
            f.write(b'\x80abc')
        self.addCleanup(safe_unlink, TESTFN)

        def check(errors, expected_lines):
            with FileInput(files=TESTFN, mode='r',
                           openhook=hook_encoded('utf-8', errors=errors)) as fi:
                lines = list(fi)
            self.assertEqual(lines, expected_lines)

        check('ignore', ['abc'])
        with self.assertRaises(UnicodeDecodeError):
            check('strict', ['abc'])
        check('replace', ['\ufffdabc'])
        check('backslashreplace', ['\\x80abc'])

    def test_modes(self):
        """Check the lines produced, or the error raised, for each mode."""
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write(b'A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            with FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7')) as fi:
                lines = list(fi)
            self.assertEqual(lines, expected_lines)

        check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertRaises(ValueError):
            check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
987
Guido van Rossumd8faa362007-04-27 19:54:29 +0000988
class MiscTest(unittest.TestCase):
    """Module-level checks that do not target a single function."""

    def test_all(self):
        """Run the shared support.check__all__ helper on fileinput."""
        support.check__all__(self, fileinput)
Martin Panter7978e102016-01-16 06:26:54 +0000993
994
# Run the tests in this file when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()