| #!/usr/bin/env python | 
 | # | 
 | # Copyright 2001-2011 by Vinay Sajip. All Rights Reserved. | 
 | # | 
 | # Permission to use, copy, modify, and distribute this software and its | 
 | # documentation for any purpose and without fee is hereby granted, | 
 | # provided that the above copyright notice appear in all copies and that | 
 | # both that copyright notice and this permission notice appear in | 
 | # supporting documentation, and that the name of Vinay Sajip | 
 | # not be used in advertising or publicity pertaining to distribution | 
 | # of the software without specific, written prior permission. | 
 | # VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING | 
 | # ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL | 
 | # VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR | 
 | # ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER | 
 | # IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT | 
 | # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | 
 |  | 
 | """Test harness for the logging module. Run all tests. | 
 |  | 
 | Copyright (C) 2001-2011 Vinay Sajip. All Rights Reserved. | 
 | """ | 
 |  | 
 | import logging | 
 | import logging.handlers | 
 | import logging.config | 
 |  | 
 | import codecs | 
 | import datetime | 
 | import pickle | 
 | import io | 
 | import gc | 
 | import json | 
 | import os | 
 | import queue | 
 | import re | 
 | import select | 
 | import socket | 
 | from socketserver import ThreadingTCPServer, StreamRequestHandler | 
 | import struct | 
 | import sys | 
 | import tempfile | 
 | from test.support import captured_stdout, run_with_locale, run_unittest | 
 | from test.support import TestHandler, Matcher | 
 | import textwrap | 
 | import unittest | 
 | import warnings | 
 | import weakref | 
 | try: | 
 |     import threading | 
 | except ImportError: | 
 |     threading = None | 
 |  | 
 |  | 
 | class BaseTest(unittest.TestCase): | 
 |  | 
 |     """Base class for logging tests.""" | 
 |  | 
 |     log_format = "%(name)s -> %(levelname)s: %(message)s" | 
 |     expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$" | 
 |     message_num = 0 | 
 |  | 
 |     def setUp(self): | 
 |         """Setup the default logging stream to an internal StringIO instance, | 
 |         so that we can examine log output as we want.""" | 
 |         logger_dict = logging.getLogger().manager.loggerDict | 
 |         logging._acquireLock() | 
 |         try: | 
 |             self.saved_handlers = logging._handlers.copy() | 
 |             self.saved_handler_list = logging._handlerList[:] | 
 |             self.saved_loggers = saved_loggers = logger_dict.copy() | 
 |             self.saved_level_names = logging._levelNames.copy() | 
 |             self.logger_states = logger_states = {} | 
 |             for name in saved_loggers: | 
 |                 logger_states[name] = getattr(saved_loggers[name], | 
 |                                               'disabled', None) | 
 |         finally: | 
 |             logging._releaseLock() | 
 |  | 
 |         # Set two unused loggers: one non-ASCII and one Unicode. | 
 |         # This is to test correct operation when sorting existing | 
 |         # loggers in the configuration code. See issue 8201. | 
 |         self.logger1 = logging.getLogger("\xab\xd7\xbb") | 
 |         self.logger2 = logging.getLogger("\u013f\u00d6\u0047") | 
 |  | 
 |         self.root_logger = logging.getLogger("") | 
 |         self.original_logging_level = self.root_logger.getEffectiveLevel() | 
 |  | 
 |         self.stream = io.StringIO() | 
 |         self.root_logger.setLevel(logging.DEBUG) | 
 |         self.root_hdlr = logging.StreamHandler(self.stream) | 
 |         self.root_formatter = logging.Formatter(self.log_format) | 
 |         self.root_hdlr.setFormatter(self.root_formatter) | 
 |         if self.logger1.hasHandlers(): | 
 |             hlist = self.logger1.handlers + self.root_logger.handlers | 
 |             raise AssertionError('Unexpected handlers: %s' % hlist) | 
 |         if self.logger2.hasHandlers(): | 
 |             hlist = self.logger2.handlers + self.root_logger.handlers | 
 |             raise AssertionError('Unexpected handlers: %s' % hlist) | 
 |         self.root_logger.addHandler(self.root_hdlr) | 
 |         self.assertTrue(self.logger1.hasHandlers()) | 
 |         self.assertTrue(self.logger2.hasHandlers()) | 
 |  | 
 |     def tearDown(self): | 
 |         """Remove our logging stream, and restore the original logging | 
 |         level.""" | 
 |         self.stream.close() | 
 |         self.root_logger.removeHandler(self.root_hdlr) | 
 |         while self.root_logger.handlers: | 
 |             h = self.root_logger.handlers[0] | 
 |             self.root_logger.removeHandler(h) | 
 |             h.close() | 
 |         self.root_logger.setLevel(self.original_logging_level) | 
 |         logging._acquireLock() | 
 |         try: | 
 |             logging._levelNames.clear() | 
 |             logging._levelNames.update(self.saved_level_names) | 
 |             logging._handlers.clear() | 
 |             logging._handlers.update(self.saved_handlers) | 
 |             logging._handlerList[:] = self.saved_handler_list | 
 |             loggerDict = logging.getLogger().manager.loggerDict | 
 |             loggerDict.clear() | 
 |             loggerDict.update(self.saved_loggers) | 
 |             logger_states = self.logger_states | 
 |             for name in self.logger_states: | 
 |                 if logger_states[name] is not None: | 
 |                     self.saved_loggers[name].disabled = logger_states[name] | 
 |         finally: | 
 |             logging._releaseLock() | 
 |  | 
 |     def assert_log_lines(self, expected_values, stream=None): | 
 |         """Match the collected log lines against the regular expression | 
 |         self.expected_log_pat, and compare the extracted group values to | 
 |         the expected_values list of tuples.""" | 
 |         stream = stream or self.stream | 
 |         pat = re.compile(self.expected_log_pat) | 
 |         try: | 
 |             stream.reset() | 
 |             actual_lines = stream.readlines() | 
 |         except AttributeError: | 
 |             # StringIO.StringIO lacks a reset() method. | 
 |             actual_lines = stream.getvalue().splitlines() | 
 |         self.assertEqual(len(actual_lines), len(expected_values), | 
 |                           '%s vs. %s' % (actual_lines, expected_values)) | 
 |         for actual, expected in zip(actual_lines, expected_values): | 
 |             match = pat.search(actual) | 
 |             if not match: | 
 |                 self.fail("Log line does not match expected pattern:\n" + | 
 |                             actual) | 
 |             self.assertEqual(tuple(match.groups()), expected) | 
 |         s = stream.read() | 
 |         if s: | 
 |             self.fail("Remaining output at end of log stream:\n" + s) | 
 |  | 
 |     def next_message(self): | 
 |         """Generate a message consisting solely of an auto-incrementing | 
 |         integer.""" | 
 |         self.message_num += 1 | 
 |         return "%d" % self.message_num | 
 |  | 
 |  | 
 | class BuiltinLevelsTest(BaseTest): | 
 |     """Test builtin levels and their inheritance.""" | 
 |  | 
 |     def test_flat(self): | 
 |         #Logging levels in a flat logger namespace. | 
 |         m = self.next_message | 
 |  | 
 |         ERR = logging.getLogger("ERR") | 
 |         ERR.setLevel(logging.ERROR) | 
 |         INF = logging.LoggerAdapter(logging.getLogger("INF"), {}) | 
 |         INF.setLevel(logging.INFO) | 
 |         DEB = logging.getLogger("DEB") | 
 |         DEB.setLevel(logging.DEBUG) | 
 |  | 
 |         # These should log. | 
 |         ERR.log(logging.CRITICAL, m()) | 
 |         ERR.error(m()) | 
 |  | 
 |         INF.log(logging.CRITICAL, m()) | 
 |         INF.error(m()) | 
 |         INF.warn(m()) | 
 |         INF.info(m()) | 
 |  | 
 |         DEB.log(logging.CRITICAL, m()) | 
 |         DEB.error(m()) | 
 |         DEB.warn (m()) | 
 |         DEB.info (m()) | 
 |         DEB.debug(m()) | 
 |  | 
 |         # These should not log. | 
 |         ERR.warn(m()) | 
 |         ERR.info(m()) | 
 |         ERR.debug(m()) | 
 |  | 
 |         INF.debug(m()) | 
 |  | 
 |         self.assert_log_lines([ | 
 |             ('ERR', 'CRITICAL', '1'), | 
 |             ('ERR', 'ERROR', '2'), | 
 |             ('INF', 'CRITICAL', '3'), | 
 |             ('INF', 'ERROR', '4'), | 
 |             ('INF', 'WARNING', '5'), | 
 |             ('INF', 'INFO', '6'), | 
 |             ('DEB', 'CRITICAL', '7'), | 
 |             ('DEB', 'ERROR', '8'), | 
 |             ('DEB', 'WARNING', '9'), | 
 |             ('DEB', 'INFO', '10'), | 
 |             ('DEB', 'DEBUG', '11'), | 
 |         ]) | 
 |  | 
 |     def test_nested_explicit(self): | 
 |         # Logging levels in a nested namespace, all explicitly set. | 
 |         m = self.next_message | 
 |  | 
 |         INF = logging.getLogger("INF") | 
 |         INF.setLevel(logging.INFO) | 
 |         INF_ERR  = logging.getLogger("INF.ERR") | 
 |         INF_ERR.setLevel(logging.ERROR) | 
 |  | 
 |         # These should log. | 
 |         INF_ERR.log(logging.CRITICAL, m()) | 
 |         INF_ERR.error(m()) | 
 |  | 
 |         # These should not log. | 
 |         INF_ERR.warn(m()) | 
 |         INF_ERR.info(m()) | 
 |         INF_ERR.debug(m()) | 
 |  | 
 |         self.assert_log_lines([ | 
 |             ('INF.ERR', 'CRITICAL', '1'), | 
 |             ('INF.ERR', 'ERROR', '2'), | 
 |         ]) | 
 |  | 
 |     def test_nested_inherited(self): | 
 |         #Logging levels in a nested namespace, inherited from parent loggers. | 
 |         m = self.next_message | 
 |  | 
 |         INF = logging.getLogger("INF") | 
 |         INF.setLevel(logging.INFO) | 
 |         INF_ERR  = logging.getLogger("INF.ERR") | 
 |         INF_ERR.setLevel(logging.ERROR) | 
 |         INF_UNDEF = logging.getLogger("INF.UNDEF") | 
 |         INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") | 
 |         UNDEF = logging.getLogger("UNDEF") | 
 |  | 
 |         # These should log. | 
 |         INF_UNDEF.log(logging.CRITICAL, m()) | 
 |         INF_UNDEF.error(m()) | 
 |         INF_UNDEF.warn(m()) | 
 |         INF_UNDEF.info(m()) | 
 |         INF_ERR_UNDEF.log(logging.CRITICAL, m()) | 
 |         INF_ERR_UNDEF.error(m()) | 
 |  | 
 |         # These should not log. | 
 |         INF_UNDEF.debug(m()) | 
 |         INF_ERR_UNDEF.warn(m()) | 
 |         INF_ERR_UNDEF.info(m()) | 
 |         INF_ERR_UNDEF.debug(m()) | 
 |  | 
 |         self.assert_log_lines([ | 
 |             ('INF.UNDEF', 'CRITICAL', '1'), | 
 |             ('INF.UNDEF', 'ERROR', '2'), | 
 |             ('INF.UNDEF', 'WARNING', '3'), | 
 |             ('INF.UNDEF', 'INFO', '4'), | 
 |             ('INF.ERR.UNDEF', 'CRITICAL', '5'), | 
 |             ('INF.ERR.UNDEF', 'ERROR', '6'), | 
 |         ]) | 
 |  | 
 |     def test_nested_with_virtual_parent(self): | 
 |         # Logging levels when some parent does not exist yet. | 
 |         m = self.next_message | 
 |  | 
 |         INF = logging.getLogger("INF") | 
 |         GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") | 
 |         CHILD = logging.getLogger("INF.BADPARENT") | 
 |         INF.setLevel(logging.INFO) | 
 |  | 
 |         # These should log. | 
 |         GRANDCHILD.log(logging.FATAL, m()) | 
 |         GRANDCHILD.info(m()) | 
 |         CHILD.log(logging.FATAL, m()) | 
 |         CHILD.info(m()) | 
 |  | 
 |         # These should not log. | 
 |         GRANDCHILD.debug(m()) | 
 |         CHILD.debug(m()) | 
 |  | 
 |         self.assert_log_lines([ | 
 |             ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'), | 
 |             ('INF.BADPARENT.UNDEF', 'INFO', '2'), | 
 |             ('INF.BADPARENT', 'CRITICAL', '3'), | 
 |             ('INF.BADPARENT', 'INFO', '4'), | 
 |         ]) | 
 |  | 
 |  | 
 | class BasicFilterTest(BaseTest): | 
 |  | 
 |     """Test the bundled Filter class.""" | 
 |  | 
 |     def test_filter(self): | 
 |         # Only messages satisfying the specified criteria pass through the | 
 |         #  filter. | 
 |         filter_ = logging.Filter("spam.eggs") | 
 |         handler = self.root_logger.handlers[0] | 
 |         try: | 
 |             handler.addFilter(filter_) | 
 |             spam = logging.getLogger("spam") | 
 |             spam_eggs = logging.getLogger("spam.eggs") | 
 |             spam_eggs_fish = logging.getLogger("spam.eggs.fish") | 
 |             spam_bakedbeans = logging.getLogger("spam.bakedbeans") | 
 |  | 
 |             spam.info(self.next_message()) | 
 |             spam_eggs.info(self.next_message())  # Good. | 
 |             spam_eggs_fish.info(self.next_message())  # Good. | 
 |             spam_bakedbeans.info(self.next_message()) | 
 |  | 
 |             self.assert_log_lines([ | 
 |                 ('spam.eggs', 'INFO', '2'), | 
 |                 ('spam.eggs.fish', 'INFO', '3'), | 
 |             ]) | 
 |         finally: | 
 |             handler.removeFilter(filter_) | 
 |  | 
 |     def test_callable_filter(self): | 
 |         # Only messages satisfying the specified criteria pass through the | 
 |         #  filter. | 
 |  | 
 |         def filterfunc(record): | 
 |             parts = record.name.split('.') | 
 |             prefix = '.'.join(parts[:2]) | 
 |             return prefix == 'spam.eggs' | 
 |  | 
 |         handler = self.root_logger.handlers[0] | 
 |         try: | 
 |             handler.addFilter(filterfunc) | 
 |             spam = logging.getLogger("spam") | 
 |             spam_eggs = logging.getLogger("spam.eggs") | 
 |             spam_eggs_fish = logging.getLogger("spam.eggs.fish") | 
 |             spam_bakedbeans = logging.getLogger("spam.bakedbeans") | 
 |  | 
 |             spam.info(self.next_message()) | 
 |             spam_eggs.info(self.next_message())  # Good. | 
 |             spam_eggs_fish.info(self.next_message())  # Good. | 
 |             spam_bakedbeans.info(self.next_message()) | 
 |  | 
 |             self.assert_log_lines([ | 
 |                 ('spam.eggs', 'INFO', '2'), | 
 |                 ('spam.eggs.fish', 'INFO', '3'), | 
 |             ]) | 
 |         finally: | 
 |             handler.removeFilter(filterfunc) | 
 |  | 
 |  | 
 | # | 
 | #   First, we define our levels. There can be as many as you want - the only | 
 | #     limitations are that they should be integers, the lowest should be > 0 and | 
 | #   larger values mean less information being logged. If you need specific | 
 | #   level values which do not fit into these limitations, you can use a | 
 | #   mapping dictionary to convert between your application levels and the | 
 | #   logging system. | 
 | # | 
 | SILENT      = 120 | 
 | TACITURN    = 119 | 
 | TERSE       = 118 | 
 | EFFUSIVE    = 117 | 
 | SOCIABLE    = 116 | 
 | VERBOSE     = 115 | 
 | TALKATIVE   = 114 | 
 | GARRULOUS   = 113 | 
 | CHATTERBOX  = 112 | 
 | BORING      = 111 | 
 |  | 
 | LEVEL_RANGE = range(BORING, SILENT + 1) | 
 |  | 
 | # | 
 | #   Next, we define names for our levels. You don't need to do this - in which | 
 | #   case the system will use "Level n" to denote the text for the level. | 
 | # | 
 | my_logging_levels = { | 
 |     SILENT      : 'Silent', | 
 |     TACITURN    : 'Taciturn', | 
 |     TERSE       : 'Terse', | 
 |     EFFUSIVE    : 'Effusive', | 
 |     SOCIABLE    : 'Sociable', | 
 |     VERBOSE     : 'Verbose', | 
 |     TALKATIVE   : 'Talkative', | 
 |     GARRULOUS   : 'Garrulous', | 
 |     CHATTERBOX  : 'Chatterbox', | 
 |     BORING      : 'Boring', | 
 | } | 
 |  | 
 | class GarrulousFilter(logging.Filter): | 
 |  | 
 |     """A filter which blocks garrulous messages.""" | 
 |  | 
 |     def filter(self, record): | 
 |         return record.levelno != GARRULOUS | 
 |  | 
 | class VerySpecificFilter(logging.Filter): | 
 |  | 
 |     """A filter which blocks sociable and taciturn messages.""" | 
 |  | 
 |     def filter(self, record): | 
 |         return record.levelno not in [SOCIABLE, TACITURN] | 
 |  | 
 |  | 
 | class CustomLevelsAndFiltersTest(BaseTest): | 
 |  | 
 |     """Test various filtering possibilities with custom logging levels.""" | 
 |  | 
 |     # Skip the logger name group. | 
 |     expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" | 
 |  | 
 |     def setUp(self): | 
 |         BaseTest.setUp(self) | 
 |         for k, v in my_logging_levels.items(): | 
 |             logging.addLevelName(k, v) | 
 |  | 
 |     def log_at_all_levels(self, logger): | 
 |         for lvl in LEVEL_RANGE: | 
 |             logger.log(lvl, self.next_message()) | 
 |  | 
 |     def test_logger_filter(self): | 
 |         # Filter at logger level. | 
 |         self.root_logger.setLevel(VERBOSE) | 
 |         # Levels >= 'Verbose' are good. | 
 |         self.log_at_all_levels(self.root_logger) | 
 |         self.assert_log_lines([ | 
 |             ('Verbose', '5'), | 
 |             ('Sociable', '6'), | 
 |             ('Effusive', '7'), | 
 |             ('Terse', '8'), | 
 |             ('Taciturn', '9'), | 
 |             ('Silent', '10'), | 
 |         ]) | 
 |  | 
 |     def test_handler_filter(self): | 
 |         # Filter at handler level. | 
 |         self.root_logger.handlers[0].setLevel(SOCIABLE) | 
 |         try: | 
 |             # Levels >= 'Sociable' are good. | 
 |             self.log_at_all_levels(self.root_logger) | 
 |             self.assert_log_lines([ | 
 |                 ('Sociable', '6'), | 
 |                 ('Effusive', '7'), | 
 |                 ('Terse', '8'), | 
 |                 ('Taciturn', '9'), | 
 |                 ('Silent', '10'), | 
 |             ]) | 
 |         finally: | 
 |             self.root_logger.handlers[0].setLevel(logging.NOTSET) | 
 |  | 
 |     def test_specific_filters(self): | 
 |         # Set a specific filter object on the handler, and then add another | 
 |         #  filter object on the logger itself. | 
 |         handler = self.root_logger.handlers[0] | 
 |         specific_filter = None | 
 |         garr = GarrulousFilter() | 
 |         handler.addFilter(garr) | 
 |         try: | 
 |             self.log_at_all_levels(self.root_logger) | 
 |             first_lines = [ | 
 |                 # Notice how 'Garrulous' is missing | 
 |                 ('Boring', '1'), | 
 |                 ('Chatterbox', '2'), | 
 |                 ('Talkative', '4'), | 
 |                 ('Verbose', '5'), | 
 |                 ('Sociable', '6'), | 
 |                 ('Effusive', '7'), | 
 |                 ('Terse', '8'), | 
 |                 ('Taciturn', '9'), | 
 |                 ('Silent', '10'), | 
 |             ] | 
 |             self.assert_log_lines(first_lines) | 
 |  | 
 |             specific_filter = VerySpecificFilter() | 
 |             self.root_logger.addFilter(specific_filter) | 
 |             self.log_at_all_levels(self.root_logger) | 
 |             self.assert_log_lines(first_lines + [ | 
 |                 # Not only 'Garrulous' is still missing, but also 'Sociable' | 
 |                 # and 'Taciturn' | 
 |                 ('Boring', '11'), | 
 |                 ('Chatterbox', '12'), | 
 |                 ('Talkative', '14'), | 
 |                 ('Verbose', '15'), | 
 |                 ('Effusive', '17'), | 
 |                 ('Terse', '18'), | 
 |                 ('Silent', '20'), | 
 |         ]) | 
 |         finally: | 
 |             if specific_filter: | 
 |                 self.root_logger.removeFilter(specific_filter) | 
 |             handler.removeFilter(garr) | 
 |  | 
 |  | 
 | class MemoryHandlerTest(BaseTest): | 
 |  | 
 |     """Tests for the MemoryHandler.""" | 
 |  | 
 |     # Do not bother with a logger name group. | 
 |     expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" | 
 |  | 
 |     def setUp(self): | 
 |         BaseTest.setUp(self) | 
 |         self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, | 
 |                                                         self.root_hdlr) | 
 |         self.mem_logger = logging.getLogger('mem') | 
 |         self.mem_logger.propagate = 0 | 
 |         self.mem_logger.addHandler(self.mem_hdlr) | 
 |  | 
 |     def tearDown(self): | 
 |         self.mem_hdlr.close() | 
 |         BaseTest.tearDown(self) | 
 |  | 
 |     def test_flush(self): | 
 |         # The memory handler flushes to its target handler based on specific | 
 |         #  criteria (message count and message level). | 
 |         self.mem_logger.debug(self.next_message()) | 
 |         self.assert_log_lines([]) | 
 |         self.mem_logger.info(self.next_message()) | 
 |         self.assert_log_lines([]) | 
 |         # This will flush because the level is >= logging.WARNING | 
 |         self.mem_logger.warn(self.next_message()) | 
 |         lines = [ | 
 |             ('DEBUG', '1'), | 
 |             ('INFO', '2'), | 
 |             ('WARNING', '3'), | 
 |         ] | 
 |         self.assert_log_lines(lines) | 
 |         for n in (4, 14): | 
 |             for i in range(9): | 
 |                 self.mem_logger.debug(self.next_message()) | 
 |             self.assert_log_lines(lines) | 
 |             # This will flush because it's the 10th message since the last | 
 |             #  flush. | 
 |             self.mem_logger.debug(self.next_message()) | 
 |             lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)] | 
 |             self.assert_log_lines(lines) | 
 |  | 
 |         self.mem_logger.debug(self.next_message()) | 
 |         self.assert_log_lines(lines) | 
 |  | 
 |  | 
 | class ExceptionFormatter(logging.Formatter): | 
 |     """A special exception formatter.""" | 
 |     def formatException(self, ei): | 
 |         return "Got a [%s]" % ei[0].__name__ | 
 |  | 
 |  | 
 | class ConfigFileTest(BaseTest): | 
 |  | 
 |     """Reading logging config from a .ini-style config file.""" | 
 |  | 
 |     expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" | 
 |  | 
 |     # config0 is a standard configuration. | 
 |     config0 = """ | 
 |     [loggers] | 
 |     keys=root | 
 |  | 
 |     [handlers] | 
 |     keys=hand1 | 
 |  | 
 |     [formatters] | 
 |     keys=form1 | 
 |  | 
 |     [logger_root] | 
 |     level=WARNING | 
 |     handlers=hand1 | 
 |  | 
 |     [handler_hand1] | 
 |     class=StreamHandler | 
 |     level=NOTSET | 
 |     formatter=form1 | 
 |     args=(sys.stdout,) | 
 |  | 
 |     [formatter_form1] | 
 |     format=%(levelname)s ++ %(message)s | 
 |     datefmt= | 
 |     """ | 
 |  | 
 |     # config1 adds a little to the standard configuration. | 
 |     config1 = """ | 
 |     [loggers] | 
 |     keys=root,parser | 
 |  | 
 |     [handlers] | 
 |     keys=hand1 | 
 |  | 
 |     [formatters] | 
 |     keys=form1 | 
 |  | 
 |     [logger_root] | 
 |     level=WARNING | 
 |     handlers= | 
 |  | 
 |     [logger_parser] | 
 |     level=DEBUG | 
 |     handlers=hand1 | 
 |     propagate=1 | 
 |     qualname=compiler.parser | 
 |  | 
 |     [handler_hand1] | 
 |     class=StreamHandler | 
 |     level=NOTSET | 
 |     formatter=form1 | 
 |     args=(sys.stdout,) | 
 |  | 
 |     [formatter_form1] | 
 |     format=%(levelname)s ++ %(message)s | 
 |     datefmt= | 
 |     """ | 
 |  | 
 |     # config1a moves the handler to the root. | 
 |     config1a = """ | 
 |     [loggers] | 
 |     keys=root,parser | 
 |  | 
 |     [handlers] | 
 |     keys=hand1 | 
 |  | 
 |     [formatters] | 
 |     keys=form1 | 
 |  | 
 |     [logger_root] | 
 |     level=WARNING | 
 |     handlers=hand1 | 
 |  | 
 |     [logger_parser] | 
 |     level=DEBUG | 
 |     handlers= | 
 |     propagate=1 | 
 |     qualname=compiler.parser | 
 |  | 
 |     [handler_hand1] | 
 |     class=StreamHandler | 
 |     level=NOTSET | 
 |     formatter=form1 | 
 |     args=(sys.stdout,) | 
 |  | 
 |     [formatter_form1] | 
 |     format=%(levelname)s ++ %(message)s | 
 |     datefmt= | 
 |     """ | 
 |  | 
 |     # config2 has a subtle configuration error that should be reported | 
 |     config2 = config1.replace("sys.stdout", "sys.stbout") | 
 |  | 
 |     # config3 has a less subtle configuration error | 
 |     config3 = config1.replace("formatter=form1", "formatter=misspelled_name") | 
 |  | 
 |     # config4 specifies a custom formatter class to be loaded | 
 |     config4 = """ | 
 |     [loggers] | 
 |     keys=root | 
 |  | 
 |     [handlers] | 
 |     keys=hand1 | 
 |  | 
 |     [formatters] | 
 |     keys=form1 | 
 |  | 
 |     [logger_root] | 
 |     level=NOTSET | 
 |     handlers=hand1 | 
 |  | 
 |     [handler_hand1] | 
 |     class=StreamHandler | 
 |     level=NOTSET | 
 |     formatter=form1 | 
 |     args=(sys.stdout,) | 
 |  | 
 |     [formatter_form1] | 
 |     class=""" + __name__ + """.ExceptionFormatter | 
 |     format=%(levelname)s:%(name)s:%(message)s | 
 |     datefmt= | 
 |     """ | 
 |  | 
 |     # config5 specifies a custom handler class to be loaded | 
 |     config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler') | 
 |  | 
 |     # config6 uses ', ' delimiters in the handlers and formatters sections | 
 |     config6 = """ | 
 |     [loggers] | 
 |     keys=root,parser | 
 |  | 
 |     [handlers] | 
 |     keys=hand1, hand2 | 
 |  | 
 |     [formatters] | 
 |     keys=form1, form2 | 
 |  | 
 |     [logger_root] | 
 |     level=WARNING | 
 |     handlers= | 
 |  | 
 |     [logger_parser] | 
 |     level=DEBUG | 
 |     handlers=hand1 | 
 |     propagate=1 | 
 |     qualname=compiler.parser | 
 |  | 
 |     [handler_hand1] | 
 |     class=StreamHandler | 
 |     level=NOTSET | 
 |     formatter=form1 | 
 |     args=(sys.stdout,) | 
 |  | 
 |     [handler_hand2] | 
 |     class=StreamHandler | 
 |     level=NOTSET | 
 |     formatter=form1 | 
 |     args=(sys.stderr,) | 
 |  | 
 |     [formatter_form1] | 
 |     format=%(levelname)s ++ %(message)s | 
 |     datefmt= | 
 |  | 
 |     [formatter_form2] | 
 |     format=%(message)s | 
 |     datefmt= | 
 |     """ | 
 |  | 
 |     # config7 adds a compiler logger. | 
 |     config7 = """ | 
 |     [loggers] | 
 |     keys=root,parser,compiler | 
 |  | 
 |     [handlers] | 
 |     keys=hand1 | 
 |  | 
 |     [formatters] | 
 |     keys=form1 | 
 |  | 
 |     [logger_root] | 
 |     level=WARNING | 
 |     handlers=hand1 | 
 |  | 
 |     [logger_compiler] | 
 |     level=DEBUG | 
 |     handlers= | 
 |     propagate=1 | 
 |     qualname=compiler | 
 |  | 
 |     [logger_parser] | 
 |     level=DEBUG | 
 |     handlers= | 
 |     propagate=1 | 
 |     qualname=compiler.parser | 
 |  | 
 |     [handler_hand1] | 
 |     class=StreamHandler | 
 |     level=NOTSET | 
 |     formatter=form1 | 
 |     args=(sys.stdout,) | 
 |  | 
 |     [formatter_form1] | 
 |     format=%(levelname)s ++ %(message)s | 
 |     datefmt= | 
 |     """ | 
 |  | 
 |     def apply_config(self, conf): | 
 |         file = io.StringIO(textwrap.dedent(conf)) | 
 |         logging.config.fileConfig(file) | 
 |  | 
 |     def test_config0_ok(self): | 
 |         # A simple config file which overrides the default settings. | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config0) | 
 |             logger = logging.getLogger() | 
 |             # Won't output anything | 
 |             logger.info(self.next_message()) | 
 |             # Outputs a message | 
 |             logger.error(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('ERROR', '2'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |  | 
 |     def test_config1_ok(self, config=config1): | 
 |         # A config file defining a sub-parser as well. | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(config) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             # Both will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '1'), | 
 |                 ('ERROR', '2'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |  | 
 |     def test_config2_failure(self): | 
 |         # A simple config file which overrides the default settings. | 
 |         self.assertRaises(Exception, self.apply_config, self.config2) | 
 |  | 
 |     def test_config3_failure(self): | 
 |         # A simple config file which overrides the default settings. | 
 |         self.assertRaises(Exception, self.apply_config, self.config3) | 
 |  | 
 |     def test_config4_ok(self): | 
 |         # A config file specifying a custom formatter class. | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config4) | 
 |             logger = logging.getLogger() | 
 |             try: | 
 |                 raise RuntimeError() | 
 |             except RuntimeError: | 
 |                 logging.exception("just testing") | 
 |             sys.stdout.seek(0) | 
 |             self.assertEqual(output.getvalue(), | 
 |                 "ERROR:root:just testing\nGot a [RuntimeError]\n") | 
 |             # Original logger output is empty | 
 |             self.assert_log_lines([]) | 
 |  | 
 |     def test_config5_ok(self): | 
 |         self.test_config1_ok(config=self.config5) | 
 |  | 
 |     def test_config6_ok(self): | 
 |         self.test_config1_ok(config=self.config6) | 
 |  | 
 |     def test_config7_ok(self): | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config1a) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             # See issue #11424. compiler-hyphenated sorts | 
 |             # between compiler and compiler.xyz and this | 
 |             # was preventing compiler.xyz from being included | 
 |             # in the child loggers of compiler because of an | 
 |             # overzealous loop termination condition. | 
 |             hyphenated = logging.getLogger('compiler-hyphenated') | 
 |             # All will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             hyphenated.critical(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '1'), | 
 |                 ('ERROR', '2'), | 
 |                 ('CRITICAL', '3'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config7) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             self.assertFalse(logger.disabled) | 
 |             # Both will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             logger = logging.getLogger("compiler.lexer") | 
 |             # Both will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             # Will not appear | 
 |             hyphenated.critical(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '4'), | 
 |                 ('ERROR', '5'), | 
 |                 ('INFO', '6'), | 
 |                 ('ERROR', '7'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |  | 
 | class LogRecordStreamHandler(StreamRequestHandler): | 
 |  | 
 |     """Handler for a streaming logging request. It saves the log message in the | 
 |     TCP server's 'log_output' attribute.""" | 
 |  | 
 |     TCP_LOG_END = "!!!END!!!" | 
 |  | 
 |     def handle(self): | 
 |         """Handle multiple requests - each expected to be of 4-byte length, | 
 |         followed by the LogRecord in pickle format. Logs the record | 
 |         according to whatever policy is configured locally.""" | 
 |         while True: | 
 |             chunk = self.connection.recv(4) | 
 |             if len(chunk) < 4: | 
 |                 break | 
 |             slen = struct.unpack(">L", chunk)[0] | 
 |             chunk = self.connection.recv(slen) | 
 |             while len(chunk) < slen: | 
 |                 chunk = chunk + self.connection.recv(slen - len(chunk)) | 
 |             obj = self.unpickle(chunk) | 
 |             record = logging.makeLogRecord(obj) | 
 |             self.handle_log_record(record) | 
 |  | 
 |     def unpickle(self, data): | 
 |         return pickle.loads(data) | 
 |  | 
 |     def handle_log_record(self, record): | 
 |         # If the end-of-messages sentinel is seen, tell the server to | 
 |         #  terminate. | 
 |         if self.TCP_LOG_END in record.msg: | 
 |             self.server.abort = 1 | 
 |             return | 
 |         self.server.log_output += record.msg + "\n" | 
 |  | 
 |  | 
 | class LogRecordSocketReceiver(ThreadingTCPServer): | 
 |  | 
 |     """A simple-minded TCP socket-based logging receiver suitable for test | 
 |     purposes.""" | 
 |  | 
 |     allow_reuse_address = 1 | 
 |     log_output = "" | 
 |  | 
 |     def __init__(self, host='localhost', | 
 |                              port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, | 
 |                      handler=LogRecordStreamHandler): | 
 |         ThreadingTCPServer.__init__(self, (host, port), handler) | 
 |         self.abort = False | 
 |         self.timeout = 0.1 | 
 |         self.finished = threading.Event() | 
 |  | 
 |     def serve_until_stopped(self): | 
 |         while not self.abort: | 
 |             rd, wr, ex = select.select([self.socket.fileno()], [], [], | 
 |                                        self.timeout) | 
 |             if rd: | 
 |                 self.handle_request() | 
 |         # Notify the main thread that we're about to exit | 
 |         self.finished.set() | 
 |         # close the listen socket | 
 |         self.server_close() | 
 |  | 
 |  | 
 | @unittest.skipUnless(threading, 'Threading required for this test.') | 
 | class SocketHandlerTest(BaseTest): | 
 |  | 
 |     """Test for SocketHandler objects.""" | 
 |  | 
 |     def setUp(self): | 
 |         """Set up a TCP server to receive log messages, and a SocketHandler | 
 |         pointing to that server's address and port.""" | 
 |         BaseTest.setUp(self) | 
 |         self.tcpserver = LogRecordSocketReceiver(port=0) | 
 |         self.port = self.tcpserver.socket.getsockname()[1] | 
 |         self.threads = [ | 
 |                 threading.Thread(target=self.tcpserver.serve_until_stopped)] | 
 |         for thread in self.threads: | 
 |             thread.start() | 
 |  | 
 |         self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port) | 
 |         self.sock_hdlr.setFormatter(self.root_formatter) | 
 |         self.root_logger.removeHandler(self.root_logger.handlers[0]) | 
 |         self.root_logger.addHandler(self.sock_hdlr) | 
 |  | 
 |     def tearDown(self): | 
 |         """Shutdown the TCP server.""" | 
 |         try: | 
 |             self.tcpserver.abort = True | 
 |             del self.tcpserver | 
 |             self.root_logger.removeHandler(self.sock_hdlr) | 
 |             self.sock_hdlr.close() | 
 |             for thread in self.threads: | 
 |                 thread.join(2.0) | 
 |         finally: | 
 |             BaseTest.tearDown(self) | 
 |  | 
 |     def get_output(self): | 
 |         """Get the log output as received by the TCP server.""" | 
 |         # Signal the TCP receiver and wait for it to terminate. | 
 |         self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END) | 
 |         self.tcpserver.finished.wait(2.0) | 
 |         return self.tcpserver.log_output | 
 |  | 
 |     def test_output(self): | 
 |         # The log message sent to the SocketHandler is properly received. | 
 |         logger = logging.getLogger("tcp") | 
 |         logger.error("spam") | 
 |         logger.debug("eggs") | 
 |         self.assertEqual(self.get_output(), "spam\neggs\n") | 
 |  | 
 |  | 
 | class MemoryTest(BaseTest): | 
 |  | 
 |     """Test memory persistence of logger objects.""" | 
 |  | 
 |     def setUp(self): | 
 |         """Create a dict to remember potentially destroyed objects.""" | 
 |         BaseTest.setUp(self) | 
 |         self._survivors = {} | 
 |  | 
 |     def _watch_for_survival(self, *args): | 
 |         """Watch the given objects for survival, by creating weakrefs to | 
 |         them.""" | 
 |         for obj in args: | 
 |             key = id(obj), repr(obj) | 
 |             self._survivors[key] = weakref.ref(obj) | 
 |  | 
 |     def _assertTruesurvival(self): | 
 |         """Assert that all objects watched for survival have survived.""" | 
 |         # Trigger cycle breaking. | 
 |         gc.collect() | 
 |         dead = [] | 
 |         for (id_, repr_), ref in self._survivors.items(): | 
 |             if ref() is None: | 
 |                 dead.append(repr_) | 
 |         if dead: | 
 |             self.fail("%d objects should have survived " | 
 |                 "but have been destroyed: %s" % (len(dead), ", ".join(dead))) | 
 |  | 
 |     def test_persistent_loggers(self): | 
 |         # Logger objects are persistent and retain their configuration, even | 
 |         #  if visible references are destroyed. | 
 |         self.root_logger.setLevel(logging.INFO) | 
 |         foo = logging.getLogger("foo") | 
 |         self._watch_for_survival(foo) | 
 |         foo.setLevel(logging.DEBUG) | 
 |         self.root_logger.debug(self.next_message()) | 
 |         foo.debug(self.next_message()) | 
 |         self.assert_log_lines([ | 
 |             ('foo', 'DEBUG', '2'), | 
 |         ]) | 
 |         del foo | 
 |         # foo has survived. | 
 |         self._assertTruesurvival() | 
 |         # foo has retained its settings. | 
 |         bar = logging.getLogger("foo") | 
 |         bar.debug(self.next_message()) | 
 |         self.assert_log_lines([ | 
 |             ('foo', 'DEBUG', '2'), | 
 |             ('foo', 'DEBUG', '3'), | 
 |         ]) | 
 |  | 
 |  | 
 | class EncodingTest(BaseTest): | 
 |     def test_encoding_plain_file(self): | 
 |         # In Python 2.x, a plain file object is treated as having no encoding. | 
 |         log = logging.getLogger("test") | 
 |         fd, fn = tempfile.mkstemp(".log", "test_logging-1-") | 
 |         os.close(fd) | 
 |         # the non-ascii data we write to the log. | 
 |         data = "foo\x80" | 
 |         try: | 
 |             handler = logging.FileHandler(fn, encoding="utf-8") | 
 |             log.addHandler(handler) | 
 |             try: | 
 |                 # write non-ascii data to the log. | 
 |                 log.warning(data) | 
 |             finally: | 
 |                 log.removeHandler(handler) | 
 |                 handler.close() | 
 |             # check we wrote exactly those bytes, ignoring trailing \n etc | 
 |             f = open(fn, encoding="utf-8") | 
 |             try: | 
 |                 self.assertEqual(f.read().rstrip(), data) | 
 |             finally: | 
 |                 f.close() | 
 |         finally: | 
 |             if os.path.isfile(fn): | 
 |                 os.remove(fn) | 
 |  | 
 |     def test_encoding_cyrillic_unicode(self): | 
 |         log = logging.getLogger("test") | 
 |         #Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye) | 
 |         message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f' | 
 |         #Ensure it's written in a Cyrillic encoding | 
 |         writer_class = codecs.getwriter('cp1251') | 
 |         writer_class.encoding = 'cp1251' | 
 |         stream = io.BytesIO() | 
 |         writer = writer_class(stream, 'strict') | 
 |         handler = logging.StreamHandler(writer) | 
 |         log.addHandler(handler) | 
 |         try: | 
 |             log.warning(message) | 
 |         finally: | 
 |             log.removeHandler(handler) | 
 |             handler.close() | 
 |         # check we wrote exactly those bytes, ignoring trailing \n etc | 
 |         s = stream.getvalue() | 
 |         #Compare against what the data should be when encoded in CP-1251 | 
 |         self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n') | 
 |  | 
 |  | 
 | class WarningsTest(BaseTest): | 
 |  | 
 |     def test_warnings(self): | 
 |         with warnings.catch_warnings(): | 
 |             logging.captureWarnings(True) | 
 |             try: | 
 |                 warnings.filterwarnings("always", category=UserWarning) | 
 |                 file = io.StringIO() | 
 |                 h = logging.StreamHandler(file) | 
 |                 logger = logging.getLogger("py.warnings") | 
 |                 logger.addHandler(h) | 
 |                 warnings.warn("I'm warning you...") | 
 |                 logger.removeHandler(h) | 
 |                 s = file.getvalue() | 
 |                 h.close() | 
 |                 self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0) | 
 |  | 
 |                 #See if an explicit file uses the original implementation | 
 |                 file = io.StringIO() | 
 |                 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, | 
 |                                         file, "Dummy line") | 
 |                 s = file.getvalue() | 
 |                 file.close() | 
 |                 self.assertEqual(s, | 
 |                         "dummy.py:42: UserWarning: Explicit\n  Dummy line\n") | 
 |             finally: | 
 |                 logging.captureWarnings(False) | 
 |  | 
 |  | 
 | def formatFunc(format, datefmt=None): | 
 |     return logging.Formatter(format, datefmt) | 
 |  | 
 | def handlerFunc(): | 
 |     return logging.StreamHandler() | 
 |  | 
 | class CustomHandler(logging.StreamHandler): | 
 |     pass | 
 |  | 
 | class ConfigDictTest(BaseTest): | 
 |  | 
 |     """Reading logging config from a dictionary.""" | 
 |  | 
 |     expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" | 
 |  | 
 |     # config0 is a standard configuration. | 
 |     config0 = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |             'handlers' : ['hand1'], | 
 |         }, | 
 |     } | 
 |  | 
 |     # config1 adds a little to the standard configuration. | 
 |     config1 = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |     # config1a moves the handler to the root. Used with config8a | 
 |     config1a = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |             'handlers' : ['hand1'], | 
 |         }, | 
 |     } | 
 |  | 
 |     # config2 has a subtle configuration error that should be reported | 
 |     config2 = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdbout', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |     #As config1 but with a misspelt level on a handler | 
 |     config2a = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NTOSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |  | 
 |     #As config1 but with a misspelt level on a logger | 
 |     config2b = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WRANING', | 
 |         }, | 
 |     } | 
 |  | 
 |     # config3 has a less subtle configuration error | 
 |     config3 = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'misspelled_name', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |     # config4 specifies a custom formatter class to be loaded | 
 |     config4 = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 '()' : __name__ + '.ExceptionFormatter', | 
 |                 'format' : '%(levelname)s:%(name)s:%(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'NOTSET', | 
 |                 'handlers' : ['hand1'], | 
 |         }, | 
 |     } | 
 |  | 
 |     # As config4 but using an actual callable rather than a string | 
 |     config4a = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 '()' : ExceptionFormatter, | 
 |                 'format' : '%(levelname)s:%(name)s:%(message)s', | 
 |             }, | 
 |             'form2' : { | 
 |                 '()' : __name__ + '.formatFunc', | 
 |                 'format' : '%(levelname)s:%(name)s:%(message)s', | 
 |             }, | 
 |             'form3' : { | 
 |                 '()' : formatFunc, | 
 |                 'format' : '%(levelname)s:%(name)s:%(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |             'hand2' : { | 
 |                 '()' : handlerFunc, | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'NOTSET', | 
 |                 'handlers' : ['hand1'], | 
 |         }, | 
 |     } | 
 |  | 
 |     # config5 specifies a custom handler class to be loaded | 
 |     config5 = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : __name__ + '.CustomHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |     # config6 specifies a custom handler class to be loaded | 
 |     # but has bad arguments | 
 |     config6 = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : __name__ + '.CustomHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |                 '9' : 'invalid parameter name', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |     #config 7 does not define compiler.parser but defines compiler.lexer | 
 |     #so compiler.parser should be disabled after applying it | 
 |     config7 = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.lexer' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |     # config8 defines both compiler and compiler.lexer | 
 |     # so compiler.parser should not be disabled (since | 
 |     # compiler is defined) | 
 |     config8 = { | 
 |         'version': 1, | 
 |         'disable_existing_loggers' : False, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |             'compiler.lexer' : { | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |     # config8a disables existing loggers | 
 |     config8a = { | 
 |         'version': 1, | 
 |         'disable_existing_loggers' : True, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |             'compiler.lexer' : { | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |     config9 = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'WARNING', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'WARNING', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'NOTSET', | 
 |         }, | 
 |     } | 
 |  | 
 |     config9a = { | 
 |         'version': 1, | 
 |         'incremental' : True, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'level' : 'WARNING', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'INFO', | 
 |             }, | 
 |         }, | 
 |     } | 
 |  | 
 |     config9b = { | 
 |         'version': 1, | 
 |         'incremental' : True, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'level' : 'INFO', | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'INFO', | 
 |             }, | 
 |         }, | 
 |     } | 
 |  | 
 |     #As config1 but with a filter added | 
 |     config10 = { | 
 |         'version': 1, | 
 |         'formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'filters' : { | 
 |             'filt1' : { | 
 |                 'name' : 'compiler.parser', | 
 |             }, | 
 |         }, | 
 |         'handlers' : { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |                 'filters' : ['filt1'], | 
 |             }, | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'filters' : ['filt1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |             'handlers' : ['hand1'], | 
 |         }, | 
 |     } | 
 |  | 
 |     #As config1 but using cfg:// references | 
 |     config11 = { | 
 |         'version': 1, | 
 |         'true_formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handler_configs': { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'formatters' : 'cfg://true_formatters', | 
 |         'handlers' : { | 
 |             'hand1' : 'cfg://handler_configs[hand1]', | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |     #As config11 but missing the version key | 
 |     config12 = { | 
 |         'true_formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handler_configs': { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'formatters' : 'cfg://true_formatters', | 
 |         'handlers' : { | 
 |             'hand1' : 'cfg://handler_configs[hand1]', | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |     #As config11 but using an unsupported version | 
 |     config13 = { | 
 |         'version': 2, | 
 |         'true_formatters': { | 
 |             'form1' : { | 
 |                 'format' : '%(levelname)s ++ %(message)s', | 
 |             }, | 
 |         }, | 
 |         'handler_configs': { | 
 |             'hand1' : { | 
 |                 'class' : 'logging.StreamHandler', | 
 |                 'formatter' : 'form1', | 
 |                 'level' : 'NOTSET', | 
 |                 'stream'  : 'ext://sys.stdout', | 
 |             }, | 
 |         }, | 
 |         'formatters' : 'cfg://true_formatters', | 
 |         'handlers' : { | 
 |             'hand1' : 'cfg://handler_configs[hand1]', | 
 |         }, | 
 |         'loggers' : { | 
 |             'compiler.parser' : { | 
 |                 'level' : 'DEBUG', | 
 |                 'handlers' : ['hand1'], | 
 |             }, | 
 |         }, | 
 |         'root' : { | 
 |             'level' : 'WARNING', | 
 |         }, | 
 |     } | 
 |  | 
 |     def apply_config(self, conf): | 
 |         logging.config.dictConfig(conf) | 
 |  | 
 |     def test_config0_ok(self): | 
 |         # A simple config which overrides the default settings. | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config0) | 
 |             logger = logging.getLogger() | 
 |             # Won't output anything | 
 |             logger.info(self.next_message()) | 
 |             # Outputs a message | 
 |             logger.error(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('ERROR', '2'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |  | 
 |     def test_config1_ok(self, config=config1): | 
 |         # A config defining a sub-parser as well. | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(config) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             # Both will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '1'), | 
 |                 ('ERROR', '2'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |  | 
 |     def test_config2_failure(self): | 
 |         # A simple config which overrides the default settings. | 
 |         self.assertRaises(Exception, self.apply_config, self.config2) | 
 |  | 
 |     def test_config2a_failure(self): | 
 |         # A simple config which overrides the default settings. | 
 |         self.assertRaises(Exception, self.apply_config, self.config2a) | 
 |  | 
 |     def test_config2b_failure(self): | 
 |         # A simple config which overrides the default settings. | 
 |         self.assertRaises(Exception, self.apply_config, self.config2b) | 
 |  | 
 |     def test_config3_failure(self): | 
 |         # A simple config which overrides the default settings. | 
 |         self.assertRaises(Exception, self.apply_config, self.config3) | 
 |  | 
 |     def test_config4_ok(self): | 
 |         # A config specifying a custom formatter class. | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config4) | 
 |             #logger = logging.getLogger() | 
 |             try: | 
 |                 raise RuntimeError() | 
 |             except RuntimeError: | 
 |                 logging.exception("just testing") | 
 |             sys.stdout.seek(0) | 
 |             self.assertEqual(output.getvalue(), | 
 |                 "ERROR:root:just testing\nGot a [RuntimeError]\n") | 
 |             # Original logger output is empty | 
 |             self.assert_log_lines([]) | 
 |  | 
 |     def test_config4a_ok(self): | 
 |         # A config specifying a custom formatter class. | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config4a) | 
 |             #logger = logging.getLogger() | 
 |             try: | 
 |                 raise RuntimeError() | 
 |             except RuntimeError: | 
 |                 logging.exception("just testing") | 
 |             sys.stdout.seek(0) | 
 |             self.assertEqual(output.getvalue(), | 
 |                 "ERROR:root:just testing\nGot a [RuntimeError]\n") | 
 |             # Original logger output is empty | 
 |             self.assert_log_lines([]) | 
 |  | 
 |     def test_config5_ok(self): | 
 |         self.test_config1_ok(config=self.config5) | 
 |  | 
 |     def test_config6_failure(self): | 
 |         self.assertRaises(Exception, self.apply_config, self.config6) | 
 |  | 
 |     def test_config7_ok(self): | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config1) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             # Both will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '1'), | 
 |                 ('ERROR', '2'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config7) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             self.assertTrue(logger.disabled) | 
 |             logger = logging.getLogger("compiler.lexer") | 
 |             # Both will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '3'), | 
 |                 ('ERROR', '4'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |  | 
 |     #Same as test_config_7_ok but don't disable old loggers. | 
 |     def test_config_8_ok(self): | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config1) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             # All will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '1'), | 
 |                 ('ERROR', '2'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config8) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             self.assertFalse(logger.disabled) | 
 |             # Both will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             logger = logging.getLogger("compiler.lexer") | 
 |             # Both will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '3'), | 
 |                 ('ERROR', '4'), | 
 |                 ('INFO', '5'), | 
 |                 ('ERROR', '6'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |  | 
 |     def test_config_8a_ok(self): | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config1a) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             # See issue #11424. compiler-hyphenated sorts | 
 |             # between compiler and compiler.xyz and this | 
 |             # was preventing compiler.xyz from being included | 
 |             # in the child loggers of compiler because of an | 
 |             # overzealous loop termination condition. | 
 |             hyphenated = logging.getLogger('compiler-hyphenated') | 
 |             # All will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             hyphenated.critical(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '1'), | 
 |                 ('ERROR', '2'), | 
 |                 ('CRITICAL', '3'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config8a) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             self.assertFalse(logger.disabled) | 
 |             # Both will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             logger = logging.getLogger("compiler.lexer") | 
 |             # Both will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             # Will not appear | 
 |             hyphenated.critical(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '4'), | 
 |                 ('ERROR', '5'), | 
 |                 ('INFO', '6'), | 
 |                 ('ERROR', '7'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |  | 
 |     def test_config_9_ok(self): | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config9) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             #Nothing will be output since both handler and logger are set to WARNING | 
 |             logger.info(self.next_message()) | 
 |             self.assert_log_lines([], stream=output) | 
 |             self.apply_config(self.config9a) | 
 |             #Nothing will be output since both handler is still set to WARNING | 
 |             logger.info(self.next_message()) | 
 |             self.assert_log_lines([], stream=output) | 
 |             self.apply_config(self.config9b) | 
 |             #Message should now be output | 
 |             logger.info(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '3'), | 
 |             ], stream=output) | 
 |  | 
 |     def test_config_10_ok(self): | 
 |         with captured_stdout() as output: | 
 |             self.apply_config(self.config10) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             logger.warning(self.next_message()) | 
 |             logger = logging.getLogger('compiler') | 
 |             #Not output, because filtered | 
 |             logger.warning(self.next_message()) | 
 |             logger = logging.getLogger('compiler.lexer') | 
 |             #Not output, because filtered | 
 |             logger.warning(self.next_message()) | 
 |             logger = logging.getLogger("compiler.parser.codegen") | 
 |             #Output, as not filtered | 
 |             logger.error(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('WARNING', '1'), | 
 |                 ('ERROR', '4'), | 
 |             ], stream=output) | 
 |  | 
 |     def test_config11_ok(self): | 
 |         self.test_config1_ok(self.config11) | 
 |  | 
 |     def test_config12_failure(self): | 
 |         self.assertRaises(Exception, self.apply_config, self.config12) | 
 |  | 
 |     def test_config13_failure(self): | 
 |         self.assertRaises(Exception, self.apply_config, self.config13) | 
 |  | 
 |     @unittest.skipUnless(threading, 'listen() needs threading to work') | 
 |     def setup_via_listener(self, text): | 
 |         text = text.encode("utf-8") | 
 |         # Ask for a randomly assigned port (by using port 0) | 
 |         t = logging.config.listen(0) | 
 |         t.start() | 
 |         t.ready.wait() | 
 |         # Now get the port allocated | 
 |         port = t.port | 
 |         t.ready.clear() | 
 |         try: | 
 |             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
 |             sock.settimeout(2.0) | 
 |             sock.connect(('localhost', port)) | 
 |  | 
 |             slen = struct.pack('>L', len(text)) | 
 |             s = slen + text | 
 |             sentsofar = 0 | 
 |             left = len(s) | 
 |             while left > 0: | 
 |                 sent = sock.send(s[sentsofar:]) | 
 |                 sentsofar += sent | 
 |                 left -= sent | 
 |             sock.close() | 
 |         finally: | 
 |             t.ready.wait(2.0) | 
 |             logging.config.stopListening() | 
 |             t.join(2.0) | 
 |  | 
 |     def test_listen_config_10_ok(self): | 
 |         with captured_stdout() as output: | 
 |             self.setup_via_listener(json.dumps(self.config10)) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             logger.warning(self.next_message()) | 
 |             logger = logging.getLogger('compiler') | 
 |             #Not output, because filtered | 
 |             logger.warning(self.next_message()) | 
 |             logger = logging.getLogger('compiler.lexer') | 
 |             #Not output, because filtered | 
 |             logger.warning(self.next_message()) | 
 |             logger = logging.getLogger("compiler.parser.codegen") | 
 |             #Output, as not filtered | 
 |             logger.error(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('WARNING', '1'), | 
 |                 ('ERROR', '4'), | 
 |             ], stream=output) | 
 |  | 
 |     def test_listen_config_1_ok(self): | 
 |         with captured_stdout() as output: | 
 |             self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) | 
 |             logger = logging.getLogger("compiler.parser") | 
 |             # Both will output a message | 
 |             logger.info(self.next_message()) | 
 |             logger.error(self.next_message()) | 
 |             self.assert_log_lines([ | 
 |                 ('INFO', '1'), | 
 |                 ('ERROR', '2'), | 
 |             ], stream=output) | 
 |             # Original logger output is empty. | 
 |             self.assert_log_lines([]) | 
 |  | 
 |  | 
 | class ManagerTest(BaseTest): | 
 |     def test_manager_loggerclass(self): | 
 |         logged = [] | 
 |  | 
 |         class MyLogger(logging.Logger): | 
 |             def _log(self, level, msg, args, exc_info=None, extra=None): | 
 |                 logged.append(msg) | 
 |  | 
 |         man = logging.Manager(None) | 
 |         self.assertRaises(TypeError, man.setLoggerClass, int) | 
 |         man.setLoggerClass(MyLogger) | 
 |         logger = man.getLogger('test') | 
 |         logger.warning('should appear in logged') | 
 |         logging.warning('should not appear in logged') | 
 |  | 
 |         self.assertEqual(logged, ['should appear in logged']) | 
 |  | 
 |  | 
 | class ChildLoggerTest(BaseTest): | 
 |     def test_child_loggers(self): | 
 |         r = logging.getLogger() | 
 |         l1 = logging.getLogger('abc') | 
 |         l2 = logging.getLogger('def.ghi') | 
 |         c1 = r.getChild('xyz') | 
 |         c2 = r.getChild('uvw.xyz') | 
 |         self.assertTrue(c1 is logging.getLogger('xyz')) | 
 |         self.assertTrue(c2 is logging.getLogger('uvw.xyz')) | 
 |         c1 = l1.getChild('def') | 
 |         c2 = c1.getChild('ghi') | 
 |         c3 = l1.getChild('def.ghi') | 
 |         self.assertTrue(c1 is logging.getLogger('abc.def')) | 
 |         self.assertTrue(c2 is logging.getLogger('abc.def.ghi')) | 
 |         self.assertTrue(c2 is c3) | 
 |  | 
 |  | 
 | class DerivedLogRecord(logging.LogRecord): | 
 |     pass | 
 |  | 
 | class LogRecordFactoryTest(BaseTest): | 
 |  | 
 |     def setUp(self): | 
 |         class CheckingFilter(logging.Filter): | 
 |             def __init__(self, cls): | 
 |                 self.cls = cls | 
 |  | 
 |             def filter(self, record): | 
 |                 t = type(record) | 
 |                 if t is not self.cls: | 
 |                     msg = 'Unexpected LogRecord type %s, expected %s' % (t, | 
 |                             self.cls) | 
 |                     raise TypeError(msg) | 
 |                 return True | 
 |  | 
 |         BaseTest.setUp(self) | 
 |         self.filter = CheckingFilter(DerivedLogRecord) | 
 |         self.root_logger.addFilter(self.filter) | 
 |         self.orig_factory = logging.getLogRecordFactory() | 
 |  | 
 |     def tearDown(self): | 
 |         self.root_logger.removeFilter(self.filter) | 
 |         BaseTest.tearDown(self) | 
 |         logging.setLogRecordFactory(self.orig_factory) | 
 |  | 
 |     def test_logrecord_class(self): | 
 |         self.assertRaises(TypeError, self.root_logger.warning, | 
 |                           self.next_message()) | 
 |         logging.setLogRecordFactory(DerivedLogRecord) | 
 |         self.root_logger.error(self.next_message()) | 
 |         self.assert_log_lines([ | 
 |            ('root', 'ERROR', '2'), | 
 |         ]) | 
 |  | 
 |  | 
 | class QueueHandlerTest(BaseTest): | 
 |     # Do not bother with a logger name group. | 
 |     expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" | 
 |  | 
 |     def setUp(self): | 
 |         BaseTest.setUp(self) | 
 |         self.queue = queue.Queue(-1) | 
 |         self.que_hdlr = logging.handlers.QueueHandler(self.queue) | 
 |         self.que_logger = logging.getLogger('que') | 
 |         self.que_logger.propagate = False | 
 |         self.que_logger.setLevel(logging.WARNING) | 
 |         self.que_logger.addHandler(self.que_hdlr) | 
 |  | 
 |     def tearDown(self): | 
 |         self.que_hdlr.close() | 
 |         BaseTest.tearDown(self) | 
 |  | 
 |     def test_queue_handler(self): | 
 |         self.que_logger.debug(self.next_message()) | 
 |         self.assertRaises(queue.Empty, self.queue.get_nowait) | 
 |         self.que_logger.info(self.next_message()) | 
 |         self.assertRaises(queue.Empty, self.queue.get_nowait) | 
 |         msg = self.next_message() | 
 |         self.que_logger.warning(msg) | 
 |         data = self.queue.get_nowait() | 
 |         self.assertTrue(isinstance(data, logging.LogRecord)) | 
 |         self.assertEqual(data.name, self.que_logger.name) | 
 |         self.assertEqual((data.msg, data.args), (msg, None)) | 
 |  | 
 |     @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), | 
 |                          'logging.handlers.QueueListener required for this test') | 
 |     def test_queue_listener(self): | 
 |         handler = TestHandler(Matcher()) | 
 |         listener = logging.handlers.QueueListener(self.queue, handler) | 
 |         listener.start() | 
 |         try: | 
 |             self.que_logger.warning(self.next_message()) | 
 |             self.que_logger.error(self.next_message()) | 
 |             self.que_logger.critical(self.next_message()) | 
 |         finally: | 
 |             listener.stop() | 
 |         self.assertTrue(handler.matches(levelno=logging.WARNING, message='1')) | 
 |         self.assertTrue(handler.matches(levelno=logging.ERROR, message='2')) | 
 |         self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3')) | 
 |  | 
 |  | 
 | class FormatterTest(unittest.TestCase): | 
 |     def setUp(self): | 
 |         self.common = { | 
 |             'name': 'formatter.test', | 
 |             'level': logging.DEBUG, | 
 |             'pathname': os.path.join('path', 'to', 'dummy.ext'), | 
 |             'lineno': 42, | 
 |             'exc_info': None, | 
 |             'func': None, | 
 |             'msg': 'Message with %d %s', | 
 |             'args': (2, 'placeholders'), | 
 |         } | 
 |         self.variants = { | 
 |         } | 
 |  | 
 |     def get_record(self, name=None): | 
 |         result = dict(self.common) | 
 |         if name is not None: | 
 |             result.update(self.variants[name]) | 
 |         return logging.makeLogRecord(result) | 
 |  | 
 |     def test_percent(self): | 
 |         # Test %-formatting | 
 |         r = self.get_record() | 
 |         f = logging.Formatter('${%(message)s}') | 
 |         self.assertEqual(f.format(r), '${Message with 2 placeholders}') | 
 |         f = logging.Formatter('%(random)s') | 
 |         self.assertRaises(KeyError, f.format, r) | 
 |         self.assertFalse(f.usesTime()) | 
 |         f = logging.Formatter('%(asctime)s') | 
 |         self.assertTrue(f.usesTime()) | 
 |         f = logging.Formatter('%(asctime)-15s') | 
 |         self.assertTrue(f.usesTime()) | 
 |         f = logging.Formatter('asctime') | 
 |         self.assertFalse(f.usesTime()) | 
 |  | 
 |     def test_braces(self): | 
 |         # Test {}-formatting | 
 |         r = self.get_record() | 
 |         f = logging.Formatter('$%{message}%$', style='{') | 
 |         self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') | 
 |         f = logging.Formatter('{random}', style='{') | 
 |         self.assertRaises(KeyError, f.format, r) | 
 |         self.assertFalse(f.usesTime()) | 
 |         f = logging.Formatter('{asctime}', style='{') | 
 |         self.assertTrue(f.usesTime()) | 
 |         f = logging.Formatter('{asctime!s:15}', style='{') | 
 |         self.assertTrue(f.usesTime()) | 
 |         f = logging.Formatter('{asctime:15}', style='{') | 
 |         self.assertTrue(f.usesTime()) | 
 |         f = logging.Formatter('asctime', style='{') | 
 |         self.assertFalse(f.usesTime()) | 
 |  | 
 |     def test_dollars(self): | 
 |         # Test $-formatting | 
 |         r = self.get_record() | 
 |         f = logging.Formatter('$message', style='$') | 
 |         self.assertEqual(f.format(r), 'Message with 2 placeholders') | 
 |         f = logging.Formatter('$$%${message}%$$', style='$') | 
 |         self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') | 
 |         f = logging.Formatter('${random}', style='$') | 
 |         self.assertRaises(KeyError, f.format, r) | 
 |         self.assertFalse(f.usesTime()) | 
 |         f = logging.Formatter('${asctime}', style='$') | 
 |         self.assertTrue(f.usesTime()) | 
 |         f = logging.Formatter('${asctime', style='$') | 
 |         self.assertFalse(f.usesTime()) | 
 |         f = logging.Formatter('$asctime', style='$') | 
 |         self.assertTrue(f.usesTime()) | 
 |         f = logging.Formatter('asctime', style='$') | 
 |         self.assertFalse(f.usesTime()) | 
 |  | 
 | class LastResortTest(BaseTest): | 
 |     def test_last_resort(self): | 
 |         # Test the last resort handler | 
 |         root = self.root_logger | 
 |         root.removeHandler(self.root_hdlr) | 
 |         old_stderr = sys.stderr | 
 |         old_lastresort = logging.lastResort | 
 |         old_raise_exceptions = logging.raiseExceptions | 
 |         try: | 
 |             sys.stderr = sio = io.StringIO() | 
 |             root.warning('This is your final chance!') | 
 |             self.assertEqual(sio.getvalue(), 'This is your final chance!\n') | 
 |             #No handlers and no last resort, so 'No handlers' message | 
 |             logging.lastResort = None | 
 |             sys.stderr = sio = io.StringIO() | 
 |             root.warning('This is your final chance!') | 
 |             self.assertEqual(sio.getvalue(), 'No handlers could be found for logger "root"\n') | 
 |             # 'No handlers' message only printed once | 
 |             sys.stderr = sio = io.StringIO() | 
 |             root.warning('This is your final chance!') | 
 |             self.assertEqual(sio.getvalue(), '') | 
 |             root.manager.emittedNoHandlerWarning = False | 
 |             #If raiseExceptions is False, no message is printed | 
 |             logging.raiseExceptions = False | 
 |             sys.stderr = sio = io.StringIO() | 
 |             root.warning('This is your final chance!') | 
 |             self.assertEqual(sio.getvalue(), '') | 
 |         finally: | 
 |             sys.stderr = old_stderr | 
 |             root.addHandler(self.root_hdlr) | 
 |             logging.lastResort = old_lastresort | 
 |             logging.raiseExceptions = old_raise_exceptions | 
 |  | 
 |  | 
 | class BaseFileTest(BaseTest): | 
 |     "Base class for handler tests that write log files" | 
 |  | 
 |     def setUp(self): | 
 |         BaseTest.setUp(self) | 
 |         fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-") | 
 |         os.close(fd) | 
 |         self.rmfiles = [] | 
 |  | 
 |     def tearDown(self): | 
 |         for fn in self.rmfiles: | 
 |             os.unlink(fn) | 
 |         if os.path.exists(self.fn): | 
 |             os.unlink(self.fn) | 
 |         BaseTest.tearDown(self) | 
 |  | 
 |     def assertLogFile(self, filename): | 
 |         "Assert a log file is there and register it for deletion" | 
 |         self.assertTrue(os.path.exists(filename), | 
 |                         msg="Log file %r does not exist") | 
 |         self.rmfiles.append(filename) | 
 |  | 
 |  | 
 | class RotatingFileHandlerTest(BaseFileTest): | 
 |     def next_rec(self): | 
 |         return logging.LogRecord('n', logging.DEBUG, 'p', 1, | 
 |                                  self.next_message(), None, None, None) | 
 |  | 
 |     def test_should_not_rollover(self): | 
 |         # If maxbytes is zero rollover never occurs | 
 |         rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0) | 
 |         self.assertFalse(rh.shouldRollover(None)) | 
 |         rh.close() | 
 |  | 
 |     def test_should_rollover(self): | 
 |         rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1) | 
 |         self.assertTrue(rh.shouldRollover(self.next_rec())) | 
 |         rh.close() | 
 |  | 
 |     def test_file_created(self): | 
 |         # checks that the file is created and assumes it was created | 
 |         # by us | 
 |         rh = logging.handlers.RotatingFileHandler(self.fn) | 
 |         rh.emit(self.next_rec()) | 
 |         self.assertLogFile(self.fn) | 
 |         rh.close() | 
 |  | 
 |     def test_rollover_filenames(self): | 
 |         rh = logging.handlers.RotatingFileHandler( | 
 |             self.fn, backupCount=2, maxBytes=1) | 
 |         rh.emit(self.next_rec()) | 
 |         self.assertLogFile(self.fn) | 
 |         rh.emit(self.next_rec()) | 
 |         self.assertLogFile(self.fn + ".1") | 
 |         rh.emit(self.next_rec()) | 
 |         self.assertLogFile(self.fn + ".2") | 
 |         self.assertFalse(os.path.exists(self.fn + ".3")) | 
 |         rh.close() | 
 |  | 
 | class TimedRotatingFileHandlerTest(BaseFileTest): | 
 |     # test methods added below | 
 |     pass | 
 |  | 
 | def secs(**kw): | 
 |     return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) | 
 |  | 
 | for when, exp in (('S', 1), | 
 |                   ('M', 60), | 
 |                   ('H', 60 * 60), | 
 |                   ('D', 60 * 60 * 24), | 
 |                   ('MIDNIGHT', 60 * 60 * 24), | 
 |                   # current time (epoch start) is a Thursday, W0 means Monday | 
 |                   ('W0', secs(days=4, hours=24)), | 
 |                  ): | 
 |     def test_compute_rollover(self, when=when, exp=exp): | 
 |         rh = logging.handlers.TimedRotatingFileHandler( | 
 |             self.fn, when=when, interval=1, backupCount=0, utc=True) | 
 |         currentTime = 0.0 | 
 |         actual = rh.computeRollover(currentTime) | 
 |         if exp != actual: | 
 |             # Failures occur on some systems for MIDNIGHT and W0. | 
 |             # Print detailed calculation for MIDNIGHT so we can try to see | 
 |             # what's going on | 
 |             import time | 
 |             if when == 'MIDNIGHT': | 
 |                 try: | 
 |                     if rh.utc: | 
 |                         t = time.gmtime(currentTime) | 
 |                     else: | 
 |                         t = time.localtime(currentTime) | 
 |                     currentHour = t[3] | 
 |                     currentMinute = t[4] | 
 |                     currentSecond = t[5] | 
 |                     # r is the number of seconds left between now and midnight | 
 |                     r = logging.handlers._MIDNIGHT - ((currentHour * 60 + | 
 |                                                        currentMinute) * 60 + | 
 |                             currentSecond) | 
 |                     result = currentTime + r | 
 |                     print('t: %s (%s)' % (t, rh.utc), file=sys.stderr) | 
 |                     print('currentHour: %s' % currentHour, file=sys.stderr) | 
 |                     print('currentMinute: %s' % currentMinute, file=sys.stderr) | 
 |                     print('currentSecond: %s' % currentSecond, file=sys.stderr) | 
 |                     print('r: %s' % r, file=sys.stderr) | 
 |                     print('result: %s' % result, file=sys.stderr) | 
 |                 except Exception: | 
 |                     print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr) | 
 |         self.assertEqual(exp, actual) | 
 |         rh.close() | 
 |     setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) | 
 |  | 
 | # Set the locale to the platform-dependent default.  I have no idea | 
 | # why the test does this, but in any case we save the current locale | 
 | # first and restore it at the end. | 
 | @run_with_locale('LC_ALL', '') | 
 | def test_main(): | 
 |     run_unittest(BuiltinLevelsTest, BasicFilterTest, | 
 |                  CustomLevelsAndFiltersTest, MemoryHandlerTest, | 
 |                  ConfigFileTest, SocketHandlerTest, MemoryTest, | 
 |                  EncodingTest, WarningsTest, ConfigDictTest, ManagerTest, | 
 |                  FormatterTest, | 
 |                  LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest, | 
 |                  RotatingFileHandlerTest, | 
 |                  LastResortTest, | 
 |                  TimedRotatingFileHandlerTest | 
 |                 ) | 
 |  | 
 | if __name__ == "__main__": | 
 |     test_main() |