| #!/usr/bin/env python |
| # |
| # Copyright 2001-2010 by Vinay Sajip. All Rights Reserved. |
| # |
| # Permission to use, copy, modify, and distribute this software and its |
| # documentation for any purpose and without fee is hereby granted, |
| # provided that the above copyright notice appear in all copies and that |
| # both that copyright notice and this permission notice appear in |
| # supporting documentation, and that the name of Vinay Sajip |
| # not be used in advertising or publicity pertaining to distribution |
| # of the software without specific, written prior permission. |
| # VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING |
| # ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL |
| # VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR |
| # ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER |
| # IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT |
| # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
| |
| """Test harness for the logging module. Run all tests. |
| |
| Copyright (C) 2001-2010 Vinay Sajip. All Rights Reserved. |
| """ |
| |
| import logging |
| import logging.handlers |
| import logging.config |
| |
| import codecs |
| import datetime |
| import pickle |
| import io |
| import gc |
| import json |
| import os |
| import queue |
| import re |
| import select |
| import socket |
| from socketserver import ThreadingTCPServer, StreamRequestHandler |
| import struct |
| import sys |
| import tempfile |
| from test.support import captured_stdout, run_with_locale, run_unittest |
| import textwrap |
| import unittest |
| import warnings |
| import weakref |
| try: |
| import threading |
| except ImportError: |
| threading = None |
| |
| |
class BaseTest(unittest.TestCase):

    """Base class for logging tests.

    setUp() snapshots the logging module's global registries and attaches a
    StreamHandler writing to an in-memory stream to the root logger;
    tearDown() undoes both, so configuration does not leak between tests.
    """

    # Format applied to every record written to self.stream.
    log_format = "%(name)s -> %(levelname)s: %(message)s"
    # Pattern whose groups assert_log_lines() compares against the expected
    # tuples; subclasses override it to match their own output format.
    expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
    # Counter behind next_message().
    message_num = 0

    def setUp(self):
        """Setup the default logging stream to an internal StringIO instance,
        so that we can examine log output as we want."""
        logger_dict = logging.getLogger().manager.loggerDict
        # Take the snapshots while holding the module lock.
        # NOTE(review): _handlers, _handlerList and _levelNames are private
        # attributes of the logging module -- confirm they exist on the
        # Python version this harness targets.
        logging._acquireLock()
        try:
            self.saved_handlers = logging._handlers.copy()
            self.saved_handler_list = logging._handlerList[:]
            self.saved_loggers = logger_dict.copy()
            self.saved_level_names = logging._levelNames.copy()
        finally:
            logging._releaseLock()

        # Set two unused loggers: one non-ASCII and one Unicode.
        # This is to test correct operation when sorting existing
        # loggers in the configuration code. See issue 8201.
        self.logger1 = logging.getLogger("\xab\xd7\xbb")
        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")

        self.root_logger = logging.getLogger("")
        self.original_logging_level = self.root_logger.getEffectiveLevel()

        self.stream = io.StringIO()
        self.root_logger.setLevel(logging.DEBUG)
        self.root_hdlr = logging.StreamHandler(self.stream)
        self.root_formatter = logging.Formatter(self.log_format)
        self.root_hdlr.setFormatter(self.root_formatter)
        # hasHandlers() is checked on both sides of attaching the root
        # handler: False before, True after.
        self.assertFalse(self.logger1.hasHandlers())
        self.assertFalse(self.logger2.hasHandlers())
        self.root_logger.addHandler(self.root_hdlr)
        self.assertTrue(self.logger1.hasHandlers())
        self.assertTrue(self.logger2.hasHandlers())

    def tearDown(self):
        """Remove our logging stream, and restore the original logging
        level."""
        self.stream.close()
        self.root_logger.removeHandler(self.root_hdlr)
        # Drop (and close) any handler a test left on the root logger.
        while self.root_logger.handlers:
            h = self.root_logger.handlers[0]
            self.root_logger.removeHandler(h)
            h.close()
        self.root_logger.setLevel(self.original_logging_level)
        # Put the registries back exactly as setUp() found them.
        logging._acquireLock()
        try:
            logging._levelNames.clear()
            logging._levelNames.update(self.saved_level_names)
            logging._handlers.clear()
            logging._handlers.update(self.saved_handlers)
            logging._handlerList[:] = self.saved_handler_list
            loggerDict = logging.getLogger().manager.loggerDict
            loggerDict.clear()
            loggerDict.update(self.saved_loggers)
        finally:
            logging._releaseLock()

    def assert_log_lines(self, expected_values, stream=None):
        """Match the collected log lines against the regular expression
        self.expected_log_pat, and compare the extracted group values to
        the expected_values list of tuples.

        stream defaults to self.stream. The test fails when the number of
        lines differs, when a line does not match the pattern, when the
        captured groups differ from the expected tuple, or when unread
        output remains in the stream.
        """
        stream = stream or self.stream
        pat = re.compile(self.expected_log_pat)
        try:
            stream.reset()
            actual_lines = stream.readlines()
        except AttributeError:
            # Streams without a reset() method (io.StringIO is one): take
            # the whole buffer instead.
            actual_lines = stream.getvalue().splitlines()
        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(len(actual_lines), len(expected_values))
        for actual, expected in zip(actual_lines, expected_values):
            match = pat.search(actual)
            if not match:
                self.fail("Log line does not match expected pattern:\n" +
                          actual)
            self.assertEqual(tuple(match.groups()), expected)
        s = stream.read()
        if s:
            self.fail("Remaining output at end of log stream:\n" + s)

    def next_message(self):
        """Generate a message consisting solely of an auto-incrementing
        integer."""
        self.message_num += 1
        return "%d" % self.message_num
| |
| |
class BuiltinLevelsTest(BaseTest):
    """Test builtin levels and their inheritance.

    Every test logs auto-numbered messages and then checks which
    (logger name, level name, message) triples reached the root stream.
    """

    def test_flat(self):
        # Logging levels in a flat logger namespace.
        m = self.next_message

        ERR = logging.getLogger("ERR")
        ERR.setLevel(logging.ERROR)
        # INF is deliberately a LoggerAdapter, so the adapter's level and
        # logging methods are exercised alongside plain loggers.
        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
        INF.setLevel(logging.INFO)
        DEB = logging.getLogger("DEB")
        DEB.setLevel(logging.DEBUG)

        # These should log.
        ERR.log(logging.CRITICAL, m())
        ERR.error(m())

        INF.log(logging.CRITICAL, m())
        INF.error(m())
        INF.warning(m())
        INF.info(m())

        DEB.log(logging.CRITICAL, m())
        DEB.error(m())
        DEB.warning(m())
        DEB.info(m())
        DEB.debug(m())

        # These should not log.
        ERR.warning(m())
        ERR.info(m())
        ERR.debug(m())

        INF.debug(m())

        self.assert_log_lines([
            ('ERR', 'CRITICAL', '1'),
            ('ERR', 'ERROR', '2'),
            ('INF', 'CRITICAL', '3'),
            ('INF', 'ERROR', '4'),
            ('INF', 'WARNING', '5'),
            ('INF', 'INFO', '6'),
            ('DEB', 'CRITICAL', '7'),
            ('DEB', 'ERROR', '8'),
            ('DEB', 'WARNING', '9'),
            ('DEB', 'INFO', '10'),
            ('DEB', 'DEBUG', '11'),
        ])

    def test_nested_explicit(self):
        # Logging levels in a nested namespace, all explicitly set.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)

        # These should log.
        INF_ERR.log(logging.CRITICAL, m())
        INF_ERR.error(m())

        # These should not log.
        INF_ERR.warning(m())
        INF_ERR.info(m())
        INF_ERR.debug(m())

        self.assert_log_lines([
            ('INF.ERR', 'CRITICAL', '1'),
            ('INF.ERR', 'ERROR', '2'),
        ])

    def test_nested_inherited(self):
        # Logging levels in a nested namespace, inherited from parent loggers.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)
        INF_UNDEF = logging.getLogger("INF.UNDEF")
        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
        UNDEF = logging.getLogger("UNDEF")

        # These should log.
        INF_UNDEF.log(logging.CRITICAL, m())
        INF_UNDEF.error(m())
        INF_UNDEF.warning(m())
        INF_UNDEF.info(m())
        INF_ERR_UNDEF.log(logging.CRITICAL, m())
        INF_ERR_UNDEF.error(m())

        # These should not log.
        INF_UNDEF.debug(m())
        INF_ERR_UNDEF.warning(m())
        INF_ERR_UNDEF.info(m())
        INF_ERR_UNDEF.debug(m())

        self.assert_log_lines([
            ('INF.UNDEF', 'CRITICAL', '1'),
            ('INF.UNDEF', 'ERROR', '2'),
            ('INF.UNDEF', 'WARNING', '3'),
            ('INF.UNDEF', 'INFO', '4'),
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
            ('INF.ERR.UNDEF', 'ERROR', '6'),
        ])

    def test_nested_with_virtual_parent(self):
        # Logging levels when some parent does not exist yet.
        m = self.next_message

        INF = logging.getLogger("INF")
        # The grandchild is requested before its direct parent on purpose.
        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
        CHILD = logging.getLogger("INF.BADPARENT")
        INF.setLevel(logging.INFO)

        # These should log.
        GRANDCHILD.log(logging.FATAL, m())
        GRANDCHILD.info(m())
        CHILD.log(logging.FATAL, m())
        CHILD.info(m())

        # These should not log.
        GRANDCHILD.debug(m())
        CHILD.debug(m())

        self.assert_log_lines([
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
            ('INF.BADPARENT', 'CRITICAL', '3'),
            ('INF.BADPARENT', 'INFO', '4'),
        ])
| |
| |
class BasicFilterTest(BaseTest):

    """Exercise the stock logging.Filter attached to the root handler."""

    def test_filter(self):
        # With a "spam.eggs" filter installed, only records from that logger
        # and its descendants are expected in the output.
        name_filter = logging.Filter("spam.eggs")
        root_handler = self.root_logger.handlers[0]
        try:
            root_handler.addFilter(name_filter)
            logger_names = (
                "spam",
                "spam.eggs",          # Good.
                "spam.eggs.fish",     # Good.
                "spam.bakedbeans",
            )
            for logger_name in logger_names:
                logging.getLogger(logger_name).info(self.next_message())

            expected = [
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ]
            self.assert_log_lines(expected)
        finally:
            root_handler.removeFilter(name_filter)
| |
| |
| # |
| # First, we define our levels. There can be as many as you want - the only |
| # limitations are that they should be integers, the lowest should be > 0 and |
| # larger values mean less information being logged. If you need specific |
| # level values which do not fit into these limitations, you can use a |
| # mapping dictionary to convert between your application levels and the |
| # logging system. |
| # |
BORING = 111
CHATTERBOX = 112
GARRULOUS = 113
TALKATIVE = 114
VERBOSE = 115
SOCIABLE = 116
EFFUSIVE = 117
TERSE = 118
TACITURN = 119
SILENT = 120

# Every custom level, from the least to the most severe.
LEVEL_RANGE = range(BORING, SILENT + 1)

#
# Display names for the custom levels. Registering names is optional: a
# level without one is rendered by the logging system as "Level n".
#
my_logging_levels = {
    SILENT: 'Silent',
    TACITURN: 'Taciturn',
    TERSE: 'Terse',
    EFFUSIVE: 'Effusive',
    SOCIABLE: 'Sociable',
    VERBOSE: 'Verbose',
    TALKATIVE: 'Talkative',
    GARRULOUS: 'Garrulous',
    CHATTERBOX: 'Chatterbox',
    BORING: 'Boring',
}
| |
class GarrulousFilter(logging.Filter):

    """Filter rejecting every record logged at the GARRULOUS level."""

    def filter(self, record):
        """Return False for GARRULOUS records and True for all others."""
        if record.levelno == GARRULOUS:
            return False
        return True
| |
class VerySpecificFilter(logging.Filter):

    """Filter rejecting records at the SOCIABLE and TACITURN levels."""

    def filter(self, record):
        """Return True unless the record's level is one of the blocked ones."""
        blocked_levels = [SOCIABLE, TACITURN]
        return record.levelno not in blocked_levels
| |
| |
class CustomLevelsAndFiltersTest(BaseTest):

    """Combine custom logging levels with level- and object-based filtering."""

    # Only the level name and the message number are captured; the logger
    # name is matched but not grouped.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        """Register the display name of every custom level."""
        super().setUp()
        for level, level_name in my_logging_levels.items():
            logging.addLevelName(level, level_name)

    def log_at_all_levels(self, logger):
        """Emit one numbered message per custom level, lowest level first."""
        for level in LEVEL_RANGE:
            message = self.next_message()
            logger.log(level, message)

    def test_logger_filter(self):
        # Threshold set on the logger: only 'Verbose' and above get through.
        self.root_logger.setLevel(VERBOSE)
        self.log_at_all_levels(self.root_logger)
        expected = [
            ('Verbose', '5'),
            ('Sociable', '6'),
            ('Effusive', '7'),
            ('Terse', '8'),
            ('Taciturn', '9'),
            ('Silent', '10'),
        ]
        self.assert_log_lines(expected)

    def test_handler_filter(self):
        # Threshold set on the handler: only 'Sociable' and above get through.
        root_handler = self.root_logger.handlers[0]
        root_handler.setLevel(SOCIABLE)
        try:
            self.log_at_all_levels(self.root_logger)
            expected = [
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(expected)
        finally:
            self.root_logger.handlers[0].setLevel(logging.NOTSET)

    def test_specific_filters(self):
        # First a filter object on the handler alone, then a second filter
        # object on the logger as well.
        root_handler = self.root_logger.handlers[0]
        logger_filter = None
        handler_filter = GarrulousFilter()
        root_handler.addFilter(handler_filter)
        try:
            self.log_at_all_levels(self.root_logger)
            # First pass: everything except 'Garrulous' (message 3).
            first_pass = [
                ('Boring', '1'),
                ('Chatterbox', '2'),
                ('Talkative', '4'),
                ('Verbose', '5'),
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(first_pass)

            logger_filter = VerySpecificFilter()
            self.root_logger.addFilter(logger_filter)
            self.log_at_all_levels(self.root_logger)
            # Second pass: 'Garrulous' is still dropped, and now 'Sociable'
            # and 'Taciturn' are dropped too.
            second_pass = [
                ('Boring', '11'),
                ('Chatterbox', '12'),
                ('Talkative', '14'),
                ('Verbose', '15'),
                ('Effusive', '17'),
                ('Terse', '18'),
                ('Silent', '20'),
            ]
            self.assert_log_lines(first_pass + second_pass)
        finally:
            if logger_filter is not None:
                self.root_logger.removeFilter(logger_filter)
            root_handler.removeFilter(handler_filter)
| |
| |
class MemoryHandlerTest(BaseTest):

    """Tests for the MemoryHandler."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        """Route the 'mem' logger through a MemoryHandler (capacity 10,
        flush level WARNING) whose target is the root stream handler."""
        BaseTest.setUp(self)
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                       self.root_hdlr)
        self.mem_logger = logging.getLogger('mem')
        # Records must reach the stream only through the memory handler.
        self.mem_logger.propagate = 0
        self.mem_logger.addHandler(self.mem_hdlr)

    def tearDown(self):
        """Close the memory handler, then run the base class cleanup."""
        self.mem_hdlr.close()
        BaseTest.tearDown(self)

    def test_flush(self):
        # The memory handler flushes to its target handler based on specific
        # criteria (message count and message level).
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        # This will flush because the level is >= logging.WARNING
        self.mem_logger.warning(self.next_message())
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
            ('WARNING', '3'),
        ]
        self.assert_log_lines(lines)
        # n is the number of the first message in each batch of ten.
        for n in (4, 14):
            for _ in range(9):
                self.mem_logger.debug(self.next_message())
            self.assert_log_lines(lines)
            # This will flush because it's the 10th message since the last
            # flush.
            self.mem_logger.debug(self.next_message())
            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
            self.assert_log_lines(lines)

        # A single further message stays buffered.
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)
| |
| |
class ExceptionFormatter(logging.Formatter):
    """Formatter that renders an exception as just its class name."""

    def formatException(self, ei):
        """Return "Got a [<name>]" for the exception class held in ei[0]."""
        exc_class = ei[0]
        return "Got a [%s]" % (exc_class.__name__,)
| |
| |
class ConfigFileTest(BaseTest):

    """Reading logging config from a .ini-style config file.

    Each configN attribute holds the text of one configuration file;
    apply_config() dedents it and feeds it to logging.config.fileConfig().
    """

    expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"

    # config0 is a standard configuration.
    config0 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1 adds a little to the standard configuration.
    config1 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config2 has a subtle configuration error that should be reported
    config2 = config1.replace("sys.stdout", "sys.stbout")

    # config3 has a less subtle configuration error
    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")

    # config4 specifies a custom formatter class to be loaded
    config4 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=NOTSET
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    class=""" + __name__ + """.ExceptionFormatter
    format=%(levelname)s:%(name)s:%(message)s
    datefmt=
    """

    # config5 specifies a custom handler class to be loaded
    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')

    # config6 uses ', ' delimiters in the handlers and formatters sections
    config6 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1, hand2

    [formatters]
    keys=form1, form2

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [handler_hand2]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stderr,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=

    [formatter_form2]
    format=%(message)s
    datefmt=
    """

    def apply_config(self, conf):
        """Dedent the configuration text conf and load it with fileConfig()."""
        config_file = io.StringIO(textwrap.dedent(conf))
        logging.config.fileConfig(config_file)

    def test_config0_ok(self):
        # A simple config file which overrides the default settings.
        with captured_stdout() as output:
            self.apply_config(self.config0)
            logger = logging.getLogger()
            # Won't output anything
            logger.info(self.next_message())
            # Outputs a message
            logger.error(self.next_message())
            self.assert_log_lines([
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config1_ok(self, config=config1):
        # A config file defining a sub-parser as well. The config parameter
        # lets other tests reuse this body with a different configuration.
        with captured_stdout() as output:
            self.apply_config(config)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config2_failure(self):
        # config2 passes a misspelt stream name (sys.stbout) to the handler;
        # applying it must raise.
        self.assertRaises(Exception, self.apply_config, self.config2)

    def test_config3_failure(self):
        # config3 refers to a formatter name that is not declared; applying
        # it must raise.
        self.assertRaises(Exception, self.apply_config, self.config3)

    def test_config4_ok(self):
        # A config file specifying a custom formatter class.
        with captured_stdout() as output:
            self.apply_config(self.config4)
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            sys.stdout.seek(0)
            # assertEqual rather than the deprecated assertEquals alias.
            self.assertEqual(output.getvalue(),
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
            # Original logger output is empty
            self.assert_log_lines([])

    def test_config5_ok(self):
        # Same expectations as config1, with the handler class given by its
        # qualified name.
        self.test_config1_ok(config=self.config5)

    def test_config6_ok(self):
        # Same expectations as config1, with ', ' separated key lists.
        self.test_config1_ok(config=self.config6)
| |
class LogRecordStreamHandler(StreamRequestHandler):

    """Handler for a streaming logging request. It saves the log message in the
    TCP server's 'log_output' attribute."""

    # Message text that tells the receiving server to stop.
    TCP_LOG_END = "!!!END!!!"

    def _recv_exactly(self, count):
        """Read exactly count bytes from the connection.

        Returns the bytes read, or None if the peer closed the connection
        before count bytes arrived.
        """
        pieces = []
        remaining = count
        while remaining > 0:
            piece = self.connection.recv(remaining)
            if not piece:
                # An empty read means the peer closed the connection;
                # retrying would never make progress.
                return None
            pieces.append(piece)
            remaining -= len(piece)
        return b"".join(pieces)

    def handle(self):
        """Handle multiple requests - each expected to be of 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.

        Returns when the connection is closed, including when it is closed
        in the middle of a length prefix or of a payload.
        """
        while True:
            header = self._recv_exactly(4)
            if header is None:
                break
            slen = struct.unpack(">L", header)[0]
            chunk = self._recv_exactly(slen)
            if chunk is None:
                # Truncated record: nothing usable left on this connection.
                break
            obj = self.unpickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handle_log_record(record)

    def unpickle(self, data):
        """Return the object pickled in data."""
        # NOTE(review): pickle.loads() on bytes read from a socket can
        # execute arbitrary code; acceptable only because this receiver is
        # a test fixture fed by the test itself.
        return pickle.loads(data)

    def handle_log_record(self, record):
        """Append record.msg to the server's log_output, or set the server's
        abort flag when the message contains the end-of-messages sentinel."""
        # If the end-of-messages sentinel is seen, tell the server to
        # terminate.
        if self.TCP_LOG_END in record.msg:
            self.server.abort = 1
            return
        self.server.log_output += record.msg + "\n"
| |
| |
class LogRecordSocketReceiver(ThreadingTCPServer):

    """Minimal TCP server collecting log records for the socket tests."""

    allow_reuse_address = 1
    # Text gathered by the request handlers, one message per line.
    log_output = ""

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        """Bind to (host, port) and prepare the stop flag and exit event."""
        super().__init__((host, port), handler)
        self.abort = False
        self.timeout = 0.1
        self.finished = threading.Event()

    def serve_until_stopped(self):
        """Serve requests until self.abort is set, then signal and close."""
        while not self.abort:
            # Wait at most self.timeout so the abort flag is re-checked.
            readable = select.select([self.socket.fileno()], [], [],
                                     self.timeout)[0]
            if readable:
                self.handle_request()
        # Tell whoever waits on the event that the loop is over ...
        self.finished.set()
        # ... and only then release the listening socket.
        self.server_close()
| |
| |
@unittest.skipUnless(threading, 'Threading required for this test.')
class SocketHandlerTest(BaseTest):

    """Test for SocketHandler objects."""

    def setUp(self):
        """Set up a TCP server to receive log messages, and a SocketHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Port 0 is requested; the port actually bound is read back from
        # the listening socket.
        self.tcpserver = LogRecordSocketReceiver(port=0)
        self.port = self.tcpserver.socket.getsockname()[1]
        self.threads = [
            threading.Thread(target=self.tcpserver.serve_until_stopped)]
        for thread in self.threads:
            thread.start()

        self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
        self.sock_hdlr.setFormatter(self.root_formatter)
        # Replace the stream handler installed by BaseTest with the socket
        # handler.
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)

    def tearDown(self):
        """Shutdown the TCP server."""
        try:
            self.tcpserver.abort = True
            del self.tcpserver
            self.root_logger.removeHandler(self.sock_hdlr)
            self.sock_hdlr.close()
            # Wait up to two seconds for each server thread.
            for thread in self.threads:
                thread.join(2.0)
        finally:
            BaseTest.tearDown(self)

    def get_output(self):
        """Get the log output as received by the TCP server."""
        # Signal the TCP receiver and wait for it to terminate.
        self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
        self.tcpserver.finished.wait(2.0)
        return self.tcpserver.log_output

    def test_output(self):
        # The log message sent to the SocketHandler is properly received.
        logger = logging.getLogger("tcp")
        logger.error("spam")
        logger.debug("eggs")
        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(self.get_output(), "spam\neggs\n")
| |
| |
class MemoryTest(BaseTest):

    """Check that logger objects outlive the references held by callers."""

    def setUp(self):
        """Start with an empty registry of watched objects."""
        super().setUp()
        self._survivors = {}

    def _watch_for_survival(self, *args):
        """Register a weak reference to each given object, keyed by its id
        and repr."""
        for watched in args:
            self._survivors[id(watched), repr(watched)] = weakref.ref(watched)

    def _assertTruesurvival(self):
        """Fail unless every watched object is still alive."""
        # Run a collection first so that cycles are broken.
        gc.collect()
        dead = [description
                for (_, description), ref in self._survivors.items()
                if ref() is None]
        if not dead:
            return
        self.fail("%d objects should have survived "
            "but have been destroyed: %s" % (len(dead), ", ".join(dead)))

    def test_persistent_loggers(self):
        # A logger keeps existing, and keeps its settings, after the caller
        # drops its own reference to it.
        self.root_logger.setLevel(logging.INFO)
        first_ref = logging.getLogger("foo")
        self._watch_for_survival(first_ref)
        first_ref.setLevel(logging.DEBUG)
        self.root_logger.debug(self.next_message())
        first_ref.debug(self.next_message())
        expected = [('foo', 'DEBUG', '2')]
        self.assert_log_lines(expected)
        del first_ref
        # The logger object must still be alive ...
        self._assertTruesurvival()
        # ... and a fresh lookup must still log at DEBUG.
        second_ref = logging.getLogger("foo")
        second_ref.debug(self.next_message())
        expected = expected + [('foo', 'DEBUG', '3')]
        self.assert_log_lines(expected)
| |
| |
class EncodingTest(BaseTest):

    """Check that handlers encode non-ASCII messages as requested."""

    def test_encoding_plain_file(self):
        # A message containing a non-ASCII character is written through a
        # FileHandler opened with an explicit encoding and read back.
        log = logging.getLogger("test")
        # mkstemp() creates the file itself, unlike the deprecated mktemp(),
        # which only returns a name that another process could claim first.
        fd, fn = tempfile.mkstemp(".log")
        os.close(fd)
        # the non-ascii data we write to the log.
        data = "foo\x80"
        try:
            handler = logging.FileHandler(fn, encoding="utf-8")
            log.addHandler(handler)
            try:
                # write non-ascii data to the log.
                log.warning(data)
            finally:
                log.removeHandler(handler)
                handler.close()
            # check we wrote exactly that text, ignoring trailing \n etc
            with open(fn, encoding="utf-8") as f:
                self.assertEqual(f.read().rstrip(), data)
        finally:
            if os.path.isfile(fn):
                os.remove(fn)

    def test_encoding_cyrillic_unicode(self):
        log = logging.getLogger("test")
        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
        # Ensure it's written in a Cyrillic encoding
        writer_class = codecs.getwriter('cp1251')
        writer_class.encoding = 'cp1251'
        stream = io.BytesIO()
        writer = writer_class(stream, 'strict')
        handler = logging.StreamHandler(writer)
        log.addHandler(handler)
        try:
            log.warning(message)
        finally:
            log.removeHandler(handler)
            handler.close()
        # check we wrote exactly those bytes, ignoring trailing \n etc
        s = stream.getvalue()
        # Compare against what the data should be when encoded in CP-1251
        self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
| |
| |
class WarningsTest(BaseTest):

    """Check the redirection of warnings to the logging system."""

    def test_warnings(self):
        # With captureWarnings(True), a warning must show up on the
        # "py.warnings" logger, while showwarning() with an explicit file
        # must still write the plain formatted warning to that file.
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            try:
                warnings.filterwarnings("always", category=UserWarning)
                file = io.StringIO()
                h = logging.StreamHandler(file)
                logger = logging.getLogger("py.warnings")
                logger.addHandler(h)
                try:
                    warnings.warn("I'm warning you...")
                finally:
                    # Detach the handler even if warn() raises.
                    logger.removeHandler(h)
                s = file.getvalue()
                h.close()
                self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)

                # See if an explicit file uses the original implementation
                file = io.StringIO()
                warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
                                     file, "Dummy line")
                s = file.getvalue()
                file.close()
                # warnings.formatwarning() indents the source line with two
                # spaces.
                self.assertEqual(s,
                    "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
            finally:
                logging.captureWarnings(False)
| |
| |
def formatFunc(format, datefmt=None):
    """Build a logging.Formatter from a format string and optional datefmt."""
    formatter = logging.Formatter(fmt=format, datefmt=datefmt)
    return formatter
| |
def handlerFunc():
    """Build a logging.StreamHandler using its default stream."""
    handler = logging.StreamHandler()
    return handler
| |
class CustomHandler(logging.StreamHandler):
    """StreamHandler subclass that adds nothing of its own."""
| |
| class ConfigDictTest(BaseTest): |
| |
| """Reading logging config from a dictionary.""" |
| |
| expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" |
| |
| # config0 is a standard configuration. |
| config0 = { |
| 'version': 1, |
| 'formatters': { |
| 'form1' : { |
| 'format' : '%(levelname)s ++ %(message)s', |
| }, |
| }, |
| 'handlers' : { |
| 'hand1' : { |
| 'class' : 'logging.StreamHandler', |
| 'formatter' : 'form1', |
| 'level' : 'NOTSET', |
| 'stream' : 'ext://sys.stdout', |
| }, |
| }, |
| 'root' : { |
| 'level' : 'WARNING', |
| 'handlers' : ['hand1'], |
| }, |
| } |
| |
| # config1 adds a little to the standard configuration. |
| config1 = { |
| 'version': 1, |
| 'formatters': { |
| 'form1' : { |
| 'format' : '%(levelname)s ++ %(message)s', |
| }, |
| }, |
| 'handlers' : { |
| 'hand1' : { |
| 'class' : 'logging.StreamHandler', |
| 'formatter' : 'form1', |
| 'level' : 'NOTSET', |
| 'stream' : 'ext://sys.stdout', |
| }, |
| }, |
| 'loggers' : { |
| 'compiler.parser' : { |
| 'level' : 'DEBUG', |
| 'handlers' : ['hand1'], |
| }, |
| }, |
| 'root' : { |
| 'level' : 'WARNING', |
| }, |
| } |
| |
| # config2 has a subtle configuration error that should be reported |
| config2 = { |
| 'version': 1, |
| 'formatters': { |
| 'form1' : { |
| 'format' : '%(levelname)s ++ %(message)s', |
| }, |
| }, |
| 'handlers' : { |
| 'hand1' : { |
| 'class' : 'logging.StreamHandler', |
| 'formatter' : 'form1', |
| 'level' : 'NOTSET', |
| 'stream' : 'ext://sys.stdbout', |
| }, |
| }, |
| 'loggers' : { |
| 'compiler.parser' : { |
| 'level' : 'DEBUG', |
| 'handlers' : ['hand1'], |
| }, |
| }, |
| 'root' : { |
| 'level' : 'WARNING', |
| }, |
| } |
| |
| #As config1 but with a misspelt level on a handler |
| config2a = { |
| 'version': 1, |
| 'formatters': { |
| 'form1' : { |
| 'format' : '%(levelname)s ++ %(message)s', |
| }, |
| }, |
| 'handlers' : { |
| 'hand1' : { |
| 'class' : 'logging.StreamHandler', |
| 'formatter' : 'form1', |
| 'level' : 'NTOSET', |
| 'stream' : 'ext://sys.stdout', |
| }, |
| }, |
| 'loggers' : { |
| 'compiler.parser' : { |
| 'level' : 'DEBUG', |
| 'handlers' : ['hand1'], |
| }, |
| }, |
| 'root' : { |
| 'level' : 'WARNING', |
| }, |
| } |
| |
| |
| #As config1 but with a misspelt level on a logger |
| config2b = { |
| 'version': 1, |
| 'formatters': { |
| 'form1' : { |
| 'format' : '%(levelname)s ++ %(message)s', |
| }, |
| }, |
| 'handlers' : { |
| 'hand1' : { |
| 'class' : 'logging.StreamHandler', |
| 'formatter' : 'form1', |
| 'level' : 'NOTSET', |
| 'stream' : 'ext://sys.stdout', |
| }, |
| }, |
| 'loggers' : { |
| 'compiler.parser' : { |
| 'level' : 'DEBUG', |
| 'handlers' : ['hand1'], |
| }, |
| }, |
| 'root' : { |
| 'level' : 'WRANING', |
| }, |
| } |
| |
| # config3 has a less subtle configuration error |
| config3 = { |
| 'version': 1, |
| 'formatters': { |
| 'form1' : { |
| 'format' : '%(levelname)s ++ %(message)s', |
| }, |
| }, |
| 'handlers' : { |
| 'hand1' : { |
| 'class' : 'logging.StreamHandler', |
| 'formatter' : 'misspelled_name', |
| 'level' : 'NOTSET', |
| 'stream' : 'ext://sys.stdout', |
| }, |
| }, |
| 'loggers' : { |
| 'compiler.parser' : { |
| 'level' : 'DEBUG', |
| 'handlers' : ['hand1'], |
| }, |
| }, |
| 'root' : { |
| 'level' : 'WARNING', |
| }, |
| } |
| |
| # config4 specifies a custom formatter class to be loaded |
| config4 = { |
| 'version': 1, |
| 'formatters': { |
| 'form1' : { |
| '()' : __name__ + '.ExceptionFormatter', |
| 'format' : '%(levelname)s:%(name)s:%(message)s', |
| }, |
| }, |
| 'handlers' : { |
| 'hand1' : { |
| 'class' : 'logging.StreamHandler', |
| 'formatter' : 'form1', |
| 'level' : 'NOTSET', |
| 'stream' : 'ext://sys.stdout', |
| }, |
| }, |
| 'root' : { |
| 'level' : 'NOTSET', |
| 'handlers' : ['hand1'], |
| }, |
| } |
| |
# As config4 but using an actual callable rather than a string for the
# '()' factory of form1; form2/form3 and hand2 exercise function factories
# given by dotted name and by object respectively.
config4a = {
    'version': 1,
    'formatters': {
        'form1': {
            '()': ExceptionFormatter,
            'format': '%(levelname)s:%(name)s:%(message)s',
        },
        'form2': {
            '()': '%s.formatFunc' % __name__,
            'format': '%(levelname)s:%(name)s:%(message)s',
        },
        'form3': {
            '()': formatFunc,
            'format': '%(levelname)s:%(name)s:%(message)s',
        },
    },
    'handlers': {
        'hand1': {
            'class': 'logging.StreamHandler',
            'formatter': 'form1',
            'level': 'NOTSET',
            'stream': 'ext://sys.stdout',
        },
        'hand2': {'()': handlerFunc},
    },
    'root': {'level': 'NOTSET', 'handlers': ['hand1']},
}
| |
# config5 specifies a custom handler class to be loaded, named by a dotted
# string built from this module's name.
config5 = {
    'version': 1,
    'formatters': {
        'form1': {'format': '%(levelname)s ++ %(message)s'},
    },
    'handlers': {
        'hand1': {
            'class': '%s.CustomHandler' % __name__,
            'formatter': 'form1',
            'level': 'NOTSET',
            'stream': 'ext://sys.stdout',
        },
    },
    'loggers': {
        'compiler.parser': {'level': 'DEBUG', 'handlers': ['hand1']},
    },
    'root': {'level': 'WARNING'},
}
| |
# config6 specifies a custom handler class to be loaded
# but has bad arguments (the extra '9' key of hand1)
config6 = {
    'version': 1,
    'formatters': {
        'form1': {'format': '%(levelname)s ++ %(message)s'},
    },
    'handlers': {
        'hand1': {
            'class': '%s.CustomHandler' % __name__,
            'formatter': 'form1',
            'level': 'NOTSET',
            'stream': 'ext://sys.stdout',
            '9': 'invalid parameter name',
        },
    },
    'loggers': {
        'compiler.parser': {'level': 'DEBUG', 'handlers': ['hand1']},
    },
    'root': {'level': 'WARNING'},
}
| |
# config7 does not define compiler.parser but defines compiler.lexer,
# so compiler.parser should be disabled after applying it
config7 = {
    'version': 1,
    'formatters': {
        'form1': {'format': '%(levelname)s ++ %(message)s'},
    },
    'handlers': {
        'hand1': {
            'class': 'logging.StreamHandler',
            'formatter': 'form1',
            'level': 'NOTSET',
            'stream': 'ext://sys.stdout',
        },
    },
    'loggers': {
        'compiler.lexer': {'level': 'DEBUG', 'handlers': ['hand1']},
    },
    'root': {'level': 'WARNING'},
}
| |
# config8 sets disable_existing_loggers to False and configures 'compiler'
# plus a 'compiler.lexer' entry with no settings of its own.
config8 = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'form1': {'format': '%(levelname)s ++ %(message)s'},
    },
    'handlers': {
        'hand1': {
            'class': 'logging.StreamHandler',
            'formatter': 'form1',
            'level': 'NOTSET',
            'stream': 'ext://sys.stdout',
        },
    },
    'loggers': {
        'compiler': {'level': 'DEBUG', 'handlers': ['hand1']},
        'compiler.lexer': {},
    },
    'root': {'level': 'WARNING'},
}
| |
# config9 puts both handler 'hand1' and logger 'compiler.parser' at
# WARNING, with the root logger at NOTSET.
config9 = {
    'version': 1,
    'formatters': {
        'form1': {'format': '%(levelname)s ++ %(message)s'},
    },
    'handlers': {
        'hand1': {
            'class': 'logging.StreamHandler',
            'formatter': 'form1',
            'level': 'WARNING',
            'stream': 'ext://sys.stdout',
        },
    },
    'loggers': {
        'compiler.parser': {'level': 'WARNING', 'handlers': ['hand1']},
    },
    'root': {'level': 'NOTSET'},
}
| |
# Incremental config: the logger level becomes INFO while the handler
# level stays at WARNING.
config9a = {
    'version': 1,
    'incremental': True,
    'handlers': {
        'hand1': {'level': 'WARNING'},
    },
    'loggers': {
        'compiler.parser': {'level': 'INFO'},
    },
}
| |
# Incremental config: both the handler and the logger level are INFO.
config9b = {
    'version': 1,
    'incremental': True,
    'handlers': {
        'hand1': {'level': 'INFO'},
    },
    'loggers': {
        'compiler.parser': {'level': 'INFO'},
    },
}
| |
# As config1 but with a filter added: 'filt1' is attached both to handler
# 'hand1' (which hangs off the root logger) and to logger 'compiler.parser'.
config10 = {
    'version': 1,
    'formatters': {
        'form1': {'format': '%(levelname)s ++ %(message)s'},
    },
    'filters': {
        'filt1': {'name': 'compiler.parser'},
    },
    'handlers': {
        'hand1': {
            'class': 'logging.StreamHandler',
            'formatter': 'form1',
            'level': 'NOTSET',
            'stream': 'ext://sys.stdout',
            'filters': ['filt1'],
        },
    },
    'loggers': {
        'compiler.parser': {'level': 'DEBUG', 'filters': ['filt1']},
    },
    'root': {'level': 'WARNING', 'handlers': ['hand1']},
}
| |
# As config1 but using cfg:// references: 'formatters' and the 'hand1'
# handler point at the 'true_formatters' and 'handler_configs' entries.
config11 = {
    'version': 1,
    'true_formatters': {
        'form1': {'format': '%(levelname)s ++ %(message)s'},
    },
    'handler_configs': {
        'hand1': {
            'class': 'logging.StreamHandler',
            'formatter': 'form1',
            'level': 'NOTSET',
            'stream': 'ext://sys.stdout',
        },
    },
    'formatters': 'cfg://true_formatters',
    'handlers': {
        'hand1': 'cfg://handler_configs[hand1]',
    },
    'loggers': {
        'compiler.parser': {'level': 'DEBUG', 'handlers': ['hand1']},
    },
    'root': {'level': 'WARNING'},
}
| |
# As config11 but missing the version key
config12 = {
    'true_formatters': {
        'form1': {'format': '%(levelname)s ++ %(message)s'},
    },
    'handler_configs': {
        'hand1': {
            'class': 'logging.StreamHandler',
            'formatter': 'form1',
            'level': 'NOTSET',
            'stream': 'ext://sys.stdout',
        },
    },
    'formatters': 'cfg://true_formatters',
    'handlers': {
        'hand1': 'cfg://handler_configs[hand1]',
    },
    'loggers': {
        'compiler.parser': {'level': 'DEBUG', 'handlers': ['hand1']},
    },
    'root': {'level': 'WARNING'},
}
| |
# As config11 but using an unsupported version (2)
config13 = {
    'version': 2,
    'true_formatters': {
        'form1': {'format': '%(levelname)s ++ %(message)s'},
    },
    'handler_configs': {
        'hand1': {
            'class': 'logging.StreamHandler',
            'formatter': 'form1',
            'level': 'NOTSET',
            'stream': 'ext://sys.stdout',
        },
    },
    'formatters': 'cfg://true_formatters',
    'handlers': {
        'hand1': 'cfg://handler_configs[hand1]',
    },
    'loggers': {
        'compiler.parser': {'level': 'DEBUG', 'handlers': ['hand1']},
    },
    'root': {'level': 'WARNING'},
}
| |
def apply_config(self, conf):
    """Install the configuration dictionary ``conf`` via dictConfig()."""
    configure = logging.config.dictConfig
    configure(conf)
| |
def test_config0_ok(self):
    """A simple config which overrides the default settings."""
    with captured_stdout() as stream:
        self.apply_config(self.config0)
        root = logging.getLogger()
        # The INFO message is not expected in the output ...
        root.info(self.next_message())
        # ... whereas the ERROR message is.
        root.error(self.next_message())
        expected = [('ERROR', '2')]
        self.assert_log_lines(expected, stream=stream)
        # Original logger output is empty.
        self.assert_log_lines([])
| |
def test_config1_ok(self, config=config1):
    """Apply ``config`` and check 'compiler.parser' emits INFO and ERROR.

    ``config`` defaults to config1 (a config defining a sub-parser as
    well); other tests call this method with a different configuration.
    """
    with captured_stdout() as stream:
        self.apply_config(config)
        parser_log = logging.getLogger("compiler.parser")
        # Both will output a message
        for emit in (parser_log.info, parser_log.error):
            emit(self.next_message())
        expected = [('INFO', '1'), ('ERROR', '2')]
        self.assert_log_lines(expected, stream=stream)
        # Original logger output is empty.
        self.assert_log_lines([])
| |
def test_config2_failure(self):
    """Applying config2 is expected to raise."""
    bad_config = self.config2
    with self.assertRaises(Exception):
        self.apply_config(bad_config)
| |
def test_config2a_failure(self):
    """Applying config2a is expected to raise."""
    bad_config = self.config2a
    with self.assertRaises(Exception):
        self.apply_config(bad_config)
| |
def test_config2b_failure(self):
    """Applying config2b (misspelt root level name) is expected to raise."""
    bad_config = self.config2b
    with self.assertRaises(Exception):
        self.apply_config(bad_config)
| |
def test_config3_failure(self):
    """Applying config3 (handler names an undefined formatter) must raise."""
    bad_config = self.config3
    with self.assertRaises(Exception):
        self.apply_config(bad_config)
| |
def test_config4_ok(self):
    """A config specifying a custom formatter class (by dotted name).

    Logs an exception through the root logger and checks the exact text
    written to the captured stdout.
    """
    with captured_stdout() as output:
        self.apply_config(self.config4)
        try:
            raise RuntimeError()
        except RuntimeError:
            # Must be called from inside the handler so that the active
            # exception is attached to the record.
            logging.exception("just testing")
        sys.stdout.seek(0)
        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(output.getvalue(),
            "ERROR:root:just testing\nGot a [RuntimeError]\n")
        # Original logger output is empty
        self.assert_log_lines([])
| |
def test_config4a_ok(self):
    """A config specifying a custom formatter via an actual callable.

    Logs an exception through the root logger and checks the exact text
    written to the captured stdout.
    """
    with captured_stdout() as output:
        self.apply_config(self.config4a)
        try:
            raise RuntimeError()
        except RuntimeError:
            # Must be called from inside the handler so that the active
            # exception is attached to the record.
            logging.exception("just testing")
        sys.stdout.seek(0)
        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(output.getvalue(),
            "ERROR:root:just testing\nGot a [RuntimeError]\n")
        # Original logger output is empty
        self.assert_log_lines([])
| |
def test_config5_ok(self):
    """Run the config1 checks against config5 (custom handler class)."""
    custom_handler_config = self.config5
    self.test_config1_ok(config=custom_handler_config)
| |
def test_config6_failure(self):
    """Applying config6 (bad handler arguments) is expected to raise."""
    bad_config = self.config6
    with self.assertRaises(Exception):
        self.apply_config(bad_config)
| |
def test_config7_ok(self):
    """Applying config7 after config1 disables the 'compiler.parser' logger."""
    # First stage: with config1 the parser logger outputs both messages.
    with captured_stdout() as first:
        self.apply_config(self.config1)
        parser_log = logging.getLogger("compiler.parser")
        parser_log.info(self.next_message())
        parser_log.error(self.next_message())
        self.assert_log_lines([('INFO', '1'), ('ERROR', '2')], stream=first)
        # Original logger output is empty.
        self.assert_log_lines([])
    # Second stage: config7 only defines compiler.lexer.
    with captured_stdout() as second:
        self.apply_config(self.config7)
        self.assertTrue(logging.getLogger("compiler.parser").disabled)
        lexer_log = logging.getLogger("compiler.lexer")
        # Both will output a message
        lexer_log.info(self.next_message())
        lexer_log.error(self.next_message())
        self.assert_log_lines([('INFO', '3'), ('ERROR', '4')], stream=second)
        # Original logger output is empty.
        self.assert_log_lines([])
| |
# Same as test_config7_ok but don't disable old loggers.
def test_config_8_ok(self):
    """Applying config8 after config1 leaves 'compiler.parser' enabled."""
    # First stage: with config1 the parser logger outputs both messages.
    with captured_stdout() as first:
        self.apply_config(self.config1)
        parser_log = logging.getLogger("compiler.parser")
        parser_log.info(self.next_message())
        parser_log.error(self.next_message())
        self.assert_log_lines([('INFO', '1'), ('ERROR', '2')], stream=first)
        # Original logger output is empty.
        self.assert_log_lines([])
    # Second stage: config8 has disable_existing_loggers set to False.
    with captured_stdout() as second:
        self.apply_config(self.config8)
        parser_log = logging.getLogger("compiler.parser")
        self.assertFalse(parser_log.disabled)
        # Both loggers output both of their messages, in this order.
        for name in ("compiler.parser", "compiler.lexer"):
            current = logging.getLogger(name)
            current.info(self.next_message())
            current.error(self.next_message())
        expected = [
            ('INFO', '3'),
            ('ERROR', '4'),
            ('INFO', '5'),
            ('ERROR', '6'),
        ]
        self.assert_log_lines(expected, stream=second)
        # Original logger output is empty.
        self.assert_log_lines([])
| |
def test_config_9_ok(self):
    """Incremental configs 9a and 9b adjust levels set up by config9."""
    with captured_stdout() as stream:
        self.apply_config(self.config9)
        parser_log = logging.getLogger("compiler.parser")
        # Nothing will be output since both handler and logger are set
        # to WARNING
        parser_log.info(self.next_message())
        self.assert_log_lines([], stream=stream)
        # Nothing will be output since the handler is still set to WARNING
        self.apply_config(self.config9a)
        parser_log.info(self.next_message())
        self.assert_log_lines([], stream=stream)
        # Message should now be output
        self.apply_config(self.config9b)
        parser_log.info(self.next_message())
        expected = [('INFO', '3')]
        self.assert_log_lines(expected, stream=stream)
| |
def test_config_10_ok(self):
    """With config10 applied, only unfiltered records reach the output."""
    with captured_stdout() as stream:
        self.apply_config(self.config10)
        # Expected in the output (message 1).
        logging.getLogger("compiler.parser").warning(self.next_message())
        # Not output, because filtered
        logging.getLogger('compiler').warning(self.next_message())
        # Not output, because filtered
        logging.getLogger('compiler.lexer').warning(self.next_message())
        # Output, as not filtered
        logging.getLogger("compiler.parser.codegen").error(
            self.next_message())
        expected = [('WARNING', '1'), ('ERROR', '4')]
        self.assert_log_lines(expected, stream=stream)
| |
def test_config11_ok(self):
    """Run the config1 checks against config11 (cfg:// references)."""
    cfg_reference_config = self.config11
    self.test_config1_ok(cfg_reference_config)
| |
def test_config12_failure(self):
    """Applying config12 (no version key) is expected to raise."""
    bad_config = self.config12
    with self.assertRaises(Exception):
        self.apply_config(bad_config)
| |
def test_config13_failure(self):
    """Applying config13 (unsupported version 2) is expected to raise."""
    bad_config = self.config13
    with self.assertRaises(Exception):
        self.apply_config(bad_config)
| |
@unittest.skipUnless(threading, 'listen() needs threading to work')
def setup_via_listener(self, text):
    """Send the configuration ``text`` to a logging.config.listen() server.

    A listener is started on a randomly assigned port, ``text`` is sent to
    it over a TCP connection to localhost (UTF-8 encoded and prefixed with
    its length), and the listener is stopped again before returning.
    """
    text = text.encode("utf-8")
    # Ask for a randomly assigned port (by using port 0)
    t = logging.config.listen(0)
    t.start()
    t.ready.wait()
    # Now get the port allocated
    port = t.port
    t.ready.clear()
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(2.0)
            sock.connect(('localhost', port))
            # Wire format: 4-byte big-endian length followed by the data.
            # sendall() keeps sending until everything has been written.
            sock.sendall(struct.pack('>L', len(text)) + text)
        finally:
            # Close the socket even when connecting or sending fails.
            sock.close()
    finally:
        # Wait (bounded) for the listener to signal readiness again
        # before shutting it down.
        t.ready.wait(2.0)
        logging.config.stopListening()
        t.join(2.0)
| |
def test_listen_config_10_ok(self):
    """Same checks as for config10, but delivered as JSON via the listener."""
    with captured_stdout() as stream:
        payload = json.dumps(self.config10)
        self.setup_via_listener(payload)
        # Expected in the output (message 1).
        logging.getLogger("compiler.parser").warning(self.next_message())
        # Not output, because filtered
        logging.getLogger('compiler').warning(self.next_message())
        # Not output, because filtered
        logging.getLogger('compiler.lexer').warning(self.next_message())
        # Output, as not filtered
        logging.getLogger("compiler.parser.codegen").error(
            self.next_message())
        expected = [('WARNING', '1'), ('ERROR', '4')]
        self.assert_log_lines(expected, stream=stream)
| |
def test_listen_config_1_ok(self):
    """Send ConfigFileTest.config1 (dedented) via the listener and check it."""
    with captured_stdout() as stream:
        payload = textwrap.dedent(ConfigFileTest.config1)
        self.setup_via_listener(payload)
        parser_log = logging.getLogger("compiler.parser")
        # Both will output a message
        for emit in (parser_log.info, parser_log.error):
            emit(self.next_message())
        expected = [('INFO', '1'), ('ERROR', '2')]
        self.assert_log_lines(expected, stream=stream)
        # Original logger output is empty.
        self.assert_log_lines([])
| |
| |
class ManagerTest(BaseTest):
    """Checks that a Manager can be given its own logger class."""

    def test_manager_loggerclass(self):
        """Loggers obtained from the manager use the class set on it."""
        captured = []

        class RecordingLogger(logging.Logger):
            # Record the raw message instead of doing any real logging.
            def _log(self, level, msg, args, exc_info=None, extra=None):
                captured.append(msg)

        manager = logging.Manager(None)
        # int must be rejected as a logger class.
        with self.assertRaises(TypeError):
            manager.setLoggerClass(int)
        manager.setLoggerClass(RecordingLogger)
        manager.getLogger('test').warning('should appear in logged')
        # The module-level function must not reach the custom class.
        logging.warning('should not appear in logged')

        self.assertEqual(captured, ['should appear in logged'])
| |
| |
class ChildLoggerTest(BaseTest):
    """Tests for Logger.getChild()."""

    def test_child_loggers(self):
        """getChild() returns the very objects getLogger() gives for the
        corresponding full names."""
        r = logging.getLogger()
        l1 = logging.getLogger('abc')
        # Created as in the original test; the result itself is not needed.
        logging.getLogger('def.ghi')
        # Children of the root logger are named by the suffix alone.
        c1 = r.getChild('xyz')
        c2 = r.getChild('uvw.xyz')
        # assertIs reports both objects on failure, unlike assertTrue(a is b).
        self.assertIs(c1, logging.getLogger('xyz'))
        self.assertIs(c2, logging.getLogger('uvw.xyz'))
        # Children of a named logger get the parent's name as a prefix,
        # whether reached in one step or two.
        c1 = l1.getChild('def')
        c2 = c1.getChild('ghi')
        c3 = l1.getChild('def.ghi')
        self.assertIs(c1, logging.getLogger('abc.def'))
        self.assertIs(c2, logging.getLogger('abc.def.ghi'))
        self.assertIs(c2, c3)
| |
| |
class QueueHandlerTest(BaseTest):
    """Tests for logging.handlers.QueueHandler."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        """Attach a QueueHandler to a non-propagating 'que' logger at WARNING."""
        BaseTest.setUp(self)
        self.queue = queue.Queue(-1)
        self.que_hdlr = logging.handlers.QueueHandler(self.queue)
        self.que_logger = logging.getLogger('que')
        self.que_logger.propagate = False
        self.que_logger.setLevel(logging.WARNING)
        self.que_logger.addHandler(self.que_hdlr)

    def tearDown(self):
        """Close the queue handler, then run the base class clean-up."""
        self.que_hdlr.close()
        BaseTest.tearDown(self)

    def test_queue_handler(self):
        """Only records at WARNING or above are placed on the queue."""
        # DEBUG and INFO are below the logger's level: nothing is queued.
        self.que_logger.debug(self.next_message())
        self.assertRaises(queue.Empty, self.queue.get_nowait)
        self.que_logger.info(self.next_message())
        self.assertRaises(queue.Empty, self.queue.get_nowait)
        msg = self.next_message()
        self.que_logger.warning(msg)
        data = self.queue.get_nowait()
        # assertIsInstance reports the actual type on failure.
        self.assertIsInstance(data, logging.LogRecord)
        self.assertEqual(data.name, self.que_logger.name)
        self.assertEqual((data.msg, data.args), (msg, None))
| |
class BaseFileTest(BaseTest):
    "Base class for handler tests that write log files"

    def setUp(self):
        """Pick a temporary log file name and start with no files to remove."""
        BaseTest.setUp(self)
        # Only a name is generated here; the file itself is created by the
        # handler under test.
        self.fn = tempfile.mktemp(".log")
        self.rmfiles = []

    def tearDown(self):
        """Delete every file registered in ``self.rmfiles``."""
        for fn in self.rmfiles:
            os.unlink(fn)
        BaseTest.tearDown(self)

    def assertLogFile(self, filename):
        "Assert a log file is there and register it for deletion"
        # Interpolate the name so a failure says which file is missing
        # (the message used to be left with a raw %r in it).
        self.assertTrue(os.path.exists(filename),
                        msg="Log file %r does not exist" % filename)
        self.rmfiles.append(filename)
| |
| |
class RotatingFileHandlerTest(BaseFileTest):
    """Tests for logging.handlers.RotatingFileHandler.

    Every test closes its handler before tearDown runs, so the log files
    can be deleted while no stream is still open on them.
    """

    def next_rec(self):
        """Return a new DEBUG LogRecord carrying the next test message."""
        return logging.LogRecord('n', logging.DEBUG, 'p', 1,
                                 self.next_message(), None, None, None)

    def test_should_not_rollover(self):
        # If maxbytes is zero rollover never occurs
        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0)
        # Without delay=True the handler opens (and so creates) the file
        # on construction; register it so that tearDown removes it.
        self.rmfiles.append(self.fn)
        try:
            self.assertFalse(rh.shouldRollover(None))
        finally:
            rh.close()

    def test_should_rollover(self):
        # With maxBytes=1 the test expects any record to trigger rollover
        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1)
        # See test_should_not_rollover: the file exists from here on.
        self.rmfiles.append(self.fn)
        try:
            self.assertTrue(rh.shouldRollover(self.next_rec()))
        finally:
            rh.close()

    def test_file_created(self):
        # checks that the file is created and assumes it was created
        # by us
        self.assertFalse(os.path.exists(self.fn))
        rh = logging.handlers.RotatingFileHandler(self.fn)
        try:
            rh.emit(self.next_rec())
            self.assertLogFile(self.fn)
        finally:
            rh.close()

    def test_rollover_filenames(self):
        # Each emit after the first is expected to add one more backup
        # file, up to backupCount.
        rh = logging.handlers.RotatingFileHandler(
            self.fn, backupCount=2, maxBytes=1)
        try:
            rh.emit(self.next_rec())
            self.assertLogFile(self.fn)
            rh.emit(self.next_rec())
            self.assertLogFile(self.fn + ".1")
            rh.emit(self.next_rec())
            self.assertLogFile(self.fn + ".2")
            self.assertFalse(os.path.exists(self.fn + ".3"))
        finally:
            rh.close()
| |
class TimedRotatingFileHandlerTest(BaseFileTest):
    """Container for TimedRotatingFileHandler tests.

    The test methods are generated and attached with setattr() by the
    module-level code that follows this class.
    """
| |
def secs(**kw):
    """Return the whole seconds (rounded down) in ``timedelta(**kw)``."""
    delta = datetime.timedelta(**kw)
    # A timedelta is normalized to days, seconds and non-negative
    # microseconds, so dropping the microseconds floors the value.
    return delta.days * 86400 + delta.seconds
| |
# Generate one test_compute_rollover_<when> method per rollover spec.  Each
# pair is (``when`` value, expected computeRollover() result in seconds)
# for a current time of 0.0 (the epoch) and an interval of 1.  The handlers
# are created with utc=True so that the MIDNIGHT and W0 expectations do not
# depend on the timezone of the machine running the tests.
for when, exp in (('S', 1),
                  ('M', 60),
                  ('H', 60 * 60),
                  ('D', 60 * 60 * 24),
                  ('MIDNIGHT', 60 * 60 * 24),
                  # current time (epoch start) is a Thursday, W0 means Monday
                  ('W0', secs(days=4, hours=24)),):
    # ``when`` and ``exp`` are bound as defaults so that every generated
    # method keeps the values of its own loop iteration.
    def test_compute_rollover(self, when=when, exp=exp):
        rh = logging.handlers.TimedRotatingFileHandler(
            self.fn, when=when, interval=1, backupCount=0, utc=True)
        # Without delay=True the handler creates the file on construction;
        # register it so that tearDown removes it.
        self.rmfiles.append(self.fn)
        try:
            self.assertEqual(exp, rh.computeRollover(0.0))
        finally:
            # Close the stream before tearDown deletes the file.
            rh.close()
    setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
| |
# Run under the platform-dependent default locale.  Why the tests do this
# is not recorded; in any case the current locale is saved first and
# restored at the end.
@run_with_locale('LC_ALL', '')
def test_main():
    """Run the enabled test classes of this module via run_unittest()."""
    enabled = (
        BuiltinLevelsTest, BasicFilterTest,
        CustomLevelsAndFiltersTest, MemoryHandlerTest,
        ConfigFileTest, SocketHandlerTest, MemoryTest,
        EncodingTest, WarningsTest, ConfigDictTest, ManagerTest,
        ChildLoggerTest, QueueHandlerTest,
        RotatingFileHandlerTest,
        #TimedRotatingFileHandlerTest
    )
    run_unittest(*enabled)


# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()