| #!/usr/bin/env python |
| # |
| # Copyright 2001-2002 by Vinay Sajip. All Rights Reserved. |
| # |
| # Permission to use, copy, modify, and distribute this software and its |
| # documentation for any purpose and without fee is hereby granted, |
| # provided that the above copyright notice appear in all copies and that |
| # both that copyright notice and this permission notice appear in |
| # supporting documentation, and that the name of Vinay Sajip |
| # not be used in advertising or publicity pertaining to distribution |
| # of the software without specific, written prior permission. |
| # VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING |
| # ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL |
| # VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR |
| # ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER |
| # IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT |
| # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
| # |
| # This file is part of the Python logging distribution. See |
| # http://www.red-dove.com/python_logging.html |
| # |
| """Test harness for the logging module. Run all tests. |
| |
| Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved. |
| """ |
| |
| import select |
| import os, sys, string, struct, types, cPickle, cStringIO |
| import socket, threading, time, locale |
| import logging, logging.handlers, logging.config |
| |
| try: |
| locale.setlocale(locale.LC_ALL, '') |
| except (ValueError, locale.Error): |
| # this happens on a Solaris box which only supports "C" locale |
| # or a Mac OS X box which supports very little locale stuff at all |
| pass |
| |
| BANNER = "-- %-10s %-6s ---------------------------------------------------\n" |
| |
| #---------------------------------------------------------------------------- |
| # Log receiver |
| #---------------------------------------------------------------------------- |
| |
| TIMEOUT = 10 |
| |
| from SocketServer import ThreadingTCPServer, StreamRequestHandler |
| |
| class LogRecordStreamHandler(StreamRequestHandler): |
| """ |
| Handler for a streaming logging request. It basically logs the record |
| using whatever logging policy is configured locally. |
| """ |
| |
| def handle(self): |
| """ |
| Handle multiple requests - each expected to be a 4-byte length, |
| followed by the LogRecord in pickle format. Logs the record |
| according to whatever policy is configured locally. |
| """ |
| while 1: |
| try: |
| chunk = self.connection.recv(4) |
| if len(chunk) < 4: |
| break |
| slen = struct.unpack(">L", chunk)[0] |
| chunk = self.connection.recv(slen) |
| while len(chunk) < slen: |
| chunk = chunk + self.connection.recv(slen - len(chunk)) |
| obj = self.unPickle(chunk) |
| record = logging.LogRecord(None, None, "", 0, "", (), None) |
| record.__dict__.update(obj) |
| self.handleLogRecord(record) |
| except: |
| raise |
| |
| def unPickle(self, data): |
| return cPickle.loads(data) |
| |
| def handleLogRecord(self, record): |
| logname = "logrecv.tcp." + record.name |
| record.msg = record.msg + " (via " + logname + ")" |
| logger = logging.getLogger(logname) |
| logger.handle(record) |
| |
| class LogRecordSocketReceiver(ThreadingTCPServer): |
| """ |
| A simple-minded TCP socket-based logging receiver suitable for test |
| purposes. |
| """ |
| |
| allow_reuse_address = 1 |
| |
| def __init__(self, host='localhost', |
| port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, |
| handler=LogRecordStreamHandler): |
| ThreadingTCPServer.__init__(self, (host, port), handler) |
| self.abort = 0 |
| self.timeout = 1 |
| |
| def serve_until_stopped(self): |
| abort = 0 |
| while not abort: |
| rd, wr, ex = select.select([self.socket.fileno()], |
| [], [], |
| self.timeout) |
| if rd: |
| self.handle_request() |
| abort = self.abort |
| |
| def process_request(self, request, client_address): |
| #import threading |
| t = threading.Thread(target = self.finish_request, |
| args = (request, client_address)) |
| t.start() |
| |
| def runTCP(tcpserver): |
| tcpserver.serve_until_stopped() |
| |
| #---------------------------------------------------------------------------- |
| # Test 0 |
| #---------------------------------------------------------------------------- |
| |
| msgcount = 0 |
| |
| def nextmessage(): |
| global msgcount |
| rv = "Message %d" % msgcount |
| msgcount = msgcount + 1 |
| return rv |
| |
| def test0(): |
| ERR = logging.getLogger("ERR") |
| ERR.setLevel(logging.ERROR) |
| INF = logging.getLogger("INF") |
| INF.setLevel(logging.INFO) |
| INF_ERR = logging.getLogger("INF.ERR") |
| INF_ERR.setLevel(logging.ERROR) |
| DEB = logging.getLogger("DEB") |
| DEB.setLevel(logging.DEBUG) |
| |
| INF_UNDEF = logging.getLogger("INF.UNDEF") |
| INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") |
| UNDEF = logging.getLogger("UNDEF") |
| |
| GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") |
| CHILD = logging.getLogger("INF.BADPARENT") |
| |
| #These should log |
| ERR.log(logging.FATAL, nextmessage()) |
| ERR.error(nextmessage()) |
| |
| INF.log(logging.FATAL, nextmessage()) |
| INF.error(nextmessage()) |
| INF.warn(nextmessage()) |
| INF.info(nextmessage()) |
| |
| INF_UNDEF.log(logging.FATAL, nextmessage()) |
| INF_UNDEF.error(nextmessage()) |
| INF_UNDEF.warn (nextmessage()) |
| INF_UNDEF.info (nextmessage()) |
| |
| INF_ERR.log(logging.FATAL, nextmessage()) |
| INF_ERR.error(nextmessage()) |
| |
| INF_ERR_UNDEF.log(logging.FATAL, nextmessage()) |
| INF_ERR_UNDEF.error(nextmessage()) |
| |
| DEB.log(logging.FATAL, nextmessage()) |
| DEB.error(nextmessage()) |
| DEB.warn (nextmessage()) |
| DEB.info (nextmessage()) |
| DEB.debug(nextmessage()) |
| |
| UNDEF.log(logging.FATAL, nextmessage()) |
| UNDEF.error(nextmessage()) |
| UNDEF.warn (nextmessage()) |
| UNDEF.info (nextmessage()) |
| |
| GRANDCHILD.log(logging.FATAL, nextmessage()) |
| CHILD.log(logging.FATAL, nextmessage()) |
| |
| #These should not log |
| ERR.warn(nextmessage()) |
| ERR.info(nextmessage()) |
| ERR.debug(nextmessage()) |
| |
| INF.debug(nextmessage()) |
| INF_UNDEF.debug(nextmessage()) |
| |
| INF_ERR.warn(nextmessage()) |
| INF_ERR.info(nextmessage()) |
| INF_ERR.debug(nextmessage()) |
| INF_ERR_UNDEF.warn(nextmessage()) |
| INF_ERR_UNDEF.info(nextmessage()) |
| INF_ERR_UNDEF.debug(nextmessage()) |
| |
| INF.info("Messages should bear numbers 0 through 24.") |
| |
| #---------------------------------------------------------------------------- |
| # Test 1 |
| #---------------------------------------------------------------------------- |
| |
| # |
| # First, we define our levels. There can be as many as you want - the only |
| # limitations are that they should be integers, the lowest should be > 0 and |
| # larger values mean less information being logged. If you need specific |
| # level values which do not fit into these limitations, you can use a |
| # mapping dictionary to convert between your application levels and the |
| # logging system. |
| # |
| SILENT = 10 |
| TACITURN = 9 |
| TERSE = 8 |
| EFFUSIVE = 7 |
| SOCIABLE = 6 |
| VERBOSE = 5 |
| TALKATIVE = 4 |
| GARRULOUS = 3 |
| CHATTERBOX = 2 |
| BORING = 1 |
| |
| LEVEL_RANGE = range(BORING, SILENT + 1) |
| |
| # |
| # Next, we define names for our levels. You don't need to do this - in which |
| # case the system will use "Level n" to denote the text for the level. |
| # |
| my_logging_levels = { |
| SILENT : 'Silent', |
| TACITURN : 'Taciturn', |
| TERSE : 'Terse', |
| EFFUSIVE : 'Effusive', |
| SOCIABLE : 'Sociable', |
| VERBOSE : 'Verbose', |
| TALKATIVE : 'Talkative', |
| GARRULOUS : 'Garrulous', |
| CHATTERBOX : 'Chatterbox', |
| BORING : 'Boring', |
| } |
| |
| # |
| # Now, to demonstrate filtering: suppose for some perverse reason we only |
| # want to print out all except GARRULOUS messages. Let's create a filter for |
| # this purpose... |
| # |
| class SpecificLevelFilter(logging.Filter): |
| def __init__(self, lvl): |
| self.level = lvl |
| |
| def filter(self, record): |
| return self.level != record.levelno |
| |
| class GarrulousFilter(SpecificLevelFilter): |
| def __init__(self): |
| SpecificLevelFilter.__init__(self, GARRULOUS) |
| |
| # |
| # Now, let's demonstrate filtering at the logger. This time, use a filter |
| # which excludes SOCIABLE and TACITURN messages. Note that GARRULOUS events |
| # are still excluded. |
| # |
| class VerySpecificFilter(logging.Filter): |
| def filter(self, record): |
| return record.levelno not in [SOCIABLE, TACITURN] |
| |
| def message(s): |
| sys.stdout.write("%s\n" % s) |
| |
| SHOULD1 = "This should only be seen at the '%s' logging level (or lower)" |
| |
| def test1(): |
| # |
| # Now, tell the logging system to associate names with our levels. |
| # |
| for lvl in my_logging_levels.keys(): |
| logging.addLevelName(lvl, my_logging_levels[lvl]) |
| |
| # |
| # Now, define a test function which logs an event at each of our levels. |
| # |
| |
| def doLog(log): |
| for lvl in LEVEL_RANGE: |
| log.log(lvl, SHOULD1, logging.getLevelName(lvl)) |
| |
| log = logging.getLogger("") |
| hdlr = log.handlers[0] |
| # |
| # Set the logging level to each different value and call the utility |
| # function to log events. |
| # In the output, you should see that each time round the loop, the number of |
| # logging events which are actually output decreases. |
| # |
| for lvl in LEVEL_RANGE: |
| message("-- setting logging level to '%s' -----" % |
| logging.getLevelName(lvl)) |
| log.setLevel(lvl) |
| doLog(log) |
| # |
| # Now, we demonstrate level filtering at the handler level. Tell the |
| # handler defined above to filter at level 'SOCIABLE', and repeat the |
| # above loop. Compare the output from the two runs. |
| # |
| hdlr.setLevel(SOCIABLE) |
| message("-- Filtering at handler level to SOCIABLE --") |
| for lvl in LEVEL_RANGE: |
| message("-- setting logging level to '%s' -----" % |
| logging.getLevelName(lvl)) |
| log.setLevel(lvl) |
| doLog(log) |
| |
| hdlr.setLevel(0) #turn off level filtering at the handler |
| |
| garr = GarrulousFilter() |
| hdlr.addFilter(garr) |
| message("-- Filtering using GARRULOUS filter --") |
| for lvl in LEVEL_RANGE: |
| message("-- setting logging level to '%s' -----" % |
| logging.getLevelName(lvl)) |
| log.setLevel(lvl) |
| doLog(log) |
| spec = VerySpecificFilter() |
| log.addFilter(spec) |
| message("-- Filtering using specific filter for SOCIABLE, TACITURN --") |
| for lvl in LEVEL_RANGE: |
| message("-- setting logging level to '%s' -----" % |
| logging.getLevelName(lvl)) |
| log.setLevel(lvl) |
| doLog(log) |
| |
| log.removeFilter(spec) |
| hdlr.removeFilter(garr) |
| #Undo the one level which clashes...for regression tests |
| logging.addLevelName(logging.DEBUG, "DEBUG") |
| |
| #---------------------------------------------------------------------------- |
| # Test 2 |
| #---------------------------------------------------------------------------- |
| |
| MSG = "-- logging %d at INFO, messages should be seen every 10 events --" |
| def test2(): |
| logger = logging.getLogger("") |
| sh = logger.handlers[0] |
| sh.close() |
| logger.removeHandler(sh) |
| mh = logging.handlers.MemoryHandler(10,logging.WARNING, sh) |
| logger.setLevel(logging.DEBUG) |
| logger.addHandler(mh) |
| message("-- logging at DEBUG, nothing should be seen yet --") |
| logger.debug("Debug message") |
| message("-- logging at INFO, nothing should be seen yet --") |
| logger.info("Info message") |
| message("-- logging at WARNING, 3 messages should be seen --") |
| logger.warn("Warn message") |
| for i in xrange(102): |
| message(MSG % i) |
| logger.info("Info index = %d", i) |
| mh.close() |
| logger.removeHandler(mh) |
| logger.addHandler(sh) |
| |
| #---------------------------------------------------------------------------- |
| # Test 3 |
| #---------------------------------------------------------------------------- |
| |
| FILTER = "a.b" |
| |
| def doLog3(): |
| logging.getLogger("a").info("Info 1") |
| logging.getLogger("a.b").info("Info 2") |
| logging.getLogger("a.c").info("Info 3") |
| logging.getLogger("a.b.c").info("Info 4") |
| logging.getLogger("a.b.c.d").info("Info 5") |
| logging.getLogger("a.bb.c").info("Info 6") |
| logging.getLogger("b").info("Info 7") |
| logging.getLogger("b.a").info("Info 8") |
| logging.getLogger("c.a.b").info("Info 9") |
| logging.getLogger("a.bb").info("Info 10") |
| |
| def test3(): |
| root = logging.getLogger() |
| root.setLevel(logging.DEBUG) |
| hand = root.handlers[0] |
| message("Unfiltered...") |
| doLog3() |
| message("Filtered with '%s'..." % FILTER) |
| filt = logging.Filter(FILTER) |
| hand.addFilter(filt) |
| doLog3() |
| hand.removeFilter(filt) |
| |
| #---------------------------------------------------------------------------- |
| # Test Harness |
| #---------------------------------------------------------------------------- |
| def banner(nm, typ): |
| sep = BANNER % (nm, typ) |
| sys.stdout.write(sep) |
| sys.stdout.flush() |
| |
| def test_main(): |
| rootLogger = logging.getLogger("") |
| rootLogger.setLevel(logging.DEBUG) |
| hdlr = logging.StreamHandler(sys.stdout) |
| fmt = logging.Formatter(logging.BASIC_FORMAT) |
| hdlr.setFormatter(fmt) |
| rootLogger.addHandler(hdlr) |
| |
| #Set up a handler such that all events are sent via a socket to the log |
| #receiver (logrecv). |
| #The handler will only be added to the rootLogger for some of the tests |
| hdlr = logging.handlers.SocketHandler('localhost', |
| logging.handlers.DEFAULT_TCP_LOGGING_PORT) |
| |
| #Configure the logger for logrecv so events do not propagate beyond it. |
| #The sockLogger output is buffered in memory until the end of the test, |
| #and printed at the end. |
| sockOut = cStringIO.StringIO() |
| sockLogger = logging.getLogger("logrecv") |
| sockLogger.setLevel(logging.DEBUG) |
| sockhdlr = logging.StreamHandler(sockOut) |
| sockhdlr.setFormatter(logging.Formatter( |
| "%(name)s -> %(levelname)s: %(message)s")) |
| sockLogger.addHandler(sockhdlr) |
| sockLogger.propagate = 0 |
| |
| #Set up servers |
| threads = [] |
| tcpserver = LogRecordSocketReceiver() |
| #sys.stdout.write("About to start TCP server...\n") |
| threads.append(threading.Thread(target=runTCP, args=(tcpserver,))) |
| |
| for thread in threads: |
| thread.start() |
| try: |
| banner("log_test0", "begin") |
| |
| rootLogger.addHandler(hdlr) |
| test0() |
| hdlr.close() |
| rootLogger.removeHandler(hdlr) |
| |
| banner("log_test0", "end") |
| |
| banner("log_test1", "begin") |
| test1() |
| banner("log_test1", "end") |
| |
| banner("log_test2", "begin") |
| test2() |
| banner("log_test2", "end") |
| |
| banner("log_test3", "begin") |
| test3() |
| banner("log_test3", "end") |
| |
| finally: |
| #shut down server |
| tcpserver.abort = 1 |
| for thread in threads: |
| thread.join() |
| banner("logrecv output", "begin") |
| sys.stdout.write(sockOut.getvalue()) |
| sockOut.close() |
| banner("logrecv output", "end") |
| sys.stdout.flush() |
| |
| if __name__ == "__main__": |
| sys.stdout.write("test_logging\n") |
| test_main() |