[autotest] Scheduler Log distiller.

Parses scheduler logs and figures out state transitions of a job
or of all jobs in a suite.

Usage: ./log_distiller.py <job or suite job id> <path to logs>

TEST=Ran against scheduler logs I copied locally.
BUG=None

Change-Id: Idad72214ceb3028bf54206c3ae72887b57ffb989
Reviewed-on: https://chromium-review.googlesource.com/196542
Tested-by: Prashanth B <beeps@chromium.org>
Reviewed-by: Dan Shi <dshi@chromium.org>
Reviewed-by: Alex Miller <milleral@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
diff --git a/contrib/log_distiller.py b/contrib/log_distiller.py
new file mode 100755
index 0000000..f44b132
--- /dev/null
+++ b/contrib/log_distiller.py
@@ -0,0 +1,376 @@
+#!/usr/bin/python
+"""
+Usage: ./cron_scripts/log_distiller.py job_id path_to_logfile
+    If the job_id is a suite it will find all subjobs.
+You need to change the location of the log it will parse.
+The job_id needs to be in the afe database.
+"""
+import abc
+import datetime
+import os
+import re
+import pprint
+import subprocess
+import sys
+import time
+
+import common
+from autotest_lib.server import frontend
+
+
# Default scheduler log to parse when no logfile path is given on the
# command line (process_logs falls back to this as LOGFILE).
# Fixed typo: was misspelled LOGFIE, which made the fallback a NameError.
LOGFILE = './logs/scheduler.log.2014-04-17-16.51.47'
# logfile name format: scheduler.log.2014-02-14-18.10.56
time_format = '%Y-%m-%d-%H.%M.%S'
logfile_regex = r'scheduler.log.([0-9,.,-]+)'
logdir = os.path.join('/usr/local/autotest', 'logs')
+
class StateMachineViolation(Exception):
    """Raised when a log reports an illegal state transition."""
+
+
class LogLineException(Exception):
    """Raised when a log line cannot be processed unambiguously."""
+
+
def should_process_log(time_str, time_format, cutoff_days=7):
    """Returns true if the log was created before the cutoff.

    @param time_str: A string representing the time.
        eg: 2014-02-14-18.10.56
    @param time_format: A string representing the format of the time string.
        ref: http://docs.python.org/2/library/datetime.html#strftime-strptime-behavior
    @param cutoff_days: Int representing the cutoff in days.

    @return: Returns True if time_str has aged more than cutoff_days.
    """
    # Round 'now' down to the precision of time_format so the comparison
    # is apples-to-apples with the parsed timestamp.
    now = datetime.datetime.strptime(time.strftime(time_format), time_format)
    log_time = datetime.datetime.strptime(time_str, time_format)
    return log_time < now - datetime.timedelta(days=cutoff_days)
+
+
def apply_regex(regex, line):
    """Simple regex applicator.

    @param regex: Regex to apply.
    @param line: The line to apply regex on.

    @return: A tuple with the matching groups, if there was a match;
        None otherwise.
    """
    matched = re.match(regex, line)
    return matched.groups() if matched else None
+
+
class StateMachineParser(object):
    """Abstract class that enforces state transition ordering.

    Classes inheriting from StateMachineParser need to define an
    expected_transitions dictionary. The SMP will pop 'to' states
    from the dictionary as they occur, so you cannot repeat a state
    transition unless you specify 2 of them.
    """
    __metaclass__ = abc.ABCMeta


    @abc.abstractmethod
    def __init__(self):
        # States successfully entered, in order of occurrence.
        self.visited_states = []
        # Map of from_state -> list of still-allowed to_states; entries are
        # consumed (removed) as transitions are observed.
        self.expected_transitions = {}


    def advance_state(self, from_state, to_state):
        """Checks that a transition is valid and records it.

        @param from_state: A string representing the state the host is leaving.
        @param to_state: The state the host is going to, represented as a string.

        @return: None if the transition was expected; a message string for a
            same-state update; the offending (from_state, to_state) tuple for
            an unexpected transition. (Note: this returns rather than raises,
            callers decide what constitutes a violation.)
        """
        # TODO: Updating to the same state is a waste of bw.
        # Removed an unreachable second 'return' that followed this one.
        if from_state and from_state == to_state:
            return ('Updating to the same state is a waste of BW: %s->%s' %
                    (from_state, to_state))

        if (from_state in self.expected_transitions and
            to_state in self.expected_transitions[from_state]):
            # Consume the transition so an unplanned repeat gets flagged.
            self.expected_transitions[from_state].remove(to_state)
            self.visited_states.append(to_state)
        else:
            return (from_state, to_state)
+
+
class SingleJobHostSMP(StateMachineParser):
    """Validates host status transitions for a single job."""

    def __init__(self):
        self.visited_states = []
        # Allowed host status moves; entries are consumed as they occur.
        self.expected_transitions = {
                'Ready': ['Resetting', 'Verifying', 'Pending', 'Provisioning'],
                'Resetting': ['Ready', 'Provisioning'],
                'Pending': ['Running'],
                'Provisioning': ['Repairing'],
                'Running': ['Ready']
        }


    def check_transitions(self, hostline):
        """Advance the host state machine for a parsed host line.

        @param hostline: A line object whose line_info has been populated.
        """
        info = hostline.line_info
        # Only 'status' field updates move the host state machine.
        if info['field'] == 'status':
            self.advance_state(info['state'], info['value'])
+
+
class SingleJobHqeSMP(StateMachineParser):
    """Validates HQE status transitions for a single job."""

    def __init__(self):
        self.visited_states = []
        # Allowed HQE status moves; entries are consumed as they occur.
        self.expected_transitions = {
                'Queued': ['Starting', 'Resetting', 'Aborted'],
                'Resetting': ['Pending', 'Provisioning'],
                'Provisioning': ['Pending', 'Queued', 'Repairing'],
                'Pending': ['Starting'],
                'Starting': ['Running'],
                'Running': ['Gathering', 'Parsing'],
                'Gathering': ['Parsing'],
                'Parsing': ['Completed', 'Aborted']
        }


    def check_transitions(self, hqeline):
        """Advance the HQE state machine for a parsed HQE line.

        @param hqeline: A line object whose line_info has been populated.

        @raises StateMachineViolation: When an unexpected transition out of
            Queued is seen after the HQE already reached Running.
        """
        invalid_states = self.advance_state(
                hqeline.line_info['from_state'], hqeline.line_info['to_state'])
        if invalid_states:
            # Deal with repair: only an unexpected move out of Queued after
            # the job already ran is treated as a violation here.
            if (invalid_states[0] == 'Queued' and
                'Running' in self.visited_states):
                raise StateMachineViolation('Unrecognized state transition '
                        '%s->%s, expected transitions are %s' %
                        (invalid_states[0], invalid_states[1],
                         self.expected_transitions))
+
+
class LogLine(object):
    """Line objects.

    All classes inheriting from LogLine represent a line of some sort.
    A line is responsible for parsing itself, and invoking an SMP to
    validate state transitions. A line can be part of several state machines.
    """
    line_format = '%s'


    def __init__(self, state_machine_parsers):
        """
        @param state_machine_parsers: A list of smp objects used to validate
            state changes on these types of lines.
        """
        self.smps = state_machine_parsers
        # Parsed fields for the current line; cleared on flush.
        self.line_info = {}


    def parse_line(self, line):
        """Apply a line regex and save any information the parsed line contains.

        @param line: A string representing a line.
        """
        # The base class matches anything; subclasses refine line_info.
        matched = apply_regex('(.*)', line)
        if matched:
            self.line_info['line'] = matched[0]


    def flush(self):
        """Run all state machine parsers over this line, then reset its info."""
        for smp in self.smps:
            smp.check_transitions(self)
        # TODO: persist this?
        self.line_info = {}


    def format_line(self):
        """Render line_info via line_format, falling back to the raw line."""
        try:
            return self.line_format % self.line_info
        except KeyError:
            return self.line_info['line']
+
+
class TimeLine(LogLine):
    """Filters timestamps for scheduler logs."""

    def parse_line(self, line):
        """Split a scheduler log line into its timestamp and remainder.

        @param line: A string representing a line.
        """
        super(TimeLine, self).parse_line(line)

        # Regex for isolating the date and time from scheduler logs, eg:
        # 02/16 16:04:36.573 INFO |scheduler_:0574|...
        matched = apply_regex('([0-9,/,:,., ]+)(.*)', self.line_info['line'])
        if matched:
            self.line_info['time'], self.line_info['line'] = matched
+
+
class HostLine(TimeLine):
    """Manages hosts line parsing."""
    line_format = (' \t\t %(time)s %(host)s, currently in %(state)s, '
                'updated %(field)s->%(value)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        @param line: The line we're expecting to contain a state transition.
        """
        state_transition_rgx = ".* ([a-zA-Z]+) updating {'([a-zA-Z]+)': ('[a-zA-Z]+'|[0-9])}.*"
        groups = apply_regex(state_transition_rgx, line)
        if groups is None:
            return
        self.line_info['state'] = groups[0]
        self.line_info['field'] = groups[1]
        # Strip quotes so string values compare cleanly against state names.
        self.line_info['value'] = groups[2].replace("'", "")


    def parse_line(self, line):
        """Isolate the host portion of a line and record any state change.

        @param line: A string representing a line.

        @return: The formatted line if this parser claimed it, else None.
        """
        super(HostLine, self).parse_line(line)

        # Regex for getting host status. Eg:
        # 172.22.4 in Running updating {'status': 'Running'}
        groups = apply_regex('.*Host (([0-9,.,a-z,-]+).*)', self.line_info['line'])
        if not groups:
            return None
        self.line_info['line'], self.line_info['host'] = groups
        self.record_state_transition(self.line_info['line'])
        return self.format_line()
+
+
class HQELine(TimeLine):
    """Manages HQE line parsing."""
    line_format = ('%(time)s %(hqe)s, currently in %(from_state)s, '
            'updated to %(to_state)s. Flags: %(flags)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        @param line: The line we're expecting to contain a state transition.
        """
        # Regex for getting hqe status. Eg:
        # status:Running [active] -> Gathering
        state_transition_rgx = ".*status:([a-zA-Z]+)( \[[a-z\,]+\])? -> ([a-zA-Z]+)"
        groups = apply_regex(state_transition_rgx, line)
        if groups is None:
            return
        self.line_info['from_state'] = groups[0]
        # Optional '[active]'-style flags; None when absent.
        self.line_info['flags'] = groups[1]
        self.line_info['to_state'] = groups[2]


    def parse_line(self, line):
        """Isolate the HQE portion of a line and record any state change.

        @param line: A string representing a line.

        @return: The formatted line if this parser claimed it, else None.
        """
        super(HQELine, self).parse_line(line)
        groups = apply_regex(r'.*\| HQE: (([0-9]+).*)', self.line_info['line'])
        if not groups:
            return None
        self.line_info['line'], self.line_info['hqe'] = groups
        self.record_state_transition(self.line_info['line'])
        return self.format_line()
+
+
class LogCrawler(object):
    """Crawl logs.

    Log crawlers are meant to apply some basic preprocessing to a log, and
    crawl the output validating state changes. They manage line and state
    machine creation. The initial filtering applied to the log needs to grab
    all lines that match an action, such as the running of a job.
    """

    def __init__(self, log_name):
        self.log = log_name
        # Subclasses narrow this down by appending pipeline stages.
        self.filter_command = 'cat %s' % log_name


    def preprocess_log(self):
        """Run the filter command and return its stdout."""
        process = subprocess.Popen(self.filter_command,
                                   shell=True, stdout=subprocess.PIPE)
        stdout, _ = process.communicate()
        return stdout
+
+
class SchedulerLogCrawler(LogCrawler):
    """A log crawler for the scheduler logs.

    This crawler is only capable of processing information about a single job.
    """

    def __init__(self, log_name, **kwargs):
        super(SchedulerLogCrawler, self).__init__(log_name)
        self.job_id = kwargs['job_id']
        # One parser per line type; at most one of them should claim a line.
        self.line_processors = [HostLine([SingleJobHostSMP()]),
                HQELine([SingleJobHqeSMP()])]
        self.filter_command = ('%s | grep "for job: %s"' %
                (self.filter_command, self.job_id))


    def parse_log(self):
        """Parse each line of the preprocessed log output.

        Pass each line through each possible line_processor. The one that
        matches will populate itself; calling flush then walks that line's
        state machines one step forward.

        @return: A list of parsed lines (raw lines when no parser claimed
            them).
        @raises LogLineException: If more than one parser claims a line.
        @raises StateMachineViolation: If a state machine detects an invalid
            transition.
        """
        response = []
        for raw_line in self.preprocess_log().split('\n'):
            claimed = None
            for processor in self.line_processors:
                formatted = processor.parse_line(raw_line)
                if formatted and claimed:
                    raise LogLineException('Multiple Parsers claiming the line %s: '
                            'previous parsing: %s, current parsing: %s ' %
                            (raw_line, claimed, formatted))
                elif formatted:
                    claimed = formatted
                    try:
                        processor.flush()
                    except StateMachineViolation as e:
                        # Surface everything parsed so far with the violation.
                        response.append(str(e))
                        raise StateMachineViolation(response)
            response.append(claimed if claimed else raw_line)
        return response
+
+
def process_logs():
    """Entry point: distill scheduler logs for a job or suite.

    Expects sys.argv[1] to be an afe job id and, optionally, sys.argv[2]
    to be the path to a scheduler logfile. Each (child) job's scheduler
    lines are parsed and printed.
    """
    if len(sys.argv) < 2:
        print ('Usage: ./cron_scripts/log_distiller.py 0 8415620 '
               'You need to change the location of the log it will parse.'
                'The job_id needs to be in the afe database.')
        sys.exit(1)

    job_id = int(sys.argv[1])
    rpc = frontend.AFE()
    suite_jobs = rpc.run('get_jobs', id=job_id)
    # A job without a parent is presumably a suite job; expand it into its
    # child jobs instead. TODO confirm against the afe schema.
    if not suite_jobs[0]['parent_job']:
        suite_jobs = rpc.run('get_jobs', parent_job=job_id)
    # Fall back to the module-level default logfile when no path was given.
    try:
        logfile = sys.argv[2]
    except Exception:
        logfile = LOGFILE

    # Crawl the same logfile once per job, filtering for that job's lines.
    for job in suite_jobs:
        log_crawler = SchedulerLogCrawler(logfile, job_id=job['id'])
        for line in log_crawler.parse_log():
            print line
    return
+
+
# Script entry point.
if __name__ == '__main__':
    process_logs()