blob: 76e4d16841f9c717326d61a4068b79a453ab4729 [file] [log] [blame]
Tim Peters3230d5c2001-07-11 22:21:17 +00001'''
2Tests for fileinput module.
3Nick Mathewson
4'''
Benjamin Petersoneb462882011-03-15 09:50:18 -05005import os
6import sys
7import re
briancurtin906f0c42011-03-15 10:29:41 -04008import fileinput
9import collections
Benjamin Petersoneb462882011-03-15 09:50:18 -050010import gzip
briancurtin906f0c42011-03-15 10:29:41 -040011import types
12import codecs
Benjamin Petersoneb462882011-03-15 09:50:18 -050013import unittest
14
briancurtinf84f3c32011-03-18 13:03:17 -050015try:
16 import bz2
17except ImportError:
18 bz2 = None
19
Benjamin Petersoneb462882011-03-15 09:50:18 -050020from io import StringIO
21from fileinput import FileInput, hook_encoded
22
23from test.support import verbose, TESTFN, run_unittest
24from test.support import unlink as safe_unlink
25
Tim Peters3230d5c2001-07-11 22:21:17 +000026
27# The fileinput module has 2 interfaces: the FileInput class which does
28# all the work, and a few functions (input, etc.) that use a global _state
briancurtin906f0c42011-03-15 10:29:41 -040029# variable.
Tim Peters3230d5c2001-07-11 22:21:17 +000030
31# Write lines (a list of lines) to temp file number i, and return the
32# temp file's name.
def writeTmp(i, lines, mode='w'):  # opening in text mode is the default
    """Write *lines* to temp file number *i* and return the file's name.

    The name is ``TESTFN`` with ``str(i)`` appended.  Every item of *lines*
    is written as given (no newline is added); callers pass bytes items
    together with a binary *mode* such as ``'wb'``.
    """
    name = TESTFN + str(i)
    # The with-statement closes the file even when a write raises, so a
    # failing test does not leave an open handle behind.
    with open(name, mode) as f:
        f.writelines(lines)
    return name
40
def remove_tempfiles(*names):
    """Remove each named temp file via safe_unlink().

    Falsy entries are skipped; callers pass None for files that were never
    created.
    """
    for path in filter(None, names):
        safe_unlink(path)
Tim Peters3230d5c2001-07-11 22:21:17 +000045
class BufferSizesTests(unittest.TestCase):
    """Run the FileInput scenarios over four temp files, once with the
    default buffer size and once with a tiny one."""

    def test_buffer_sizes(self):
        # First, run the tests with default and teeny buffer size.
        # The pass counter is not named "round" here so that the builtin of
        # that name is not shadowed in this method.
        for pass_number, bs in (0, 0), (1, 30):
            t1 = t2 = t3 = t4 = None
            try:
                t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)])
                t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)])
                t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)])
                t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)])
                self.buffer_size_test(t1, t2, t3, t4, bs, pass_number)
            finally:
                remove_tempfiles(t1, t2, t3, t4)

    def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
        """Run the six numbered scenarios against temp files t1..t4.

        *bs* is passed to FileInput as ``bufsize``.  *round* only offsets
        the scenario numbers printed in verbose mode.
        """
        pat = re.compile(r'LINE (\d+) OF FILE (\d+)')

        start = 1 + round*6
        if verbose:
            print('%s. Simple iteration (bs=%s)' % (start+0, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        lines = list(fi)
        fi.close()
        # 15 + 10 + 5 + 1 lines were written to the four files.
        self.assertEqual(len(lines), 31)
        self.assertEqual(lines[4], 'Line 5 of file 1\n')
        self.assertEqual(lines[30], 'Line 1 of file 4\n')
        self.assertEqual(fi.lineno(), 31)
        self.assertEqual(fi.filename(), t4)

        if verbose:
            print('%s. Status variables (bs=%s)' % (start+1, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        s = "x"
        while s and s != 'Line 6 of file 2\n':
            s = fi.readline()
        self.assertEqual(fi.filename(), t2)
        self.assertEqual(fi.lineno(), 21)
        self.assertEqual(fi.filelineno(), 6)
        self.assertFalse(fi.isfirstline())
        self.assertFalse(fi.isstdin())

        if verbose:
            print('%s. Nextfile (bs=%s)' % (start+2, bs))
        # Continues with the FileInput from the previous scenario.
        fi.nextfile()
        self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
        self.assertEqual(fi.lineno(), 22)
        fi.close()

        if verbose:
            print('%s. Stdin (bs=%s)' % (start+3, bs))
        fi = FileInput(files=(t1, t2, t3, t4, '-'), bufsize=bs)
        savestdin = sys.stdin
        try:
            sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
            lines = list(fi)
            self.assertEqual(len(lines), 33)
            self.assertEqual(lines[32], 'Line 2 of stdin\n')
            self.assertEqual(fi.filename(), '<stdin>')
            fi.nextfile()
        finally:
            sys.stdin = savestdin
            # Previously this FileInput was never closed.
            fi.close()

        if verbose:
            print('%s. Boundary conditions (bs=%s)' % (start+4, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        try:
            self.assertEqual(fi.lineno(), 0)
            self.assertIsNone(fi.filename())
            fi.nextfile()
            self.assertEqual(fi.lineno(), 0)
            self.assertIsNone(fi.filename())
        finally:
            # Previously this FileInput was never closed.
            fi.close()

        if verbose:
            print('%s. Inplace (bs=%s)' % (start+5, bs))
        savestdout = sys.stdout
        try:
            fi = FileInput(files=(t1, t2, t3, t4), inplace=1, bufsize=bs)
            for line in fi:
                # With inplace=1 the printed text replaces the file content.
                line = line[:-1].upper()
                print(line)
            fi.close()
        finally:
            sys.stdout = savestdout

        # Re-read the files and check that they now hold the upper-cased text.
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        for line in fi:
            self.assertEqual(line[-1], '\n')
            m = pat.match(line[:-1])
            self.assertIsNotNone(m)
            self.assertEqual(int(m.group(1)), fi.filelineno())
        fi.close()
Georg Brandle4662172006-02-19 09:51:27 +0000136
class UnconditionallyRaise:
    """Callable stub that notes it was called and then always raises.

    *exception_type* is called with no arguments to build the exception.
    """

    def __init__(self, exception_type):
        self.invoked = False
        self.exception_type = exception_type

    def __call__(self, *args, **kwargs):
        # Set the flag before raising so tests can check the stub was reached.
        self.invoked = True
        error = self.exception_type()
        raise error
144
class FileInputTests(unittest.TestCase):
    """Tests for the FileInput class, run against small temp files."""

    def test_zero_byte_files(self):
        # Empty files are skipped over when reading.
        t1 = t2 = t3 = t4 = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
            fi.close()
        finally:
            remove_tempfiles(t1, t2, t3, t4)

    def test_files_that_dont_end_with_newline(self):
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            fi = FileInput(files=(t1, t2))
            lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)
        finally:
            remove_tempfiles(t1, t2)

##     def test_unicode_filenames(self):
##         # XXX A unicode string is always returned by writeTmp.
##         #     So is this needed?
##         try:
##             t1 = writeTmp(1, ["A\nB"])
##             encoding = sys.getfilesystemencoding()
##             if encoding is None:
##                 encoding = 'ascii'
##             fi = FileInput(files=str(t1, encoding))
##             lines = list(fi)
##             self.assertEqual(lines, ["A\n", "B"])
##         finally:
##             remove_tempfiles(t1)

    def test_fileno(self):
        # fileno() is -1 whenever no file is currently open.
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB"])
            t2 = writeTmp(2, ["C\nD"])
            fi = FileInput(files=(t1, t2))
            self.assertEqual(fi.fileno(), -1)
            next(fi)
            self.assertNotEqual(fi.fileno(), -1)
            fi.nextfile()
            self.assertEqual(fi.fileno(), -1)
            list(fi)
            self.assertEqual(fi.fileno(), -1)
        finally:
            remove_tempfiles(t1, t2)

    def test_opening_mode(self):
        # invalid mode, should raise ValueError
        with self.assertRaises(ValueError):
            FileInput(mode="w")
        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            fi = FileInput(files=t1, mode="U")
            lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)

    def test_file_opening_hook(self):
        # cannot use openhook and inplace mode
        with self.assertRaises(ValueError):
            FileInput(inplace=1, openhook=lambda f, m: None)
        # openhook must be callable
        with self.assertRaises(ValueError):
            FileInput(openhook=1)

        class CustomOpenHook:
            def __init__(self):
                self.invoked = False
            def __call__(self, *args):
                self.invoked = True
                return open(*args)

        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        custom_open_hook = CustomOpenHook()
        with FileInput([t], openhook=custom_open_hook) as fi:
            fi.readline()
        self.assertTrue(custom_open_hook.invoked, "openhook not invoked")

    def test_context_manager(self):
        # Initialise the names first: otherwise a failure in writeTmp() would
        # be masked by a NameError raised from the finally clause.
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            with FileInput(files=(t1, t2)) as fi:
                lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)
            self.assertEqual(fi._files, ())
        finally:
            remove_tempfiles(t1, t2)

    def test_close_on_exception(self):
        # Initialised up front for the same reason as in test_context_manager.
        t1 = None
        try:
            t1 = writeTmp(1, [""])
            with FileInput(files=t1) as fi:
                raise IOError
        except IOError:
            self.assertEqual(fi._files, ())
        finally:
            remove_tempfiles(t1)

    def test_empty_files_list_specified_to_constructor(self):
        with FileInput(files=[]) as fi:
            self.assertEqual(fi._files, ('-',))

    def test__getitem__(self):
        """Tests invoking FileInput.__getitem__() with the current
           line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            retval1 = fi[0]
            self.assertEqual(retval1, "line1\n")
            retval2 = fi[1]
            self.assertEqual(retval2, "line2\n")

    def test__getitem__invalid_key(self):
        """Tests invoking FileInput.__getitem__() with an index unequal to
           the line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(RuntimeError) as cm:
                fi[1]
        self.assertEqual(cm.exception.args, ("accessing lines out of order",))

    def test__getitem__eof(self):
        """Tests invoking FileInput.__getitem__() with the line number but at
           end-of-input"""
        t = writeTmp(1, [])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(IndexError) as cm:
                fi[0]
        self.assertEqual(cm.exception.args, ("end of input reached",))

    def test_nextfile_oserror_deleting_backup(self):
        """Tests invoking FileInput.nextfile() when the attempt to delete
           the backup file would raise OSError.  This error is expected to be
           silently ignored"""

        os_unlink_orig = os.unlink
        os_unlink_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                next(fi) # make sure the file is opened
                os.unlink = os_unlink_replacement
                fi.nextfile()
        finally:
            os.unlink = os_unlink_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_unlink_replacement.invoked,
                        "os.unlink() was not invoked")

    def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
           This exception should be silently discarded."""

        os_fstat_orig = os.fstat
        os_fstat_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.fstat = os_fstat_replacement
                fi.readline()
        finally:
            os.fstat = os_fstat_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_fstat_replacement.invoked,
                        "os.fstat() was not invoked")

    @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist")
    def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        # (the message used to name os.fstat(), copied from the test above)
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")

    def test_fileno_when_ValueError_raised(self):
        # FileInput.fileno() should return -1 when the underlying file's
        # fileno() raises ValueError.
        class FilenoRaisesValueError(UnconditionallyRaise):
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)
            def fileno(self):
                self.__call__()

        unconditionally_raise_ValueError = FilenoRaisesValueError()
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            file_backup = fi._file
            try:
                fi._file = unconditionally_raise_ValueError
                result = fi.fileno()
            finally:
                fi._file = file_backup # make sure the file gets cleaned up

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(unconditionally_raise_ValueError.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(result, -1, "fileno() should return -1")
400
class MockFileInput:
    """A class that mocks out fileinput.FileInput for use during unit tests"""

    def __init__(self, files=None, inplace=False, backup="", bufsize=0,
                 mode="r", openhook=None):
        # Keep the constructor arguments untouched so tests can inspect them.
        self.files, self.inplace, self.backup = files, inplace, backup
        self.bufsize, self.mode, self.openhook = bufsize, mode, openhook
        self._file = None
        # method name -> number of calls; a key only appears once the
        # method has been called (or the count has been looked up).
        self.invocation_counts = collections.defaultdict(int)
        # method name -> value the mocked method hands back
        self.return_values = {}

    def _record_and_return(self, method_name):
        # Count the call, then return the canned value for that method.
        self.invocation_counts[method_name] += 1
        return self.return_values[method_name]

    def close(self):
        # close() is the only mocked method without a canned return value.
        self.invocation_counts["close"] += 1

    def nextfile(self):
        return self._record_and_return("nextfile")

    def filename(self):
        return self._record_and_return("filename")

    def lineno(self):
        return self._record_and_return("lineno")

    def filelineno(self):
        return self._record_and_return("filelineno")

    def fileno(self):
        return self._record_and_return("fileno")

    def isfirstline(self):
        return self._record_and_return("isfirstline")

    def isstdin(self):
        return self._record_and_return("isstdin")
446
class BaseFileInputGlobalMethodsTest(unittest.TestCase):
    """Base class for unit tests for the global function of
       the fileinput module."""

    def setUp(self):
        # Remember the real class and state, then swap in the mock class.
        self._orig_FileInput = fileinput.FileInput
        self._orig_state = fileinput._state
        fileinput.FileInput = MockFileInput

    def tearDown(self):
        # Put back what setUp() saved.
        fileinput._state = self._orig_state
        fileinput.FileInput = self._orig_FileInput

    def assertExactlyOneInvocation(self, mock_file_input, method_name):
        counts = mock_file_input.invocation_counts
        # the named method must have been invoked exactly once ...
        self.assertEqual(counts[method_name], 1, method_name)
        # ... and no other method may have been recorded at all
        self.assertEqual(len(counts), 1)
467
class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.input()"""

    def test_state_is_not_None_and_state_file_is_not_None(self):
        """fileinput.input() with an existing _state whose _file is set:
           expect RuntimeError with a meaningful message, and expect
           fileinput._state to be left untouched."""
        active = MockFileInput()
        active._file = object()
        fileinput._state = active
        with self.assertRaises(RuntimeError) as caught:
            fileinput.input()
        self.assertEqual(("input() already active",), caught.exception.args)
        self.assertIs(active, fileinput._state, "fileinput._state")

    def test_state_is_not_None_and_state_file_is_None(self):
        """fileinput.input() with an existing _state whose _file is None:
           expect a new FileInput object built from the given arguments
           and stored in fileinput._state."""
        idle = MockFileInput()
        idle._file = None
        fileinput._state = idle
        self.do_test_call_input()

    def test_state_is_None(self):
        """fileinput.input() with _state set to None: expect a new
           FileInput object built from the given arguments and stored in
           fileinput._state."""
        fileinput._state = None
        self.do_test_call_input()

    def do_test_call_input(self):
        """Check that fileinput.input() builds a new fileinput.FileInput,
           handing every argument through unchanged.  This relies on
           setUp() having replaced fileinput.FileInput with the mock."""
        names = ("files", "inplace", "backup", "bufsize", "mode", "openhook")
        # one distinct sentinel per argument, kept in a fixed order
        sentinels = [(name, object()) for name in names]

        result = fileinput.input(**dict(sentinels))

        # the returned object must also be the new global state
        self.assertIs(result, fileinput._state, "fileinput._state")

        # every argument must arrive in FileInput.__init__() as the very
        # same object that was passed to fileinput.input()
        for name, sentinel in sentinels:
            self.assertIs(sentinel, getattr(result, name), name)
532
class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.close()"""

    def test_state_is_None(self):
        """fileinput.close() with _state set to None leaves it None."""
        fileinput._state = None
        fileinput.close()
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.close() calls close() on the current _state and
           resets _state to None."""
        mock_state = MockFileInput()
        fileinput._state = mock_state
        fileinput.close()
        self.assertExactlyOneInvocation(mock_state, "close")
        self.assertIsNone(fileinput._state)
551
class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.nextfile()"""

    def test_state_is_None(self):
        """fileinput.nextfile() with _state set to None: expect
           RuntimeError with a meaningful message and _state still None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.nextfile()
        raised = caught.exception
        self.assertEqual(("no active input()",), raised.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.nextfile() with an active _state: expect exactly one
           call to _state.nextfile(), its return value passed through, and
           _state still the same object."""
        sentinel = object()
        mock_state = MockFileInput()
        mock_state.return_values["nextfile"] = sentinel
        fileinput._state = mock_state
        returned = fileinput.nextfile()
        self.assertExactlyOneInvocation(mock_state, "nextfile")
        self.assertIs(returned, sentinel)
        self.assertIs(fileinput._state, mock_state)
578
class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filename()"""

    def test_state_is_None(self):
        """fileinput.filename() with _state set to None: expect
           RuntimeError with a meaningful message and _state still None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.filename()
        raised = caught.exception
        self.assertEqual(("no active input()",), raised.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.filename() with an active _state: expect exactly one
           call to _state.filename(), its return value passed through, and
           _state still the same object."""
        sentinel = object()
        mock_state = MockFileInput()
        mock_state.return_values["filename"] = sentinel
        fileinput._state = mock_state
        returned = fileinput.filename()
        self.assertExactlyOneInvocation(mock_state, "filename")
        self.assertIs(returned, sentinel)
        self.assertIs(fileinput._state, mock_state)
605
class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.lineno()"""

    def test_state_is_None(self):
        """fileinput.lineno() with _state set to None: expect
           RuntimeError with a meaningful message and _state still None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.lineno()
        raised = caught.exception
        self.assertEqual(("no active input()",), raised.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.lineno() with an active _state: expect exactly one
           call to _state.lineno(), its return value passed through, and
           _state still the same object."""
        sentinel = object()
        mock_state = MockFileInput()
        mock_state.return_values["lineno"] = sentinel
        fileinput._state = mock_state
        returned = fileinput.lineno()
        self.assertExactlyOneInvocation(mock_state, "lineno")
        self.assertIs(returned, sentinel)
        self.assertIs(fileinput._state, mock_state)
632
class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filelineno()"""

    def test_state_is_None(self):
        """fileinput.filelineno() with _state set to None: expect
           RuntimeError with a meaningful message and _state still None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.filelineno()
        raised = caught.exception
        self.assertEqual(("no active input()",), raised.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.filelineno() with an active _state: expect exactly one
           call to _state.filelineno(), its return value passed through,
           and _state still the same object."""
        sentinel = object()
        mock_state = MockFileInput()
        mock_state.return_values["filelineno"] = sentinel
        fileinput._state = mock_state
        returned = fileinput.filelineno()
        self.assertExactlyOneInvocation(mock_state, "filelineno")
        self.assertIs(returned, sentinel)
        self.assertIs(fileinput._state, mock_state)
659
class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.fileno()"""

    def test_state_is_None(self):
        """Tests fileinput.fileno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.fileno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.fileno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.fileno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        fileno_retval = object()
        instance = MockFileInput()
        # The mock takes its return value from return_values only; the extra
        # "instance.fileno_retval" attribute that used to be set here was
        # never read and has been dropped, matching the sibling test classes.
        instance.return_values["fileno"] = fileno_retval
        fileinput._state = instance
        retval = fileinput.fileno()
        self.assertExactlyOneInvocation(instance, "fileno")
        self.assertIs(retval, fileno_retval)
        self.assertIs(fileinput._state, instance)
687
class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.isfirstline()"""

    def test_state_is_None(self):
        """fileinput.isfirstline() with _state set to None: expect
           RuntimeError with a meaningful message and _state still None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isfirstline()
        raised = caught.exception
        self.assertEqual(("no active input()",), raised.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.isfirstline() with an active _state: expect exactly one
           call to _state.isfirstline(), its return value passed through,
           and _state still the same object."""
        sentinel = object()
        mock_state = MockFileInput()
        mock_state.return_values["isfirstline"] = sentinel
        fileinput._state = mock_state
        returned = fileinput.isfirstline()
        self.assertExactlyOneInvocation(mock_state, "isfirstline")
        self.assertIs(returned, sentinel)
        self.assertIs(fileinput._state, mock_state)
714
class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.isstdin()"""

    def test_state_is_None(self):
        """fileinput.isstdin() with _state set to None: expect
           RuntimeError with a meaningful message and _state still None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isstdin()
        raised = caught.exception
        self.assertEqual(("no active input()",), raised.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.isstdin() with an active _state: expect exactly one
           call to _state.isstdin(), its return value passed through, and
           _state still the same object."""
        sentinel = object()
        mock_state = MockFileInput()
        mock_state.return_values["isstdin"] = sentinel
        fileinput._state = mock_state
        returned = fileinput.isstdin()
        self.assertExactlyOneInvocation(mock_state, "isstdin")
        self.assertIs(returned, sentinel)
        self.assertIs(fileinput._state, mock_state)
741
class InvocationRecorder:
    """Callable stub that counts its calls and remembers the latest one."""

    def __init__(self):
        # last_invocation is not created here; it is set on each call.
        self.invocation_count = 0

    def __call__(self, *args, **kwargs):
        # Store the most recent arguments as an (args, kwargs) pair.
        self.last_invocation = (args, kwargs)
        self.invocation_count = self.invocation_count + 1
748
class Test_hook_compressed(unittest.TestCase):
    """Unit tests for fileinput.hook_compressed()"""

    def setUp(self):
        # Stands in for whichever open function the test replaces.
        self.fake_open = InvocationRecorder()

    def test_empty_string(self):
        self.do_test_use_builtin_open("", 1)

    def test_no_ext(self):
        self.do_test_use_builtin_open("abcd", 2)

    def test_gz_ext_fake(self):
        original_open = gzip.open
        gzip.open = self.fake_open
        try:
            fileinput.hook_compressed("test.gz", 3)
        finally:
            gzip.open = original_open

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {}))

    @unittest.skipUnless(bz2, "Requires bz2")
    def test_bz2_ext_fake(self):
        original_open = bz2.BZ2File
        bz2.BZ2File = self.fake_open
        try:
            fileinput.hook_compressed("test.bz2", 4)
        finally:
            bz2.BZ2File = original_open

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation, (("test.bz2", 4), {}))

    def test_blah_ext(self):
        self.do_test_use_builtin_open("abcd.blah", 5)

    def test_gz_ext_builtin(self):
        self.do_test_use_builtin_open("abcd.Gz", 6)

    def test_bz2_ext_builtin(self):
        self.do_test_use_builtin_open("abcd.Bz2", 7)

    def do_test_use_builtin_open(self, filename, mode):
        """Check that hook_compressed(filename, mode) calls the builtin
           open() exactly once with exactly those two arguments."""
        original_open = self.replace_builtin_open(self.fake_open)
        try:
            fileinput.hook_compressed(filename, mode)
        finally:
            self.replace_builtin_open(original_open)

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation,
                         ((filename, mode), {}))

    @staticmethod
    def replace_builtin_open(new_open_func):
        """Install *new_open_func* as the builtin open() and return the
           function it replaced, so the caller can restore it."""
        # Patch through the builtins module rather than inspecting
        # __builtins__: whether that name is a dict or a module is an
        # implementation detail, and both refer to this same namespace.
        import builtins
        original_open = builtins.open
        builtins.open = new_open_func
        return original_open
819
class Test_hook_encoded(unittest.TestCase):
    """Unit tests for fileinput.hook_encoded()"""

    def test(self):
        # Distinct sentinels let the identity checks below tell the three
        # arguments apart.
        encoding, filename, mode = object(), object(), object()
        hook = fileinput.hook_encoded(encoding)

        recorder = InvocationRecorder()
        saved_codecs_open = codecs.open
        codecs.open = recorder
        try:
            hook(filename, mode)
        finally:
            codecs.open = saved_codecs_open

        self.assertEqual(recorder.invocation_count, 1)

        # codecs.open must have received (filename, mode, encoding), in
        # that order, as its first three positional arguments.
        positional = recorder.last_invocation[0]
        for index, expected in enumerate((filename, mode, encoding)):
            self.assertIs(positional[index], expected)
Georg Brandl6cb7b652010-07-31 20:08:15 +0000843
def test_main():
    """Hand the test case classes listed below to run_unittest()."""
    test_classes = (
        BufferSizesTests,
        FileInputTests,
        Test_fileinput_input,
        Test_fileinput_close,
        Test_fileinput_nextfile,
        Test_fileinput_filename,
        Test_fileinput_lineno,
        Test_fileinput_filelineno,
        Test_fileinput_fileno,
        Test_fileinput_isfirstline,
        Test_fileinput_isstdin,
        Test_hook_compressed,
        Test_hook_encoded,
    )
    run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()