| #!/usr/bin/env python |
| # |
| # Copyright 2001-2010 by Vinay Sajip. All Rights Reserved. |
| # |
| # Permission to use, copy, modify, and distribute this software and its |
| # documentation for any purpose and without fee is hereby granted, |
| # provided that the above copyright notice appear in all copies and that |
| # both that copyright notice and this permission notice appear in |
| # supporting documentation, and that the name of Vinay Sajip |
| # not be used in advertising or publicity pertaining to distribution |
| # of the software without specific, written prior permission. |
| # VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING |
| # ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL |
| # VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR |
| # ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER |
| # IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT |
| # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
| |
| """Test harness for the logging module. Run all tests. |
| |
| Copyright (C) 2001-2010 Vinay Sajip. All Rights Reserved. |
| """ |
| |
| import logging |
| import logging.handlers |
| import logging.config |
| |
| import codecs |
| import copy |
| import pickle |
| import io |
| import gc |
| import os |
| import re |
| import select |
| import socket |
| from socketserver import ThreadingTCPServer, StreamRequestHandler |
| import string |
| import struct |
| import sys |
| import tempfile |
| from test.support import captured_stdout, run_with_locale, run_unittest |
| import textwrap |
| import threading |
| import time |
| import types |
| import unittest |
| import warnings |
| import weakref |
| |
| |
| class BaseTest(unittest.TestCase): |
| |
| """Base class for logging tests.""" |
| |
| log_format = "%(name)s -> %(levelname)s: %(message)s" |
| expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$" |
| message_num = 0 |
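    # With this format/pattern pair a record logged via, e.g.,
    # logging.getLogger("a.b").info("1") is rendered as "a.b -> INFO: 1" and
    # matched by expected_log_pat into the groups ('a.b', 'INFO', '1'), which
    # is what assert_log_lines() compares against.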
| |
| def setUp(self): |
| """Setup the default logging stream to an internal StringIO instance, |
| so that we can examine log output as we want.""" |
| logger_dict = logging.getLogger().manager.loggerDict |
| logging._acquireLock() |
| try: |
| self.saved_handlers = logging._handlers.copy() |
| self.saved_handler_list = logging._handlerList[:] |
| self.saved_loggers = logger_dict.copy() |
| self.saved_level_names = logging._levelNames.copy() |
| finally: |
| logging._releaseLock() |
| |
| self.root_logger = logging.getLogger("") |
| self.original_logging_level = self.root_logger.getEffectiveLevel() |
| |
| self.stream = io.StringIO() |
| self.root_logger.setLevel(logging.DEBUG) |
| self.root_hdlr = logging.StreamHandler(self.stream) |
| self.root_formatter = logging.Formatter(self.log_format) |
| self.root_hdlr.setFormatter(self.root_formatter) |
| self.root_logger.addHandler(self.root_hdlr) |
| |
| def tearDown(self): |
| """Remove our logging stream, and restore the original logging |
| level.""" |
| self.stream.close() |
| self.root_logger.removeHandler(self.root_hdlr) |
| self.root_logger.setLevel(self.original_logging_level) |
| logging._acquireLock() |
| try: |
| logging._levelNames.clear() |
| logging._levelNames.update(self.saved_level_names) |
| logging._handlers.clear() |
| logging._handlers.update(self.saved_handlers) |
| logging._handlerList[:] = self.saved_handler_list |
| loggerDict = logging.getLogger().manager.loggerDict |
| loggerDict.clear() |
| loggerDict.update(self.saved_loggers) |
| finally: |
| logging._releaseLock() |
| |
| def assert_log_lines(self, expected_values, stream=None): |
| """Match the collected log lines against the regular expression |
| self.expected_log_pat, and compare the extracted group values to |
| the expected_values list of tuples.""" |
| stream = stream or self.stream |
| pat = re.compile(self.expected_log_pat) |
| try: |
| stream.reset() |
| actual_lines = stream.readlines() |
| except AttributeError: |
            # io.StringIO lacks a reset() method.
| actual_lines = stream.getvalue().splitlines() |
        self.assertEqual(len(actual_lines), len(expected_values))
| for actual, expected in zip(actual_lines, expected_values): |
| match = pat.search(actual) |
| if not match: |
| self.fail("Log line does not match expected pattern:\n" + |
| actual) |
            self.assertEqual(tuple(match.groups()), expected)
| s = stream.read() |
| if s: |
| self.fail("Remaining output at end of log stream:\n" + s) |
| |
| def next_message(self): |
| """Generate a message consisting solely of an auto-incrementing |
| integer.""" |
| self.message_num += 1 |
| return "%d" % self.message_num |
| |
| |
| class BuiltinLevelsTest(BaseTest): |
| """Test builtin levels and their inheritance.""" |
| |
| def test_flat(self): |
        # Logging levels in a flat logger namespace.
| m = self.next_message |
| |
| ERR = logging.getLogger("ERR") |
| ERR.setLevel(logging.ERROR) |
| INF = logging.getLogger("INF") |
| INF.setLevel(logging.INFO) |
| DEB = logging.getLogger("DEB") |
| DEB.setLevel(logging.DEBUG) |
| |
| # These should log. |
| ERR.log(logging.CRITICAL, m()) |
| ERR.error(m()) |
| |
| INF.log(logging.CRITICAL, m()) |
| INF.error(m()) |
| INF.warn(m()) |
| INF.info(m()) |
| |
| DEB.log(logging.CRITICAL, m()) |
| DEB.error(m()) |
        DEB.warn(m())
        DEB.info(m())
| DEB.debug(m()) |
| |
| # These should not log. |
| ERR.warn(m()) |
| ERR.info(m()) |
| ERR.debug(m()) |
| |
| INF.debug(m()) |
| |
| self.assert_log_lines([ |
| ('ERR', 'CRITICAL', '1'), |
| ('ERR', 'ERROR', '2'), |
| ('INF', 'CRITICAL', '3'), |
| ('INF', 'ERROR', '4'), |
| ('INF', 'WARNING', '5'), |
| ('INF', 'INFO', '6'), |
| ('DEB', 'CRITICAL', '7'), |
| ('DEB', 'ERROR', '8'), |
| ('DEB', 'WARNING', '9'), |
| ('DEB', 'INFO', '10'), |
| ('DEB', 'DEBUG', '11'), |
| ]) |
| |
| def test_nested_explicit(self): |
| # Logging levels in a nested namespace, all explicitly set. |
| m = self.next_message |
| |
| INF = logging.getLogger("INF") |
| INF.setLevel(logging.INFO) |
| INF_ERR = logging.getLogger("INF.ERR") |
| INF_ERR.setLevel(logging.ERROR) |
| |
| # These should log. |
| INF_ERR.log(logging.CRITICAL, m()) |
| INF_ERR.error(m()) |
| |
| # These should not log. |
| INF_ERR.warn(m()) |
| INF_ERR.info(m()) |
| INF_ERR.debug(m()) |
| |
| self.assert_log_lines([ |
| ('INF.ERR', 'CRITICAL', '1'), |
| ('INF.ERR', 'ERROR', '2'), |
| ]) |
| |
| def test_nested_inherited(self): |
        # Logging levels in a nested namespace, inherited from parent loggers.
| m = self.next_message |
| |
| INF = logging.getLogger("INF") |
| INF.setLevel(logging.INFO) |
| INF_ERR = logging.getLogger("INF.ERR") |
| INF_ERR.setLevel(logging.ERROR) |
| INF_UNDEF = logging.getLogger("INF.UNDEF") |
| INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") |
| UNDEF = logging.getLogger("UNDEF") |
| |
| # These should log. |
| INF_UNDEF.log(logging.CRITICAL, m()) |
| INF_UNDEF.error(m()) |
| INF_UNDEF.warn(m()) |
| INF_UNDEF.info(m()) |
| INF_ERR_UNDEF.log(logging.CRITICAL, m()) |
| INF_ERR_UNDEF.error(m()) |
| |
| # These should not log. |
| INF_UNDEF.debug(m()) |
| INF_ERR_UNDEF.warn(m()) |
| INF_ERR_UNDEF.info(m()) |
| INF_ERR_UNDEF.debug(m()) |
| |
| self.assert_log_lines([ |
| ('INF.UNDEF', 'CRITICAL', '1'), |
| ('INF.UNDEF', 'ERROR', '2'), |
| ('INF.UNDEF', 'WARNING', '3'), |
| ('INF.UNDEF', 'INFO', '4'), |
| ('INF.ERR.UNDEF', 'CRITICAL', '5'), |
| ('INF.ERR.UNDEF', 'ERROR', '6'), |
| ]) |
| |
| def test_nested_with_virtual_parent(self): |
| # Logging levels when some parent does not exist yet. |
| m = self.next_message |
| |
| INF = logging.getLogger("INF") |
| GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") |
| CHILD = logging.getLogger("INF.BADPARENT") |
| INF.setLevel(logging.INFO) |
| |
| # These should log. |
| GRANDCHILD.log(logging.FATAL, m()) |
| GRANDCHILD.info(m()) |
| CHILD.log(logging.FATAL, m()) |
| CHILD.info(m()) |
| |
| # These should not log. |
| GRANDCHILD.debug(m()) |
| CHILD.debug(m()) |
| |
| self.assert_log_lines([ |
| ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'), |
| ('INF.BADPARENT.UNDEF', 'INFO', '2'), |
| ('INF.BADPARENT', 'CRITICAL', '3'), |
| ('INF.BADPARENT', 'INFO', '4'), |
| ]) |
| |
| |
| class BasicFilterTest(BaseTest): |
| |
| """Test the bundled Filter class.""" |
| |
| def test_filter(self): |
| # Only messages satisfying the specified criteria pass through the |
| # filter. |
| filter_ = logging.Filter("spam.eggs") |
| handler = self.root_logger.handlers[0] |
| try: |
| handler.addFilter(filter_) |
| spam = logging.getLogger("spam") |
| spam_eggs = logging.getLogger("spam.eggs") |
| spam_eggs_fish = logging.getLogger("spam.eggs.fish") |
| spam_bakedbeans = logging.getLogger("spam.bakedbeans") |
| |
| spam.info(self.next_message()) |
| spam_eggs.info(self.next_message()) # Good. |
| spam_eggs_fish.info(self.next_message()) # Good. |
| spam_bakedbeans.info(self.next_message()) |
| |
| self.assert_log_lines([ |
| ('spam.eggs', 'INFO', '2'), |
| ('spam.eggs.fish', 'INFO', '3'), |
| ]) |
| finally: |
| handler.removeFilter(filter_) |
| |
| |
| # |
# First, we define our levels. There can be as many as you want; the only
# constraints are that they must be integers, that the lowest must be > 0,
# and that larger values mean less information is logged. If you need
# specific level values which do not fit these constraints, you can use a
# mapping dictionary to convert between your application levels and the
# logging system's.
| # |
| SILENT = 120 |
| TACITURN = 119 |
| TERSE = 118 |
| EFFUSIVE = 117 |
| SOCIABLE = 116 |
| VERBOSE = 115 |
| TALKATIVE = 114 |
| GARRULOUS = 113 |
| CHATTERBOX = 112 |
| BORING = 111 |
| |
| LEVEL_RANGE = range(BORING, SILENT + 1) |
| |
| # |
# Next, we define names for our levels. You don't need to do this; if you
# don't, the system will use "Level n" as the text for the level.
| # |
| my_logging_levels = { |
| SILENT : 'Silent', |
| TACITURN : 'Taciturn', |
| TERSE : 'Terse', |
| EFFUSIVE : 'Effusive', |
| SOCIABLE : 'Sociable', |
| VERBOSE : 'Verbose', |
| TALKATIVE : 'Talkative', |
| GARRULOUS : 'Garrulous', |
| CHATTERBOX : 'Chatterbox', |
| BORING : 'Boring', |
| } |
| |
| class GarrulousFilter(logging.Filter): |
| |
| """A filter which blocks garrulous messages.""" |
| |
| def filter(self, record): |
| return record.levelno != GARRULOUS |
| |
| class VerySpecificFilter(logging.Filter): |
| |
| """A filter which blocks sociable and taciturn messages.""" |
| |
| def filter(self, record): |
| return record.levelno not in [SOCIABLE, TACITURN] |
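
# Note that Filter.filter() returning a false value drops the record and a
# true value lets it through; attaching the filter to a handler or a logger
# (as test_specific_filters does below) is all that is needed.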
| |
| |
| class CustomLevelsAndFiltersTest(BaseTest): |
| |
| """Test various filtering possibilities with custom logging levels.""" |
| |
| # Skip the logger name group. |
| expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" |
| |
| def setUp(self): |
| BaseTest.setUp(self) |
| for k, v in list(my_logging_levels.items()): |
| logging.addLevelName(k, v) |
| |
| def log_at_all_levels(self, logger): |
| for lvl in LEVEL_RANGE: |
| logger.log(lvl, self.next_message()) |
| |
| def test_logger_filter(self): |
| # Filter at logger level. |
| self.root_logger.setLevel(VERBOSE) |
| # Levels >= 'Verbose' are good. |
| self.log_at_all_levels(self.root_logger) |
| self.assert_log_lines([ |
| ('Verbose', '5'), |
| ('Sociable', '6'), |
| ('Effusive', '7'), |
| ('Terse', '8'), |
| ('Taciturn', '9'), |
| ('Silent', '10'), |
| ]) |
| |
| def test_handler_filter(self): |
| # Filter at handler level. |
| self.root_logger.handlers[0].setLevel(SOCIABLE) |
| try: |
| # Levels >= 'Sociable' are good. |
| self.log_at_all_levels(self.root_logger) |
| self.assert_log_lines([ |
| ('Sociable', '6'), |
| ('Effusive', '7'), |
| ('Terse', '8'), |
| ('Taciturn', '9'), |
| ('Silent', '10'), |
| ]) |
| finally: |
| self.root_logger.handlers[0].setLevel(logging.NOTSET) |
| |
| def test_specific_filters(self): |
| # Set a specific filter object on the handler, and then add another |
| # filter object on the logger itself. |
| handler = self.root_logger.handlers[0] |
| specific_filter = None |
| garr = GarrulousFilter() |
| handler.addFilter(garr) |
| try: |
| self.log_at_all_levels(self.root_logger) |
| first_lines = [ |
| # Notice how 'Garrulous' is missing |
| ('Boring', '1'), |
| ('Chatterbox', '2'), |
| ('Talkative', '4'), |
| ('Verbose', '5'), |
| ('Sociable', '6'), |
| ('Effusive', '7'), |
| ('Terse', '8'), |
| ('Taciturn', '9'), |
| ('Silent', '10'), |
| ] |
| self.assert_log_lines(first_lines) |
| |
| specific_filter = VerySpecificFilter() |
| self.root_logger.addFilter(specific_filter) |
| self.log_at_all_levels(self.root_logger) |
| self.assert_log_lines(first_lines + [ |
                # Not only is 'Garrulous' still missing, but so are 'Sociable'
                # and 'Taciturn'.
| ('Boring', '11'), |
| ('Chatterbox', '12'), |
| ('Talkative', '14'), |
| ('Verbose', '15'), |
| ('Effusive', '17'), |
| ('Terse', '18'), |
| ('Silent', '20'), |
| ]) |
| finally: |
| if specific_filter: |
| self.root_logger.removeFilter(specific_filter) |
| handler.removeFilter(garr) |
| |
| |
| class MemoryHandlerTest(BaseTest): |
| |
| """Tests for the MemoryHandler.""" |
| |
| # Do not bother with a logger name group. |
| expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" |
| |
| def setUp(self): |
| BaseTest.setUp(self) |
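        # MemoryHandler(capacity, flushLevel, target) buffers records until
        # either `capacity` records have accumulated or a record of severity
        # >= flushLevel arrives, then flushes the buffer to the target handler
        # (here the root stream handler); test_flush exercises both triggers.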
| self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, |
| self.root_hdlr) |
| self.mem_logger = logging.getLogger('mem') |
| self.mem_logger.propagate = 0 |
| self.mem_logger.addHandler(self.mem_hdlr) |
| |
| def tearDown(self): |
| self.mem_hdlr.close() |
| BaseTest.tearDown(self) |
| |
| def test_flush(self): |
| # The memory handler flushes to its target handler based on specific |
| # criteria (message count and message level). |
| self.mem_logger.debug(self.next_message()) |
| self.assert_log_lines([]) |
| self.mem_logger.info(self.next_message()) |
| self.assert_log_lines([]) |
| # This will flush because the level is >= logging.WARNING |
| self.mem_logger.warn(self.next_message()) |
| lines = [ |
| ('DEBUG', '1'), |
| ('INFO', '2'), |
| ('WARNING', '3'), |
| ] |
| self.assert_log_lines(lines) |
| for n in (4, 14): |
| for i in range(9): |
| self.mem_logger.debug(self.next_message()) |
| self.assert_log_lines(lines) |
| # This will flush because it's the 10th message since the last |
| # flush. |
| self.mem_logger.debug(self.next_message()) |
| lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)] |
| self.assert_log_lines(lines) |
| |
| self.mem_logger.debug(self.next_message()) |
| self.assert_log_lines(lines) |
| |
| |
| class ExceptionFormatter(logging.Formatter): |
| """A special exception formatter.""" |
| def formatException(self, ei): |
| return "Got a [%s]" % ei[0].__name__ |
| |
| |
| class ConfigFileTest(BaseTest): |
| |
| """Reading logging config from a .ini-style config file.""" |
| |
| expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" |
| |
| # config0 is a standard configuration. |
| config0 = """ |
| [loggers] |
| keys=root |
| |
| [handlers] |
| keys=hand1 |
| |
| [formatters] |
| keys=form1 |
| |
| [logger_root] |
| level=WARNING |
| handlers=hand1 |
| |
| [handler_hand1] |
| class=StreamHandler |
| level=NOTSET |
| formatter=form1 |
| args=(sys.stdout,) |
| |
| [formatter_form1] |
| format=%(levelname)s ++ %(message)s |
| datefmt= |
| """ |
| |
| # config1 adds a little to the standard configuration. |
| config1 = """ |
| [loggers] |
| keys=root,parser |
| |
| [handlers] |
| keys=hand1 |
| |
| [formatters] |
| keys=form1 |
| |
| [logger_root] |
| level=WARNING |
| handlers= |
| |
| [logger_parser] |
| level=DEBUG |
| handlers=hand1 |
| propagate=1 |
| qualname=compiler.parser |
| |
| [handler_hand1] |
| class=StreamHandler |
| level=NOTSET |
| formatter=form1 |
| args=(sys.stdout,) |
| |
| [formatter_form1] |
| format=%(levelname)s ++ %(message)s |
| datefmt= |
| """ |
| |
| # config2 has a subtle configuration error that should be reported |
| config2 = config1.replace("sys.stdout", "sys.stbout") |
| |
| # config3 has a less subtle configuration error |
| config3 = config1.replace("formatter=form1", "formatter=misspelled_name") |
| |
| # config4 specifies a custom formatter class to be loaded |
| config4 = """ |
| [loggers] |
| keys=root |
| |
| [handlers] |
| keys=hand1 |
| |
| [formatters] |
| keys=form1 |
| |
| [logger_root] |
| level=NOTSET |
| handlers=hand1 |
| |
| [handler_hand1] |
| class=StreamHandler |
| level=NOTSET |
| formatter=form1 |
| args=(sys.stdout,) |
| |
| [formatter_form1] |
| class=""" + __name__ + """.ExceptionFormatter |
| format=%(levelname)s:%(name)s:%(message)s |
| datefmt= |
| """ |
| |
| # config5 specifies a custom handler class to be loaded |
| config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler') |
| |
| # config6 uses ', ' delimiters in the handlers and formatters sections |
| config6 = """ |
| [loggers] |
| keys=root,parser |
| |
| [handlers] |
| keys=hand1, hand2 |
| |
| [formatters] |
| keys=form1, form2 |
| |
| [logger_root] |
| level=WARNING |
| handlers= |
| |
| [logger_parser] |
| level=DEBUG |
| handlers=hand1 |
| propagate=1 |
| qualname=compiler.parser |
| |
| [handler_hand1] |
| class=StreamHandler |
| level=NOTSET |
| formatter=form1 |
| args=(sys.stdout,) |
| |
| [handler_hand2] |
| class=StreamHandler |
| level=NOTSET |
| formatter=form1 |
| args=(sys.stderr,) |
| |
| [formatter_form1] |
| format=%(levelname)s ++ %(message)s |
| datefmt= |
| |
| [formatter_form2] |
| format=%(message)s |
| datefmt= |
| """ |
| |
    def apply_config(self, conf):
        fn = tempfile.mktemp(".ini")
        try:
            with open(fn, "w") as f:
                f.write(textwrap.dedent(conf))
            logging.config.fileConfig(fn)
        finally:
            if os.path.exists(fn):
                os.remove(fn)
| |
| def test_config0_ok(self): |
| # A simple config file which overrides the default settings. |
| with captured_stdout() as output: |
| self.apply_config(self.config0) |
| logger = logging.getLogger() |
| # Won't output anything |
| logger.info(self.next_message()) |
| # Outputs a message |
| logger.error(self.next_message()) |
| self.assert_log_lines([ |
| ('ERROR', '2'), |
| ], stream=output) |
| # Original logger output is empty. |
| self.assert_log_lines([]) |
| |
| def test_config1_ok(self, config=config1): |
| # A config file defining a sub-parser as well. |
| with captured_stdout() as output: |
| self.apply_config(config) |
| logger = logging.getLogger("compiler.parser") |
| # Both will output a message |
| logger.info(self.next_message()) |
| logger.error(self.next_message()) |
| self.assert_log_lines([ |
| ('INFO', '1'), |
| ('ERROR', '2'), |
| ], stream=output) |
| # Original logger output is empty. |
| self.assert_log_lines([]) |
| |
| def test_config2_failure(self): |
        # A config file with a subtle error (a misspelled stream name in the
        # handler args) should raise an exception when applied.
| self.assertRaises(Exception, self.apply_config, self.config2) |
| |
| def test_config3_failure(self): |
        # A config file with a less subtle error (a reference to an undefined
        # formatter) should raise an exception when applied.
| self.assertRaises(Exception, self.apply_config, self.config3) |
| |
| def test_config4_ok(self): |
| # A config file specifying a custom formatter class. |
| with captured_stdout() as output: |
| self.apply_config(self.config4) |
| logger = logging.getLogger() |
| try: |
| raise RuntimeError() |
| except RuntimeError: |
| logging.exception("just testing") |
| sys.stdout.seek(0) |
            self.assertEqual(output.getvalue(),
| "ERROR:root:just testing\nGot a [RuntimeError]\n") |
| # Original logger output is empty |
| self.assert_log_lines([]) |
| |
| def test_config5_ok(self): |
| self.test_config1_ok(config=self.config5) |
| |
| def test_config6_ok(self): |
| self.test_config1_ok(config=self.config6) |
| |
| class LogRecordStreamHandler(StreamRequestHandler): |
| |
| """Handler for a streaming logging request. It saves the log message in the |
| TCP server's 'log_output' attribute.""" |
| |
| TCP_LOG_END = "!!!END!!!" |
| |
| def handle(self): |
| """Handle multiple requests - each expected to be of 4-byte length, |
| followed by the LogRecord in pickle format. Logs the record |
| according to whatever policy is configured locally.""" |
| while True: |
| chunk = self.connection.recv(4) |
| if len(chunk) < 4: |
| break |
| slen = struct.unpack(">L", chunk)[0] |
| chunk = self.connection.recv(slen) |
| while len(chunk) < slen: |
| chunk = chunk + self.connection.recv(slen - len(chunk)) |
| obj = self.unpickle(chunk) |
| record = logging.makeLogRecord(obj) |
| self.handle_log_record(record) |
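
    # The framing read above mirrors what logging.handlers.SocketHandler
    # sends: each record travels as struct.pack(">L", len(payload)) followed
    # by the pickled dict of the LogRecord's attributes (built by
    # SocketHandler.makePickle), which is why makeLogRecord() can rebuild it.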
| |
| def unpickle(self, data): |
| return pickle.loads(data) |
| |
| def handle_log_record(self, record): |
| # If the end-of-messages sentinel is seen, tell the server to |
| # terminate. |
| if self.TCP_LOG_END in record.msg: |
| self.server.abort = 1 |
| return |
| self.server.log_output += record.msg + "\n" |
| |
| |
| class LogRecordSocketReceiver(ThreadingTCPServer): |
| |
| """A simple-minded TCP socket-based logging receiver suitable for test |
| purposes.""" |
| |
| allow_reuse_address = 1 |
| log_output = "" |
| |
| def __init__(self, host='localhost', |
| port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, |
| handler=LogRecordStreamHandler): |
| ThreadingTCPServer.__init__(self, (host, port), handler) |
| self.abort = False |
| self.timeout = 0.1 |
| self.finished = threading.Event() |
| |
| def serve_until_stopped(self): |
| while not self.abort: |
| rd, wr, ex = select.select([self.socket.fileno()], [], [], |
| self.timeout) |
| if rd: |
| self.handle_request() |
| # Notify the main thread that we're about to exit |
| self.finished.set() |
| # close the listen socket |
| self.server_close() |
| |
| |
| class SocketHandlerTest(BaseTest): |
| |
| """Test for SocketHandler objects.""" |
| |
| def setUp(self): |
| """Set up a TCP server to receive log messages, and a SocketHandler |
| pointing to that server's address and port.""" |
| BaseTest.setUp(self) |
| self.tcpserver = LogRecordSocketReceiver(port=0) |
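        # Passing port=0 lets the OS pick a free ephemeral port; the port
        # actually bound is read back from the listening socket below.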
| self.port = self.tcpserver.socket.getsockname()[1] |
| self.threads = [ |
| threading.Thread(target=self.tcpserver.serve_until_stopped)] |
| for thread in self.threads: |
| thread.start() |
| |
| self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port) |
| self.sock_hdlr.setFormatter(self.root_formatter) |
| self.root_logger.removeHandler(self.root_logger.handlers[0]) |
| self.root_logger.addHandler(self.sock_hdlr) |
| |
| def tearDown(self): |
| """Shutdown the TCP server.""" |
| try: |
| self.tcpserver.abort = True |
| del self.tcpserver |
| self.root_logger.removeHandler(self.sock_hdlr) |
| self.sock_hdlr.close() |
| for thread in self.threads: |
| thread.join(2.0) |
| finally: |
| BaseTest.tearDown(self) |
| |
| def get_output(self): |
| """Get the log output as received by the TCP server.""" |
| # Signal the TCP receiver and wait for it to terminate. |
| self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END) |
| self.tcpserver.finished.wait(2.0) |
| return self.tcpserver.log_output |
| |
| def test_output(self): |
        # The log messages sent to the SocketHandler are properly received.
| logger = logging.getLogger("tcp") |
| logger.error("spam") |
| logger.debug("eggs") |
        self.assertEqual(self.get_output(), "spam\neggs\n")
| |
| |
| class MemoryTest(BaseTest): |
| |
| """Test memory persistence of logger objects.""" |
| |
| def setUp(self): |
| """Create a dict to remember potentially destroyed objects.""" |
| BaseTest.setUp(self) |
| self._survivors = {} |
| |
| def _watch_for_survival(self, *args): |
| """Watch the given objects for survival, by creating weakrefs to |
| them.""" |
| for obj in args: |
| key = id(obj), repr(obj) |
| self._survivors[key] = weakref.ref(obj) |
| |
    def _assert_survival(self):
| """Assert that all objects watched for survival have survived.""" |
| # Trigger cycle breaking. |
| gc.collect() |
| dead = [] |
| for (id_, repr_), ref in list(self._survivors.items()): |
| if ref() is None: |
| dead.append(repr_) |
| if dead: |
| self.fail("%d objects should have survived " |
| "but have been destroyed: %s" % (len(dead), ", ".join(dead))) |
| |
| def test_persistent_loggers(self): |
| # Logger objects are persistent and retain their configuration, even |
| # if visible references are destroyed. |
| self.root_logger.setLevel(logging.INFO) |
| foo = logging.getLogger("foo") |
| self._watch_for_survival(foo) |
| foo.setLevel(logging.DEBUG) |
| self.root_logger.debug(self.next_message()) |
| foo.debug(self.next_message()) |
| self.assert_log_lines([ |
| ('foo', 'DEBUG', '2'), |
| ]) |
| del foo |
| # foo has survived. |
        self._assert_survival()
| # foo has retained its settings. |
| bar = logging.getLogger("foo") |
| bar.debug(self.next_message()) |
| self.assert_log_lines([ |
| ('foo', 'DEBUG', '2'), |
| ('foo', 'DEBUG', '3'), |
| ]) |
| |
| |
| class EncodingTest(BaseTest): |
| def test_encoding_plain_file(self): |
        # Write some non-ASCII data through a FileHandler opened with an
        # explicit encoding, and check that exactly that data is read back.
| log = logging.getLogger("test") |
| fn = tempfile.mktemp(".log") |
| # the non-ascii data we write to the log. |
| data = "foo\x80" |
| try: |
| handler = logging.FileHandler(fn, encoding="utf8") |
| log.addHandler(handler) |
| try: |
| # write non-ascii data to the log. |
| log.warning(data) |
| finally: |
| log.removeHandler(handler) |
| handler.close() |
| # check we wrote exactly those bytes, ignoring trailing \n etc |
| f = open(fn, encoding="utf8") |
| try: |
| self.assertEqual(f.read().rstrip(), data) |
| finally: |
| f.close() |
| finally: |
| if os.path.isfile(fn): |
| os.remove(fn) |
| |
| def test_encoding_cyrillic_unicode(self): |
| log = logging.getLogger("test") |
        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
        # Ensure it's written in a Cyrillic encoding
| writer_class = codecs.getwriter('cp1251') |
| writer_class.encoding = 'cp1251' |
| stream = io.BytesIO() |
| writer = writer_class(stream, 'strict') |
| handler = logging.StreamHandler(writer) |
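        # The handler writes formatted text to the codec writer, which encodes
        # it to cp1251 bytes in the underlying BytesIO, so the raw encoded
        # bytes can be compared below.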
| log.addHandler(handler) |
| try: |
| log.warning(message) |
| finally: |
| log.removeHandler(handler) |
| handler.close() |
| # check we wrote exactly those bytes, ignoring trailing \n etc |
| s = stream.getvalue() |
        # Compare against what the data should be when encoded in CP-1251
| self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n') |
| |
| |
| class WarningsTest(BaseTest): |
| |
| def test_warnings(self): |
| with warnings.catch_warnings(): |
| logging.captureWarnings(True) |
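            # captureWarnings(True) replaces warnings.showwarning so that
            # warnings are routed to the 'py.warnings' logger instead of
            # being written to sys.stderr.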
| try: |
| warnings.filterwarnings("always", category=UserWarning) |
| file = io.StringIO() |
| h = logging.StreamHandler(file) |
| logger = logging.getLogger("py.warnings") |
| logger.addHandler(h) |
| warnings.warn("I'm warning you...") |
| logger.removeHandler(h) |
| s = file.getvalue() |
| h.close() |
| self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0) |
| |
                # See if an explicit file uses the original implementation
| file = io.StringIO() |
| warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, |
| file, "Dummy line") |
| s = file.getvalue() |
| file.close() |
| self.assertEqual(s, |
| "dummy.py:42: UserWarning: Explicit\n Dummy line\n") |
| finally: |
| logging.captureWarnings(False) |
| |
| # Set the locale to the platform-dependent default. I have no idea |
| # why the test does this, but in any case we save the current locale |
| # first and restore it at the end. |
| @run_with_locale('LC_ALL', '') |
| def test_main(): |
| run_unittest(BuiltinLevelsTest, BasicFilterTest, |
| CustomLevelsAndFiltersTest, MemoryHandlerTest, |
| ConfigFileTest, SocketHandlerTest, MemoryTest, |
| EncodingTest, WarningsTest) |
| |
| if __name__ == "__main__": |
| test_main() |