| #!/usr/bin/python |
| # |
| # rt-mutex tester |
| # |
| # (C) 2006 Thomas Gleixner <tglx@linutronix.de> |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License version 2 as |
| # published by the Free Software Foundation. |
| # |
| import os |
| import sys |
| import getopt |
| import shutil |
| import string |
| |
| # Globals |
| quiet = 0 |
| test = 0 |
| comments = 0 |
| |
| sysfsprefix = "/sys/devices/system/rttest/rttest" |
| statusfile = "/status" |
| commandfile = "/command" |
| |
| # Command opcodes |
| cmd_opcodes = { |
| "schedother" : "1", |
| "schedfifo" : "2", |
| "lock" : "3", |
| "locknowait" : "4", |
| "lockint" : "5", |
| "lockintnowait" : "6", |
| "lockcont" : "7", |
| "unlock" : "8", |
| "signal" : "11", |
| "resetevent" : "98", |
| "reset" : "99", |
| } |
| |
| test_opcodes = { |
| "prioeq" : ["P" , "eq" , None], |
| "priolt" : ["P" , "lt" , None], |
| "priogt" : ["P" , "gt" , None], |
| "nprioeq" : ["N" , "eq" , None], |
| "npriolt" : ["N" , "lt" , None], |
| "npriogt" : ["N" , "gt" , None], |
| "unlocked" : ["M" , "eq" , 0], |
| "trylock" : ["M" , "eq" , 1], |
| "blocked" : ["M" , "eq" , 2], |
| "blockedwake" : ["M" , "eq" , 3], |
| "locked" : ["M" , "eq" , 4], |
| "opcodeeq" : ["O" , "eq" , None], |
| "opcodelt" : ["O" , "lt" , None], |
| "opcodegt" : ["O" , "gt" , None], |
| "eventeq" : ["E" , "eq" , None], |
| "eventlt" : ["E" , "lt" , None], |
| "eventgt" : ["E" , "gt" , None], |
| } |
| |
| # Print usage information |
| def usage(): |
| print "rt-tester.py <-c -h -q -t> <testfile>" |
| print " -c display comments after first command" |
| print " -h help" |
| print " -q quiet mode" |
| print " -t test mode (syntax check)" |
| print " testfile: read test specification from testfile" |
| print " otherwise from stdin" |
| return |
| |
| # Print progress when not in quiet mode |
| def progress(str): |
| if not quiet: |
| print str |
| |
| # Analyse a status value |
| def analyse(val, top, arg): |
| |
| intval = int(val) |
| |
| if top[0] == "M": |
| intval = intval / (10 ** int(arg)) |
| intval = intval % 10 |
| argval = top[2] |
| elif top[0] == "O": |
| argval = int(cmd_opcodes.get(arg, arg)) |
| else: |
| argval = int(arg) |
| |
| # progress("%d %s %d" %(intval, top[1], argval)) |
| |
| if top[1] == "eq" and intval == argval: |
| return 1 |
| if top[1] == "lt" and intval < argval: |
| return 1 |
| if top[1] == "gt" and intval > argval: |
| return 1 |
| return 0 |
| |
| # Parse the commandline |
| try: |
| (options, arguments) = getopt.getopt(sys.argv[1:],'chqt') |
| except getopt.GetoptError, ex: |
| usage() |
| sys.exit(1) |
| |
| # Parse commandline options |
| for option, value in options: |
| if option == "-c": |
| comments = 1 |
| elif option == "-q": |
| quiet = 1 |
| elif option == "-t": |
| test = 1 |
| elif option == '-h': |
| usage() |
| sys.exit(0) |
| |
| # Select the input source |
| if arguments: |
| try: |
| fd = open(arguments[0]) |
| except Exception,ex: |
| sys.stderr.write("File not found %s\n" %(arguments[0])) |
| sys.exit(1) |
| else: |
| fd = sys.stdin |
| |
| linenr = 0 |
| |
| # Read the test patterns |
| while 1: |
| |
| linenr = linenr + 1 |
| line = fd.readline() |
| if not len(line): |
| break |
| |
| line = line.strip() |
| parts = line.split(":") |
| |
| if not parts or len(parts) < 1: |
| continue |
| |
| if len(parts[0]) == 0: |
| continue |
| |
| if parts[0].startswith("#"): |
| if comments > 1: |
| progress(line) |
| continue |
| |
| if comments == 1: |
| comments = 2 |
| |
| progress(line) |
| |
| cmd = parts[0].strip().lower() |
| opc = parts[1].strip().lower() |
| tid = parts[2].strip() |
| dat = parts[3].strip() |
| |
| try: |
| # Test or wait for a status value |
| if cmd == "t" or cmd == "w": |
| testop = test_opcodes[opc] |
| |
| fname = "%s%s%s" %(sysfsprefix, tid, statusfile) |
| if test: |
| print fname |
| continue |
| |
| while 1: |
| query = 1 |
| fsta = open(fname, 'r') |
| status = fsta.readline().strip() |
| fsta.close() |
| stat = status.split(",") |
| for s in stat: |
| s = s.strip() |
| if s.startswith(testop[0]): |
| # Separate status value |
| val = s[2:].strip() |
| query = analyse(val, testop, dat) |
| break |
| if query or cmd == "t": |
| break |
| |
| progress(" " + status) |
| |
| if not query: |
| sys.stderr.write("Test failed in line %d\n" %(linenr)) |
| sys.exit(1) |
| |
| # Issue a command to the tester |
| elif cmd == "c": |
| cmdnr = cmd_opcodes[opc] |
| # Build command string and sys filename |
| cmdstr = "%s:%s" %(cmdnr, dat) |
| fname = "%s%s%s" %(sysfsprefix, tid, commandfile) |
| if test: |
| print fname |
| continue |
| fcmd = open(fname, 'w') |
| fcmd.write(cmdstr) |
| fcmd.close() |
| |
| except Exception,ex: |
| sys.stderr.write(str(ex)) |
| sys.stderr.write("\nSyntax error in line %d\n" %(linenr)) |
| if not test: |
| fd.close() |
| sys.exit(1) |
| |
| # Normal exit pass |
| print "Pass" |
| sys.exit(0) |
| |
| |