Fix #11509. Significantly increase test coverage for fileinput.
Patch by Denver Coneybeare at the PyCon 2011 Sprints.
diff --git a/Lib/test/ b/Lib/test/
index f312882..b32e0e0 100644
--- a/Lib/test/
+++ b/Lib/test/
@@ -6,14 +6,18 @@
 import unittest
 from import verbose, TESTFN, run_unittest
 from import unlink as safe_unlink
-import sys, re
+import os, sys, re
 from io import StringIO
 from fileinput import FileInput, hook_encoded
+import fileinput
+import collections
+import gzip, bz2
+import types
+import codecs
 # The fileinput module has 2 interfaces: the FileInput class which does
 # all the work, and a few functions (input, etc.) that use a global _state
-# variable.  We only test the FileInput class, since the other functions
-# only provide a thin facade over FileInput.
+# variable.
 # Write lines (a list of lines) to temp file number i, and return the
 # temp file's name.
@@ -121,7 +125,16 @@
             self.assertEqual(int(, fi.filelineno())
+class UnconditionallyRaise:
+    def __init__(self, exception_type):
+        self.exception_type = exception_type
+        self.invoked = False
+    def __call__(self, *args, **kwargs):
+        self.invoked = True
+        raise self.exception_type()
 class FileInputTests(unittest.TestCase):
     def test_zero_byte_files(self):
         t1 = t2 = t3 = t4 = None
@@ -219,17 +232,20 @@
   "FileInput should check openhook for being callable")
         except ValueError:
-        # XXX The rot13 codec was removed.
-        #     So this test needs to be changed to use something else.
-        #     (Or perhaps the API needs to change so we can just pass
-        #     an encoding rather than using a hook?)
-##         try:
-##             t1 = writeTmp(1, ["A\nB"], mode="wb")
-##             fi = FileInput(files=t1, openhook=hook_encoded("rot13"))
-##             lines = list(fi)
-##             self.assertEqual(lines, ["N\n", "O"])
-##         finally:
-##             remove_tempfiles(t1)
+        class CustomOpenHook:
+            def __init__(self):
+                self.invoked = False
+            def __call__(self, *args):
+                self.invoked = True
+                return open(*args)
+        t = writeTmp(1, ["\n"])
+        self.addCleanup(remove_tempfiles, t)
+        custom_open_hook = CustomOpenHook()
+        with FileInput([t], openhook=custom_open_hook) as fi:
+            fi.readline()
+        self.assertTrue(custom_open_hook.invoked, "openhook not invoked")
     def test_context_manager(self):
@@ -254,9 +270,583 @@
+    def test_empty_files_list_specified_to_constructor(self):
+        with FileInput(files=[]) as fi:
+            self.assertEquals(fi._files, ('-',))
+    def test__getitem__(self):
+        """Tests invoking FileInput.__getitem__() with the current
+           line number"""
+        t = writeTmp(1, ["line1\n", "line2\n"])
+        self.addCleanup(remove_tempfiles, t)
+        with FileInput(files=[t]) as fi:
+            retval1 = fi[0]
+            self.assertEqual(retval1, "line1\n")
+            retval2 = fi[1]
+            self.assertEqual(retval2, "line2\n")
+    def test__getitem__invalid_key(self):
+        """Tests invoking FileInput.__getitem__() with an index unequal to
+           the line number"""
+        t = writeTmp(1, ["line1\n", "line2\n"])
+        self.addCleanup(remove_tempfiles, t)
+        with FileInput(files=[t]) as fi:
+            with self.assertRaises(RuntimeError) as cm:
+                fi[1]
+        self.assertEquals(cm.exception.args, ("accessing lines out of order",))
+    def test__getitem__eof(self):
+        """Tests invoking FileInput.__getitem__() with the line number but at
+           end-of-input"""
+        t = writeTmp(1, [])
+        self.addCleanup(remove_tempfiles, t)
+        with FileInput(files=[t]) as fi:
+            with self.assertRaises(IndexError) as cm:
+                fi[0]
+        self.assertEquals(cm.exception.args, ("end of input reached",))
+    def test_nextfile_oserror_deleting_backup(self):
+        """Tests invoking FileInput.nextfile() when the attempt to delete
+           the backup file would raise OSError.  This error is expected to be
+           silently ignored"""
+        os_unlink_orig = os.unlink
+        os_unlink_replacement = UnconditionallyRaise(OSError)
+        try:
+            t = writeTmp(1, ["\n"])
+            self.addCleanup(remove_tempfiles, t)
+            with FileInput(files=[t], inplace=True) as fi:
+                next(fi) # make sure the file is opened
+                os.unlink = os_unlink_replacement
+                fi.nextfile()
+        finally:
+            os.unlink = os_unlink_orig
+        # sanity check to make sure that our test scenario was actually hit
+        self.assertTrue(os_unlink_replacement.invoked,
+                        "os.unlink() was not invoked")
+    def test_readline_os_fstat_raises_OSError(self):
+        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
+           This exception should be silently discarded."""
+        os_fstat_orig = os.fstat
+        os_fstat_replacement = UnconditionallyRaise(OSError)
+        try:
+            t = writeTmp(1, ["\n"])
+            self.addCleanup(remove_tempfiles, t)
+            with FileInput(files=[t], inplace=True) as fi:
+                os.fstat = os_fstat_replacement
+                fi.readline()
+        finally:
+            os.fstat = os_fstat_orig
+        # sanity check to make sure that our test scenario was actually hit
+        self.assertTrue(os_fstat_replacement.invoked,
+                        "os.fstat() was not invoked")
+    @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist")
+    def test_readline_os_chmod_raises_OSError(self):
+        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
+           This exception should be silently discarded."""
+        os_chmod_orig = os.chmod
+        os_chmod_replacement = UnconditionallyRaise(OSError)
+        try:
+            t = writeTmp(1, ["\n"])
+            self.addCleanup(remove_tempfiles, t)
+            with FileInput(files=[t], inplace=True) as fi:
+                os.chmod = os_chmod_replacement
+                fi.readline()
+        finally:
+            os.chmod = os_chmod_orig
+        # sanity check to make sure that our test scenario was actually hit
+        self.assertTrue(os_chmod_replacement.invoked,
+                        "os.fstat() was not invoked")
+    def test_fileno_when_ValueError_raised(self):
+        class FilenoRaisesValueError(UnconditionallyRaise):
+            def __init__(self):
+                UnconditionallyRaise.__init__(self, ValueError)
+            def fileno(self):
+                self.__call__()
+        unconditionally_raise_ValueError = FilenoRaisesValueError()
+        t = writeTmp(1, ["\n"])
+        self.addCleanup(remove_tempfiles, t)
+        with FileInput(files=[t]) as fi:
+            file_backup = fi._file
+            try:
+                fi._file = unconditionally_raise_ValueError
+                result = fi.fileno()
+            finally:
+                fi._file = file_backup # make sure the file gets cleaned up
+        # sanity check to make sure that our test scenario was actually hit
+        self.assertTrue(unconditionally_raise_ValueError.invoked,
+                        "_file.fileno() was not invoked")
+        self.assertEqual(result, -1, "fileno() should return -1")
+class MockFileInput:
+    """A class that mocks out fileinput.FileInput for use during unit tests"""
+    def __init__(self, files=None, inplace=False, backup="", bufsize=0,
+                 mode="r", openhook=None):
+        self.files = files
+        self.inplace = inplace
+        self.backup = backup
+        self.bufsize = bufsize
+        self.mode = mode
+        self.openhook = openhook
+        self._file = None
+        self.invocation_counts = collections.defaultdict(lambda: 0)
+        self.return_values = {}
+    def close(self):
+        self.invocation_counts["close"] += 1
+    def nextfile(self):
+        self.invocation_counts["nextfile"] += 1
+        return self.return_values["nextfile"]
+    def filename(self):
+        self.invocation_counts["filename"] += 1
+        return self.return_values["filename"]
+    def lineno(self):
+        self.invocation_counts["lineno"] += 1
+        return self.return_values["lineno"]
+    def filelineno(self):
+        self.invocation_counts["filelineno"] += 1
+        return self.return_values["filelineno"]
+    def fileno(self):
+        self.invocation_counts["fileno"] += 1
+        return self.return_values["fileno"]
+    def isfirstline(self):
+        self.invocation_counts["isfirstline"] += 1
+        return self.return_values["isfirstline"]
+    def isstdin(self):
+        self.invocation_counts["isstdin"] += 1
+        return self.return_values["isstdin"]
+class BaseFileInputGlobalMethodsTest(unittest.TestCase):
+    """Base class for unit tests for the global function of
+       the fileinput module."""
+    def setUp(self):
+        self._orig_state = fileinput._state
+        self._orig_FileInput = fileinput.FileInput
+        fileinput.FileInput = MockFileInput
+    def tearDown(self):
+        fileinput.FileInput = self._orig_FileInput
+        fileinput._state = self._orig_state
+    def assertExactlyOneInvocation(self, mock_file_input, method_name):
+        # assert that the method with the given name was invoked once
+        actual_count = mock_file_input.invocation_counts[method_name]
+        self.assertEqual(actual_count, 1, method_name)
+        # assert that no other unexpected methods were invoked
+        actual_total_count = len(mock_file_input.invocation_counts)
+        self.assertEqual(actual_total_count, 1)
+class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
+    """Unit tests for fileinput.input()"""
+    def test_state_is_not_None_and_state_file_is_not_None(self):
+        """Tests invoking fileinput.input() when fileinput._state is not None
+           and its _file attribute is also not None.  Expect RuntimeError to
+           be raised with a meaningful error message and for fileinput._state
+           to *not* be modified."""
+        instance = MockFileInput()
+        instance._file = object()
+        fileinput._state = instance
+        with self.assertRaises(RuntimeError) as cm:
+            fileinput.input()
+        self.assertEqual(("input() already active",), cm.exception.args)
+        self.assertIs(instance, fileinput._state, "fileinput._state")
+    def test_state_is_not_None_and_state_file_is_None(self):
+        """Tests invoking fileinput.input() when fileinput._state is not None
+           but its _file attribute *is* None.  Expect it to create and return
+           a new fileinput.FileInput object with all method parameters passed
+           explicitly to the __init__() method; also ensure that
+           fileinput._state is set to the returned instance."""
+        instance = MockFileInput()
+        instance._file = None
+        fileinput._state = instance
+        self.do_test_call_input()
+    def test_state_is_None(self):
+        """Tests invoking fileinput.input() when fileinput._state is None
+           Expect it to create and return a new fileinput.FileInput object
+           with all method parameters passed explicitly to the __init__()
+           method; also ensure that fileinput._state is set to the returned
+           instance."""
+        fileinput._state = None
+        self.do_test_call_input()
+    def do_test_call_input(self):
+        """Tests that fileinput.input() creates a new fileinput.FileInput
+           object, passing the given parameters unmodified to
+           fileinput.FileInput.__init__().  Note that this test depends on the
+           monkey patching of fileinput.FileInput done by setUp()."""
+        files = object()
+        inplace = object()
+        backup = object()
+        bufsize = object()
+        mode = object()
+        openhook = object()
+        # call fileinput.input() with different values for each argument
+        result = fileinput.input(files=files, inplace=inplace, backup=backup,
+                                 bufsize=bufsize,
+            mode=mode, openhook=openhook)
+        # ensure fileinput._state was set to the returned object
+        self.assertIs(result, fileinput._state, "fileinput._state")
+        # ensure the parameters to fileinput.input() were passed directly
+        # to FileInput.__init__()
+        self.assertIs(files, result.files, "files")
+        self.assertIs(inplace, result.inplace, "inplace")
+        self.assertIs(backup, result.backup, "backup")
+        self.assertIs(bufsize, result.bufsize, "bufsize")
+        self.assertIs(mode, result.mode, "mode")
+        self.assertIs(openhook, result.openhook, "openhook")
+class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
+    """Unit tests for fileinput.close()"""
+    def test_state_is_None(self):
+        """Tests that fileinput.close() does nothing if fileinput._state
+           is None"""
+        fileinput._state = None
+        fileinput.close()
+        self.assertIsNone(fileinput._state)
+    def test_state_is_not_None(self):
+        """Tests that fileinput.close() invokes close() on fileinput._state
+           and sets _state=None"""
+        instance = MockFileInput()
+        fileinput._state = instance
+        fileinput.close()
+        self.assertExactlyOneInvocation(instance, "close")
+        self.assertIsNone(fileinput._state)
+class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
+    """Unit tests for fileinput.nextfile()"""
+    def test_state_is_None(self):
+        """Tests fileinput.nextfile() when fileinput._state is None.
+           Ensure that it raises RuntimeError with a meaningful error message
+           and does not modify fileinput._state"""
+        fileinput._state = None
+        with self.assertRaises(RuntimeError) as cm:
+            fileinput.nextfile()
+        self.assertEqual(("no active input()",), cm.exception.args)
+        self.assertIsNone(fileinput._state)
+    def test_state_is_not_None(self):
+        """Tests fileinput.nextfile() when fileinput._state is not None.
+           Ensure that it invokes fileinput._state.nextfile() exactly once,
+           returns whatever it returns, and does not modify fileinput._state
+           to point to a different object."""
+        nextfile_retval = object()
+        instance = MockFileInput()
+        instance.return_values["nextfile"] = nextfile_retval
+        fileinput._state = instance
+        retval = fileinput.nextfile()
+        self.assertExactlyOneInvocation(instance, "nextfile")
+        self.assertIs(retval, nextfile_retval)
+        self.assertIs(fileinput._state, instance)
+class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
+    """Unit tests for fileinput.filename()"""
+    def test_state_is_None(self):
+        """Tests fileinput.filename() when fileinput._state is None.
+           Ensure that it raises RuntimeError with a meaningful error message
+           and does not modify fileinput._state"""
+        fileinput._state = None
+        with self.assertRaises(RuntimeError) as cm:
+            fileinput.filename()
+        self.assertEqual(("no active input()",), cm.exception.args)
+        self.assertIsNone(fileinput._state)
+    def test_state_is_not_None(self):
+        """Tests fileinput.filename() when fileinput._state is not None.
+           Ensure that it invokes fileinput._state.filename() exactly once,
+           returns whatever it returns, and does not modify fileinput._state
+           to point to a different object."""
+        filename_retval = object()
+        instance = MockFileInput()
+        instance.return_values["filename"] = filename_retval
+        fileinput._state = instance
+        retval = fileinput.filename()
+        self.assertExactlyOneInvocation(instance, "filename")
+        self.assertIs(retval, filename_retval)
+        self.assertIs(fileinput._state, instance)
+class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
+    """Unit tests for fileinput.lineno()"""
+    def test_state_is_None(self):
+        """Tests fileinput.lineno() when fileinput._state is None.
+           Ensure that it raises RuntimeError with a meaningful error message
+           and does not modify fileinput._state"""
+        fileinput._state = None
+        with self.assertRaises(RuntimeError) as cm:
+            fileinput.lineno()
+        self.assertEqual(("no active input()",), cm.exception.args)
+        self.assertIsNone(fileinput._state)
+    def test_state_is_not_None(self):
+        """Tests fileinput.lineno() when fileinput._state is not None.
+           Ensure that it invokes fileinput._state.lineno() exactly once,
+           returns whatever it returns, and does not modify fileinput._state
+           to point to a different object."""
+        lineno_retval = object()
+        instance = MockFileInput()
+        instance.return_values["lineno"] = lineno_retval
+        fileinput._state = instance
+        retval = fileinput.lineno()
+        self.assertExactlyOneInvocation(instance, "lineno")
+        self.assertIs(retval, lineno_retval)
+        self.assertIs(fileinput._state, instance)
+class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
+    """Unit tests for fileinput.filelineno()"""
+    def test_state_is_None(self):
+        """Tests fileinput.filelineno() when fileinput._state is None.
+           Ensure that it raises RuntimeError with a meaningful error message
+           and does not modify fileinput._state"""
+        fileinput._state = None
+        with self.assertRaises(RuntimeError) as cm:
+            fileinput.filelineno()
+        self.assertEqual(("no active input()",), cm.exception.args)
+        self.assertIsNone(fileinput._state)
+    def test_state_is_not_None(self):
+        """Tests fileinput.filelineno() when fileinput._state is not None.
+           Ensure that it invokes fileinput._state.filelineno() exactly once,
+           returns whatever it returns, and does not modify fileinput._state
+           to point to a different object."""
+        filelineno_retval = object()
+        instance = MockFileInput()
+        instance.return_values["filelineno"] = filelineno_retval
+        fileinput._state = instance
+        retval = fileinput.filelineno()
+        self.assertExactlyOneInvocation(instance, "filelineno")
+        self.assertIs(retval, filelineno_retval)
+        self.assertIs(fileinput._state, instance)
+class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
+    """Unit tests for fileinput.fileno()"""
+    def test_state_is_None(self):
+        """Tests fileinput.fileno() when fileinput._state is None.
+           Ensure that it raises RuntimeError with a meaningful error message
+           and does not modify fileinput._state"""
+        fileinput._state = None
+        with self.assertRaises(RuntimeError) as cm:
+            fileinput.fileno()
+        self.assertEqual(("no active input()",), cm.exception.args)
+        self.assertIsNone(fileinput._state)
+    def test_state_is_not_None(self):
+        """Tests fileinput.fileno() when fileinput._state is not None.
+           Ensure that it invokes fileinput._state.fileno() exactly once,
+           returns whatever it returns, and does not modify fileinput._state
+           to point to a different object."""
+        fileno_retval = object()
+        instance = MockFileInput()
+        instance.return_values["fileno"] = fileno_retval
+        instance.fileno_retval = fileno_retval
+        fileinput._state = instance
+        retval = fileinput.fileno()
+        self.assertExactlyOneInvocation(instance, "fileno")
+        self.assertIs(retval, fileno_retval)
+        self.assertIs(fileinput._state, instance)
+class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
+    """Unit tests for fileinput.isfirstline()"""
+    def test_state_is_None(self):
+        """Tests fileinput.isfirstline() when fileinput._state is None.
+           Ensure that it raises RuntimeError with a meaningful error message
+           and does not modify fileinput._state"""
+        fileinput._state = None
+        with self.assertRaises(RuntimeError) as cm:
+            fileinput.isfirstline()
+        self.assertEqual(("no active input()",), cm.exception.args)
+        self.assertIsNone(fileinput._state)
+    def test_state_is_not_None(self):
+        """Tests fileinput.isfirstline() when fileinput._state is not None.
+           Ensure that it invokes fileinput._state.isfirstline() exactly once,
+           returns whatever it returns, and does not modify fileinput._state
+           to point to a different object."""
+        isfirstline_retval = object()
+        instance = MockFileInput()
+        instance.return_values["isfirstline"] = isfirstline_retval
+        fileinput._state = instance
+        retval = fileinput.isfirstline()
+        self.assertExactlyOneInvocation(instance, "isfirstline")
+        self.assertIs(retval, isfirstline_retval)
+        self.assertIs(fileinput._state, instance)
+class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
+    """Unit tests for fileinput.isstdin()"""
+    def test_state_is_None(self):
+        """Tests fileinput.isstdin() when fileinput._state is None.
+           Ensure that it raises RuntimeError with a meaningful error message
+           and does not modify fileinput._state"""
+        fileinput._state = None
+        with self.assertRaises(RuntimeError) as cm:
+            fileinput.isstdin()
+        self.assertEqual(("no active input()",), cm.exception.args)
+        self.assertIsNone(fileinput._state)
+    def test_state_is_not_None(self):
+        """Tests fileinput.isstdin() when fileinput._state is not None.
+           Ensure that it invokes fileinput._state.isstdin() exactly once,
+           returns whatever it returns, and does not modify fileinput._state
+           to point to a different object."""
+        isstdin_retval = object()
+        instance = MockFileInput()
+        instance.return_values["isstdin"] = isstdin_retval
+        fileinput._state = instance
+        retval = fileinput.isstdin()
+        self.assertExactlyOneInvocation(instance, "isstdin")
+        self.assertIs(retval, isstdin_retval)
+        self.assertIs(fileinput._state, instance)
+class InvocationRecorder:
+    def __init__(self):
+        self.invocation_count = 0
+    def __call__(self, *args, **kwargs):
+        self.invocation_count += 1
+        self.last_invocation = (args, kwargs)
+class Test_hook_compressed(unittest.TestCase):
+    """Unit tests for fileinput.hook_compressed()"""
+    def setUp(self):
+        self.fake_open = InvocationRecorder()
+    def test_empty_string(self):
+        self.do_test_use_builtin_open("", 1)
+    def test_no_ext(self):
+        self.do_test_use_builtin_open("abcd", 2)
+    def test_gz_ext(self):
+        original_open =
+ = self.fake_open
+        try:
+            result = fileinput.hook_compressed("test.gz", 3)
+        finally:
+   = original_open
+        self.assertEqual(self.fake_open.invocation_count, 1)
+        self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {}))
+    def test_bz2_ext(self):
+        original_open = bz2.BZ2File
+        bz2.BZ2File = self.fake_open
+        try:
+            result = fileinput.hook_compressed("test.bz2", 4)
+        finally:
+            bz2.BZ2File = original_open
+        self.assertEqual(self.fake_open.invocation_count, 1)
+        self.assertEqual(self.fake_open.last_invocation, (("test.bz2", 4), {}))
+    def test_blah_ext(self):
+        self.do_test_use_builtin_open("abcd.blah", 5)
+    def test_Gz_ext(self):
+        self.do_test_use_builtin_open("abcd.Gz", 6)
+    def test_Bz2_ext(self):
+        self.do_test_use_builtin_open("abcd.Bz2", 7)
+    def do_test_use_builtin_open(self, filename, mode):
+        original_open = self.replace_builtin_open(self.fake_open)
+        try:
+            result = fileinput.hook_compressed(filename, mode)
+        finally:
+            self.replace_builtin_open(original_open)
+        self.assertEqual(self.fake_open.invocation_count, 1)
+        self.assertEqual(self.fake_open.last_invocation,
+                         ((filename, mode), {}))
+    @staticmethod
+    def replace_builtin_open(new_open_func):
+        builtins_type = type(__builtins__)
+        if builtins_type is dict:
+            original_open = __builtins__["open"]
+            __builtins__["open"] = new_open_func
+        elif builtins_type is types.ModuleType:
+            original_open =
+   = new_open_func
+        else:
+            raise RuntimeError(
+                "unknown __builtins__ type: %r (unable to replace open)" %
+                builtins_type)
+        return original_open
+class Test_hook_encoded(unittest.TestCase):
+    """Unit tests for fileinput.hook_encoded()"""
+    def test(self):
+        encoding = object()
+        result = fileinput.hook_encoded(encoding)
+        fake_open = InvocationRecorder()
+        original_open =
+ = fake_open
+        try:
+            filename = object()
+            mode = object()
+            open_result = result(filename, mode)
+        finally:
+   = original_open
+        self.assertEqual(fake_open.invocation_count, 1)
+        args = fake_open.last_invocation[0]
+        self.assertIs(args[0], filename)
+        self.assertIs(args[1], mode)
+        self.assertIs(args[2], encoding)
 def test_main():
-    run_unittest(BufferSizesTests, FileInputTests)
+    run_unittest(
+        BufferSizesTests,
+        FileInputTests,
+        Test_fileinput_input,
+        Test_fileinput_close,
+        Test_fileinput_nextfile,
+        Test_fileinput_filename,
+        Test_fileinput_lineno,
+        Test_fileinput_filelineno,
+        Test_fileinput_fileno,
+        Test_fileinput_isfirstline,
+        Test_fileinput_isstdin,
+        Test_hook_compressed,
+        Test_hook_encoded,
+    )
 if __name__ == "__main__":