| """Unit tests for contextlib.py, and other context managers.""" |
| |
| import io |
| import sys |
| import tempfile |
| import threading |
| import unittest |
| from contextlib import * # Tests __all__ |
| from test import support |
| |
| |
| class TestAbstractContextManager(unittest.TestCase): |
| |
| def test_enter(self): |
| class DefaultEnter(AbstractContextManager): |
| def __exit__(self, *args): |
| super().__exit__(*args) |
| |
| manager = DefaultEnter() |
| self.assertIs(manager.__enter__(), manager) |
| |
| def test_exit_is_abstract(self): |
| class MissingExit(AbstractContextManager): |
| pass |
| |
| with self.assertRaises(TypeError): |
| MissingExit() |
| |
| def test_structural_subclassing(self): |
| class ManagerFromScratch: |
| def __enter__(self): |
| return self |
| def __exit__(self, exc_type, exc_value, traceback): |
| return None |
| |
| self.assertTrue(issubclass(ManagerFromScratch, AbstractContextManager)) |
| |
| class DefaultEnter(AbstractContextManager): |
| def __exit__(self, *args): |
| super().__exit__(*args) |
| |
| self.assertTrue(issubclass(DefaultEnter, AbstractContextManager)) |
| |
| class NoEnter(ManagerFromScratch): |
| __enter__ = None |
| |
| self.assertFalse(issubclass(NoEnter, AbstractContextManager)) |
| |
| class NoExit(ManagerFromScratch): |
| __exit__ = None |
| |
| self.assertFalse(issubclass(NoExit, AbstractContextManager)) |
| |
| |
| class ContextManagerTestCase(unittest.TestCase): |
| |
| def test_contextmanager_plain(self): |
| state = [] |
| @contextmanager |
| def woohoo(): |
| state.append(1) |
| yield 42 |
| state.append(999) |
| with woohoo() as x: |
| self.assertEqual(state, [1]) |
| self.assertEqual(x, 42) |
| state.append(x) |
| self.assertEqual(state, [1, 42, 999]) |
| |
| def test_contextmanager_finally(self): |
| state = [] |
| @contextmanager |
| def woohoo(): |
| state.append(1) |
| try: |
| yield 42 |
| finally: |
| state.append(999) |
| with self.assertRaises(ZeroDivisionError): |
| with woohoo() as x: |
| self.assertEqual(state, [1]) |
| self.assertEqual(x, 42) |
| state.append(x) |
| raise ZeroDivisionError() |
| self.assertEqual(state, [1, 42, 999]) |
| |
| def test_contextmanager_no_reraise(self): |
| @contextmanager |
| def whee(): |
| yield |
| ctx = whee() |
| ctx.__enter__() |
| # Calling __exit__ should not result in an exception |
| self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None)) |
| |
| def test_contextmanager_trap_yield_after_throw(self): |
| @contextmanager |
| def whoo(): |
| try: |
| yield |
| except: |
| yield |
| ctx = whoo() |
| ctx.__enter__() |
| self.assertRaises( |
| RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None |
| ) |
| |
| def test_contextmanager_except(self): |
| state = [] |
| @contextmanager |
| def woohoo(): |
| state.append(1) |
| try: |
| yield 42 |
| except ZeroDivisionError as e: |
| state.append(e.args[0]) |
| self.assertEqual(state, [1, 42, 999]) |
| with woohoo() as x: |
| self.assertEqual(state, [1]) |
| self.assertEqual(x, 42) |
| state.append(x) |
| raise ZeroDivisionError(999) |
| self.assertEqual(state, [1, 42, 999]) |
| |
| def test_contextmanager_except_stopiter(self): |
| stop_exc = StopIteration('spam') |
| @contextmanager |
| def woohoo(): |
| yield |
| try: |
| with self.assertWarnsRegex(DeprecationWarning, |
| "StopIteration"): |
| with woohoo(): |
| raise stop_exc |
| except Exception as ex: |
| self.assertIs(ex, stop_exc) |
| else: |
| self.fail('StopIteration was suppressed') |
| |
| def test_contextmanager_except_pep479(self): |
| code = """\ |
| from __future__ import generator_stop |
| from contextlib import contextmanager |
| @contextmanager |
| def woohoo(): |
| yield |
| """ |
| locals = {} |
| exec(code, locals, locals) |
| woohoo = locals['woohoo'] |
| |
| stop_exc = StopIteration('spam') |
| try: |
| with woohoo(): |
| raise stop_exc |
| except Exception as ex: |
| self.assertIs(ex, stop_exc) |
| else: |
| self.fail('StopIteration was suppressed') |
| |
| def test_contextmanager_do_not_unchain_non_stopiteration_exceptions(self): |
| @contextmanager |
| def test_issue29692(): |
| try: |
| yield |
| except Exception as exc: |
| raise RuntimeError('issue29692:Chained') from exc |
| try: |
| with test_issue29692(): |
| raise ZeroDivisionError |
| except Exception as ex: |
| self.assertIs(type(ex), RuntimeError) |
| self.assertEqual(ex.args[0], 'issue29692:Chained') |
| self.assertIsInstance(ex.__cause__, ZeroDivisionError) |
| |
| try: |
| with test_issue29692(): |
| raise StopIteration('issue29692:Unchained') |
| except Exception as ex: |
| self.assertIs(type(ex), StopIteration) |
| self.assertEqual(ex.args[0], 'issue29692:Unchained') |
| self.assertIsNone(ex.__cause__) |
| |
| def _create_contextmanager_attribs(self): |
| def attribs(**kw): |
| def decorate(func): |
| for k,v in kw.items(): |
| setattr(func,k,v) |
| return func |
| return decorate |
| @contextmanager |
| @attribs(foo='bar') |
| def baz(spam): |
| """Whee!""" |
| return baz |
| |
| def test_contextmanager_attribs(self): |
| baz = self._create_contextmanager_attribs() |
| self.assertEqual(baz.__name__,'baz') |
| self.assertEqual(baz.foo, 'bar') |
| |
| @support.requires_docstrings |
| def test_contextmanager_doc_attrib(self): |
| baz = self._create_contextmanager_attribs() |
| self.assertEqual(baz.__doc__, "Whee!") |
| |
| @support.requires_docstrings |
| def test_instance_docstring_given_cm_docstring(self): |
| baz = self._create_contextmanager_attribs()(None) |
| self.assertEqual(baz.__doc__, "Whee!") |
| |
| def test_keywords(self): |
| # Ensure no keyword arguments are inhibited |
| @contextmanager |
| def woohoo(self, func, args, kwds): |
| yield (self, func, args, kwds) |
| with woohoo(self=11, func=22, args=33, kwds=44) as target: |
| self.assertEqual(target, (11, 22, 33, 44)) |
| |
| |
| class ClosingTestCase(unittest.TestCase): |
| |
| @support.requires_docstrings |
| def test_instance_docs(self): |
| # Issue 19330: ensure context manager instances have good docstrings |
| cm_docstring = closing.__doc__ |
| obj = closing(None) |
| self.assertEqual(obj.__doc__, cm_docstring) |
| |
| def test_closing(self): |
| state = [] |
| class C: |
| def close(self): |
| state.append(1) |
| x = C() |
| self.assertEqual(state, []) |
| with closing(x) as y: |
| self.assertEqual(x, y) |
| self.assertEqual(state, [1]) |
| |
| def test_closing_error(self): |
| state = [] |
| class C: |
| def close(self): |
| state.append(1) |
| x = C() |
| self.assertEqual(state, []) |
| with self.assertRaises(ZeroDivisionError): |
| with closing(x) as y: |
| self.assertEqual(x, y) |
| 1 / 0 |
| self.assertEqual(state, [1]) |
| |
| |
| class NullcontextTestCase(unittest.TestCase): |
| def test_nullcontext(self): |
| class C: |
| pass |
| c = C() |
| with nullcontext(c) as c_in: |
| self.assertIs(c_in, c) |
| |
| |
| class FileContextTestCase(unittest.TestCase): |
| |
| def testWithOpen(self): |
| tfn = tempfile.mktemp() |
| try: |
| f = None |
| with open(tfn, "w") as f: |
| self.assertFalse(f.closed) |
| f.write("Booh\n") |
| self.assertTrue(f.closed) |
| f = None |
| with self.assertRaises(ZeroDivisionError): |
| with open(tfn, "r") as f: |
| self.assertFalse(f.closed) |
| self.assertEqual(f.read(), "Booh\n") |
| 1 / 0 |
| self.assertTrue(f.closed) |
| finally: |
| support.unlink(tfn) |
| |
| class LockContextTestCase(unittest.TestCase): |
| |
| def boilerPlate(self, lock, locked): |
| self.assertFalse(locked()) |
| with lock: |
| self.assertTrue(locked()) |
| self.assertFalse(locked()) |
| with self.assertRaises(ZeroDivisionError): |
| with lock: |
| self.assertTrue(locked()) |
| 1 / 0 |
| self.assertFalse(locked()) |
| |
| def testWithLock(self): |
| lock = threading.Lock() |
| self.boilerPlate(lock, lock.locked) |
| |
| def testWithRLock(self): |
| lock = threading.RLock() |
| self.boilerPlate(lock, lock._is_owned) |
| |
| def testWithCondition(self): |
| lock = threading.Condition() |
| def locked(): |
| return lock._is_owned() |
| self.boilerPlate(lock, locked) |
| |
| def testWithSemaphore(self): |
| lock = threading.Semaphore() |
| def locked(): |
| if lock.acquire(False): |
| lock.release() |
| return False |
| else: |
| return True |
| self.boilerPlate(lock, locked) |
| |
| def testWithBoundedSemaphore(self): |
| lock = threading.BoundedSemaphore() |
| def locked(): |
| if lock.acquire(False): |
| lock.release() |
| return False |
| else: |
| return True |
| self.boilerPlate(lock, locked) |
| |
| |
| class mycontext(ContextDecorator): |
| """Example decoration-compatible context manager for testing""" |
| started = False |
| exc = None |
| catch = False |
| |
| def __enter__(self): |
| self.started = True |
| return self |
| |
| def __exit__(self, *exc): |
| self.exc = exc |
| return self.catch |
| |
| |
| class TestContextDecorator(unittest.TestCase): |
| |
| @support.requires_docstrings |
| def test_instance_docs(self): |
| # Issue 19330: ensure context manager instances have good docstrings |
| cm_docstring = mycontext.__doc__ |
| obj = mycontext() |
| self.assertEqual(obj.__doc__, cm_docstring) |
| |
| def test_contextdecorator(self): |
| context = mycontext() |
| with context as result: |
| self.assertIs(result, context) |
| self.assertTrue(context.started) |
| |
| self.assertEqual(context.exc, (None, None, None)) |
| |
| |
| def test_contextdecorator_with_exception(self): |
| context = mycontext() |
| |
| with self.assertRaisesRegex(NameError, 'foo'): |
| with context: |
| raise NameError('foo') |
| self.assertIsNotNone(context.exc) |
| self.assertIs(context.exc[0], NameError) |
| |
| context = mycontext() |
| context.catch = True |
| with context: |
| raise NameError('foo') |
| self.assertIsNotNone(context.exc) |
| self.assertIs(context.exc[0], NameError) |
| |
| |
| def test_decorator(self): |
| context = mycontext() |
| |
| @context |
| def test(): |
| self.assertIsNone(context.exc) |
| self.assertTrue(context.started) |
| test() |
| self.assertEqual(context.exc, (None, None, None)) |
| |
| |
| def test_decorator_with_exception(self): |
| context = mycontext() |
| |
| @context |
| def test(): |
| self.assertIsNone(context.exc) |
| self.assertTrue(context.started) |
| raise NameError('foo') |
| |
| with self.assertRaisesRegex(NameError, 'foo'): |
| test() |
| self.assertIsNotNone(context.exc) |
| self.assertIs(context.exc[0], NameError) |
| |
| |
| def test_decorating_method(self): |
| context = mycontext() |
| |
| class Test(object): |
| |
| @context |
| def method(self, a, b, c=None): |
| self.a = a |
| self.b = b |
| self.c = c |
| |
| # these tests are for argument passing when used as a decorator |
| test = Test() |
| test.method(1, 2) |
| self.assertEqual(test.a, 1) |
| self.assertEqual(test.b, 2) |
| self.assertEqual(test.c, None) |
| |
| test = Test() |
| test.method('a', 'b', 'c') |
| self.assertEqual(test.a, 'a') |
| self.assertEqual(test.b, 'b') |
| self.assertEqual(test.c, 'c') |
| |
| test = Test() |
| test.method(a=1, b=2) |
| self.assertEqual(test.a, 1) |
| self.assertEqual(test.b, 2) |
| |
| |
| def test_typo_enter(self): |
| class mycontext(ContextDecorator): |
| def __unter__(self): |
| pass |
| def __exit__(self, *exc): |
| pass |
| |
| with self.assertRaises(AttributeError): |
| with mycontext(): |
| pass |
| |
| |
| def test_typo_exit(self): |
| class mycontext(ContextDecorator): |
| def __enter__(self): |
| pass |
| def __uxit__(self, *exc): |
| pass |
| |
| with self.assertRaises(AttributeError): |
| with mycontext(): |
| pass |
| |
| |
| def test_contextdecorator_as_mixin(self): |
| class somecontext(object): |
| started = False |
| exc = None |
| |
| def __enter__(self): |
| self.started = True |
| return self |
| |
| def __exit__(self, *exc): |
| self.exc = exc |
| |
| class mycontext(somecontext, ContextDecorator): |
| pass |
| |
| context = mycontext() |
| @context |
| def test(): |
| self.assertIsNone(context.exc) |
| self.assertTrue(context.started) |
| test() |
| self.assertEqual(context.exc, (None, None, None)) |
| |
| |
| def test_contextmanager_as_decorator(self): |
| @contextmanager |
| def woohoo(y): |
| state.append(y) |
| yield |
| state.append(999) |
| |
| state = [] |
| @woohoo(1) |
| def test(x): |
| self.assertEqual(state, [1]) |
| state.append(x) |
| test('something') |
| self.assertEqual(state, [1, 'something', 999]) |
| |
| # Issue #11647: Ensure the decorated function is 'reusable' |
| state = [] |
| test('something else') |
| self.assertEqual(state, [1, 'something else', 999]) |
| |
| |
| class TestExitStack(unittest.TestCase): |
| |
| @support.requires_docstrings |
| def test_instance_docs(self): |
| # Issue 19330: ensure context manager instances have good docstrings |
| cm_docstring = ExitStack.__doc__ |
| obj = ExitStack() |
| self.assertEqual(obj.__doc__, cm_docstring) |
| |
| def test_no_resources(self): |
| with ExitStack(): |
| pass |
| |
| def test_callback(self): |
| expected = [ |
| ((), {}), |
| ((1,), {}), |
| ((1,2), {}), |
| ((), dict(example=1)), |
| ((1,), dict(example=1)), |
| ((1,2), dict(example=1)), |
| ] |
| result = [] |
| def _exit(*args, **kwds): |
| """Test metadata propagation""" |
| result.append((args, kwds)) |
| with ExitStack() as stack: |
| for args, kwds in reversed(expected): |
| if args and kwds: |
| f = stack.callback(_exit, *args, **kwds) |
| elif args: |
| f = stack.callback(_exit, *args) |
| elif kwds: |
| f = stack.callback(_exit, **kwds) |
| else: |
| f = stack.callback(_exit) |
| self.assertIs(f, _exit) |
| for wrapper in stack._exit_callbacks: |
| self.assertIs(wrapper.__wrapped__, _exit) |
| self.assertNotEqual(wrapper.__name__, _exit.__name__) |
| self.assertIsNone(wrapper.__doc__, _exit.__doc__) |
| self.assertEqual(result, expected) |
| |
| def test_push(self): |
| exc_raised = ZeroDivisionError |
| def _expect_exc(exc_type, exc, exc_tb): |
| self.assertIs(exc_type, exc_raised) |
| def _suppress_exc(*exc_details): |
| return True |
| def _expect_ok(exc_type, exc, exc_tb): |
| self.assertIsNone(exc_type) |
| self.assertIsNone(exc) |
| self.assertIsNone(exc_tb) |
| class ExitCM(object): |
| def __init__(self, check_exc): |
| self.check_exc = check_exc |
| def __enter__(self): |
| self.fail("Should not be called!") |
| def __exit__(self, *exc_details): |
| self.check_exc(*exc_details) |
| with ExitStack() as stack: |
| stack.push(_expect_ok) |
| self.assertIs(stack._exit_callbacks[-1], _expect_ok) |
| cm = ExitCM(_expect_ok) |
| stack.push(cm) |
| self.assertIs(stack._exit_callbacks[-1].__self__, cm) |
| stack.push(_suppress_exc) |
| self.assertIs(stack._exit_callbacks[-1], _suppress_exc) |
| cm = ExitCM(_expect_exc) |
| stack.push(cm) |
| self.assertIs(stack._exit_callbacks[-1].__self__, cm) |
| stack.push(_expect_exc) |
| self.assertIs(stack._exit_callbacks[-1], _expect_exc) |
| stack.push(_expect_exc) |
| self.assertIs(stack._exit_callbacks[-1], _expect_exc) |
| 1/0 |
| |
| def test_enter_context(self): |
| class TestCM(object): |
| def __enter__(self): |
| result.append(1) |
| def __exit__(self, *exc_details): |
| result.append(3) |
| |
| result = [] |
| cm = TestCM() |
| with ExitStack() as stack: |
| @stack.callback # Registered first => cleaned up last |
| def _exit(): |
| result.append(4) |
| self.assertIsNotNone(_exit) |
| stack.enter_context(cm) |
| self.assertIs(stack._exit_callbacks[-1].__self__, cm) |
| result.append(2) |
| self.assertEqual(result, [1, 2, 3, 4]) |
| |
| def test_close(self): |
| result = [] |
| with ExitStack() as stack: |
| @stack.callback |
| def _exit(): |
| result.append(1) |
| self.assertIsNotNone(_exit) |
| stack.close() |
| result.append(2) |
| self.assertEqual(result, [1, 2]) |
| |
| def test_pop_all(self): |
| result = [] |
| with ExitStack() as stack: |
| @stack.callback |
| def _exit(): |
| result.append(3) |
| self.assertIsNotNone(_exit) |
| new_stack = stack.pop_all() |
| result.append(1) |
| result.append(2) |
| new_stack.close() |
| self.assertEqual(result, [1, 2, 3]) |
| |
| def test_exit_raise(self): |
| with self.assertRaises(ZeroDivisionError): |
| with ExitStack() as stack: |
| stack.push(lambda *exc: False) |
| 1/0 |
| |
| def test_exit_suppress(self): |
| with ExitStack() as stack: |
| stack.push(lambda *exc: True) |
| 1/0 |
| |
| def test_exit_exception_chaining_reference(self): |
| # Sanity check to make sure that ExitStack chaining matches |
| # actual nested with statements |
| class RaiseExc: |
| def __init__(self, exc): |
| self.exc = exc |
| def __enter__(self): |
| return self |
| def __exit__(self, *exc_details): |
| raise self.exc |
| |
| class RaiseExcWithContext: |
| def __init__(self, outer, inner): |
| self.outer = outer |
| self.inner = inner |
| def __enter__(self): |
| return self |
| def __exit__(self, *exc_details): |
| try: |
| raise self.inner |
| except: |
| raise self.outer |
| |
| class SuppressExc: |
| def __enter__(self): |
| return self |
| def __exit__(self, *exc_details): |
| type(self).saved_details = exc_details |
| return True |
| |
| try: |
| with RaiseExc(IndexError): |
| with RaiseExcWithContext(KeyError, AttributeError): |
| with SuppressExc(): |
| with RaiseExc(ValueError): |
| 1 / 0 |
| except IndexError as exc: |
| self.assertIsInstance(exc.__context__, KeyError) |
| self.assertIsInstance(exc.__context__.__context__, AttributeError) |
| # Inner exceptions were suppressed |
| self.assertIsNone(exc.__context__.__context__.__context__) |
| else: |
| self.fail("Expected IndexError, but no exception was raised") |
| # Check the inner exceptions |
| inner_exc = SuppressExc.saved_details[1] |
| self.assertIsInstance(inner_exc, ValueError) |
| self.assertIsInstance(inner_exc.__context__, ZeroDivisionError) |
| |
| def test_exit_exception_chaining(self): |
| # Ensure exception chaining matches the reference behaviour |
| def raise_exc(exc): |
| raise exc |
| |
| saved_details = None |
| def suppress_exc(*exc_details): |
| nonlocal saved_details |
| saved_details = exc_details |
| return True |
| |
| try: |
| with ExitStack() as stack: |
| stack.callback(raise_exc, IndexError) |
| stack.callback(raise_exc, KeyError) |
| stack.callback(raise_exc, AttributeError) |
| stack.push(suppress_exc) |
| stack.callback(raise_exc, ValueError) |
| 1 / 0 |
| except IndexError as exc: |
| self.assertIsInstance(exc.__context__, KeyError) |
| self.assertIsInstance(exc.__context__.__context__, AttributeError) |
| # Inner exceptions were suppressed |
| self.assertIsNone(exc.__context__.__context__.__context__) |
| else: |
| self.fail("Expected IndexError, but no exception was raised") |
| # Check the inner exceptions |
| inner_exc = saved_details[1] |
| self.assertIsInstance(inner_exc, ValueError) |
| self.assertIsInstance(inner_exc.__context__, ZeroDivisionError) |
| |
| def test_exit_exception_non_suppressing(self): |
| # http://bugs.python.org/issue19092 |
| def raise_exc(exc): |
| raise exc |
| |
| def suppress_exc(*exc_details): |
| return True |
| |
| try: |
| with ExitStack() as stack: |
| stack.callback(lambda: None) |
| stack.callback(raise_exc, IndexError) |
| except Exception as exc: |
| self.assertIsInstance(exc, IndexError) |
| else: |
| self.fail("Expected IndexError, but no exception was raised") |
| |
| try: |
| with ExitStack() as stack: |
| stack.callback(raise_exc, KeyError) |
| stack.push(suppress_exc) |
| stack.callback(raise_exc, IndexError) |
| except Exception as exc: |
| self.assertIsInstance(exc, KeyError) |
| else: |
| self.fail("Expected KeyError, but no exception was raised") |
| |
| def test_exit_exception_with_correct_context(self): |
| # http://bugs.python.org/issue20317 |
| @contextmanager |
| def gets_the_context_right(exc): |
| try: |
| yield |
| finally: |
| raise exc |
| |
| exc1 = Exception(1) |
| exc2 = Exception(2) |
| exc3 = Exception(3) |
| exc4 = Exception(4) |
| |
| # The contextmanager already fixes the context, so prior to the |
| # fix, ExitStack would try to fix it *again* and get into an |
| # infinite self-referential loop |
| try: |
| with ExitStack() as stack: |
| stack.enter_context(gets_the_context_right(exc4)) |
| stack.enter_context(gets_the_context_right(exc3)) |
| stack.enter_context(gets_the_context_right(exc2)) |
| raise exc1 |
| except Exception as exc: |
| self.assertIs(exc, exc4) |
| self.assertIs(exc.__context__, exc3) |
| self.assertIs(exc.__context__.__context__, exc2) |
| self.assertIs(exc.__context__.__context__.__context__, exc1) |
| self.assertIsNone( |
| exc.__context__.__context__.__context__.__context__) |
| |
| def test_exit_exception_with_existing_context(self): |
| # Addresses a lack of test coverage discovered after checking in a |
| # fix for issue 20317 that still contained debugging code. |
| def raise_nested(inner_exc, outer_exc): |
| try: |
| raise inner_exc |
| finally: |
| raise outer_exc |
| exc1 = Exception(1) |
| exc2 = Exception(2) |
| exc3 = Exception(3) |
| exc4 = Exception(4) |
| exc5 = Exception(5) |
| try: |
| with ExitStack() as stack: |
| stack.callback(raise_nested, exc4, exc5) |
| stack.callback(raise_nested, exc2, exc3) |
| raise exc1 |
| except Exception as exc: |
| self.assertIs(exc, exc5) |
| self.assertIs(exc.__context__, exc4) |
| self.assertIs(exc.__context__.__context__, exc3) |
| self.assertIs(exc.__context__.__context__.__context__, exc2) |
| self.assertIs( |
| exc.__context__.__context__.__context__.__context__, exc1) |
| self.assertIsNone( |
| exc.__context__.__context__.__context__.__context__.__context__) |
| |
| |
| |
| def test_body_exception_suppress(self): |
| def suppress_exc(*exc_details): |
| return True |
| try: |
| with ExitStack() as stack: |
| stack.push(suppress_exc) |
| 1/0 |
| except IndexError as exc: |
| self.fail("Expected no exception, got IndexError") |
| |
| def test_exit_exception_chaining_suppress(self): |
| with ExitStack() as stack: |
| stack.push(lambda *exc: True) |
| stack.push(lambda *exc: 1/0) |
| stack.push(lambda *exc: {}[1]) |
| |
| def test_excessive_nesting(self): |
| # The original implementation would die with RecursionError here |
| with ExitStack() as stack: |
| for i in range(10000): |
| stack.callback(int) |
| |
| def test_instance_bypass(self): |
| class Example(object): pass |
| cm = Example() |
| cm.__exit__ = object() |
| stack = ExitStack() |
| self.assertRaises(AttributeError, stack.enter_context, cm) |
| stack.push(cm) |
| self.assertIs(stack._exit_callbacks[-1], cm) |
| |
| def test_dont_reraise_RuntimeError(self): |
| # https://bugs.python.org/issue27122 |
| class UniqueException(Exception): pass |
| class UniqueRuntimeError(RuntimeError): pass |
| |
| @contextmanager |
| def second(): |
| try: |
| yield 1 |
| except Exception as exc: |
| raise UniqueException("new exception") from exc |
| |
| @contextmanager |
| def first(): |
| try: |
| yield 1 |
| except Exception as exc: |
| raise exc |
| |
| # The UniqueRuntimeError should be caught by second()'s exception |
| # handler which chain raised a new UniqueException. |
| with self.assertRaises(UniqueException) as err_ctx: |
| with ExitStack() as es_ctx: |
| es_ctx.enter_context(second()) |
| es_ctx.enter_context(first()) |
| raise UniqueRuntimeError("please no infinite loop.") |
| |
| exc = err_ctx.exception |
| self.assertIsInstance(exc, UniqueException) |
| self.assertIsInstance(exc.__context__, UniqueRuntimeError) |
| self.assertIsNone(exc.__context__.__context__) |
| self.assertIsNone(exc.__context__.__cause__) |
| self.assertIs(exc.__cause__, exc.__context__) |
| |
| |
| class TestRedirectStream: |
| |
| redirect_stream = None |
| orig_stream = None |
| |
| @support.requires_docstrings |
| def test_instance_docs(self): |
| # Issue 19330: ensure context manager instances have good docstrings |
| cm_docstring = self.redirect_stream.__doc__ |
| obj = self.redirect_stream(None) |
| self.assertEqual(obj.__doc__, cm_docstring) |
| |
| def test_no_redirect_in_init(self): |
| orig_stdout = getattr(sys, self.orig_stream) |
| self.redirect_stream(None) |
| self.assertIs(getattr(sys, self.orig_stream), orig_stdout) |
| |
| def test_redirect_to_string_io(self): |
| f = io.StringIO() |
| msg = "Consider an API like help(), which prints directly to stdout" |
| orig_stdout = getattr(sys, self.orig_stream) |
| with self.redirect_stream(f): |
| print(msg, file=getattr(sys, self.orig_stream)) |
| self.assertIs(getattr(sys, self.orig_stream), orig_stdout) |
| s = f.getvalue().strip() |
| self.assertEqual(s, msg) |
| |
| def test_enter_result_is_target(self): |
| f = io.StringIO() |
| with self.redirect_stream(f) as enter_result: |
| self.assertIs(enter_result, f) |
| |
| def test_cm_is_reusable(self): |
| f = io.StringIO() |
| write_to_f = self.redirect_stream(f) |
| orig_stdout = getattr(sys, self.orig_stream) |
| with write_to_f: |
| print("Hello", end=" ", file=getattr(sys, self.orig_stream)) |
| with write_to_f: |
| print("World!", file=getattr(sys, self.orig_stream)) |
| self.assertIs(getattr(sys, self.orig_stream), orig_stdout) |
| s = f.getvalue() |
| self.assertEqual(s, "Hello World!\n") |
| |
| def test_cm_is_reentrant(self): |
| f = io.StringIO() |
| write_to_f = self.redirect_stream(f) |
| orig_stdout = getattr(sys, self.orig_stream) |
| with write_to_f: |
| print("Hello", end=" ", file=getattr(sys, self.orig_stream)) |
| with write_to_f: |
| print("World!", file=getattr(sys, self.orig_stream)) |
| self.assertIs(getattr(sys, self.orig_stream), orig_stdout) |
| s = f.getvalue() |
| self.assertEqual(s, "Hello World!\n") |
| |
| |
| class TestRedirectStdout(TestRedirectStream, unittest.TestCase): |
| |
| redirect_stream = redirect_stdout |
| orig_stream = "stdout" |
| |
| |
| class TestRedirectStderr(TestRedirectStream, unittest.TestCase): |
| |
| redirect_stream = redirect_stderr |
| orig_stream = "stderr" |
| |
| |
| class TestSuppress(unittest.TestCase): |
| |
| @support.requires_docstrings |
| def test_instance_docs(self): |
| # Issue 19330: ensure context manager instances have good docstrings |
| cm_docstring = suppress.__doc__ |
| obj = suppress() |
| self.assertEqual(obj.__doc__, cm_docstring) |
| |
| def test_no_result_from_enter(self): |
| with suppress(ValueError) as enter_result: |
| self.assertIsNone(enter_result) |
| |
| def test_no_exception(self): |
| with suppress(ValueError): |
| self.assertEqual(pow(2, 5), 32) |
| |
| def test_exact_exception(self): |
| with suppress(TypeError): |
| len(5) |
| |
| def test_exception_hierarchy(self): |
| with suppress(LookupError): |
| 'Hello'[50] |
| |
| def test_other_exception(self): |
| with self.assertRaises(ZeroDivisionError): |
| with suppress(TypeError): |
| 1/0 |
| |
| def test_no_args(self): |
| with self.assertRaises(ZeroDivisionError): |
| with suppress(): |
| 1/0 |
| |
| def test_multiple_exception_args(self): |
| with suppress(ZeroDivisionError, TypeError): |
| 1/0 |
| with suppress(ZeroDivisionError, TypeError): |
| len(5) |
| |
| def test_cm_is_reentrant(self): |
| ignore_exceptions = suppress(Exception) |
| with ignore_exceptions: |
| pass |
| with ignore_exceptions: |
| len(5) |
| with ignore_exceptions: |
| with ignore_exceptions: # Check nested usage |
| len(5) |
| outer_continued = True |
| 1/0 |
| self.assertTrue(outer_continued) |
| |
| if __name__ == "__main__": |
| unittest.main() |