| #!/usr/bin/python2 |
| """ |
| Usage: ./cron_scripts/log_distiller.py job_id path_to_logfile |
| If the job_id is a suite it will find all subjobs. |
| You need to change the location of the log it will parse. |
| The job_id needs to be in the afe database. |
| """ |
| import abc |
| import datetime |
| import os |
| import re |
| import pprint |
| import subprocess |
| import sys |
| import time |
| |
| import common |
| from autotest_lib.server import frontend |
| |
| |
# Default scheduler log parsed when no path is given on the command line.
LOGFILE = './logs/scheduler.log.2014-04-17-16.51.47'
# Misspelled original name, kept as an alias in case anything imports it.
LOGFIE = LOGFILE
# logfile name format: scheduler.log.2014-02-14-18.10.56
time_format = '%Y-%m-%d-%H.%M.%S'
# Captures the timestamp suffix of a scheduler log file name; the literal
# dots are escaped so they do not match arbitrary characters.
logfile_regex = r'scheduler\.log\.([0-9,.,-]+)'
logdir = os.path.join('/usr/local/autotest', 'logs')
| |
class StateMachineViolation(Exception):
    """Raised when a state machine parser rejects a state transition."""
| |
| |
class LogLineException(Exception):
    """Raised when a log line cannot be attributed to exactly one parser."""
| |
| |
def should_process_log(time_str, time_format, cutoff_days=7, now=None):
    """Returns True if the log is older than cutoff_days.

    @param time_str: A string representing the time the log was created.
        eg: 2014-02-14-18.10.56
    @param time_format: A string representing the format of the time string.
        ref: http://docs.python.org/2/library/datetime.html#strftime-strptime-behavior
    @param cutoff_days: Int representing the cutoff in days.
    @param now: Optional datetime to measure the age against. Defaults to the
        current local time.

    @return: True if time_str is strictly older than now - cutoff_days.
    """
    log_time = datetime.datetime.strptime(time_str, time_format)
    if now is None:
        now = datetime.datetime.now()
    # Round-trip through time_format so `now` has the same resolution as the
    # log timestamp (e.g. no microseconds) before comparing.
    now = datetime.datetime.strptime(now.strftime(time_format), time_format)
    cutoff = now - datetime.timedelta(days=cutoff_days)
    return log_time < cutoff
| |
| |
def apply_regex(regex, line):
    """Match a regex against the start of a line.

    @param regex: Regular expression handed to re.match.
    @param line: The text to match against.

    @return: The tuple of captured groups on a match, None otherwise.
    """
    found = re.match(regex, line)
    return found.groups() if found is not None else None
| |
| |
class StateMachineParser(object):
    """Abstract class that enforces state transition ordering.

    Classes inheriting from StateMachineParser need to define an
    expected_transitions dictionary mapping a 'from' state to the list of
    permitted 'to' states. Each 'to' state is removed from its list once the
    transition is seen. A transition can therefore only be taken as many
    times as it is listed. To allow the same transition twice, list the 'to'
    state twice.
    """
    # Python 2 metaclass declaration; makes __init__ abstract.
    __metaclass__ = abc.ABCMeta


    @abc.abstractmethod
    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {}


    def advance_state(self, from_state, to_state):
        """Consume a transition if it is expected, otherwise report it.

        @param from_state: A string representing the state being left.
        @param to_state: A string representing the state being entered.

        @return: None if the transition was expected. In that case it is
            removed from expected_transitions and to_state is appended to
            visited_states. A message string if from_state equals to_state.
            Otherwise the unexpected (from_state, to_state) tuple.
        """
        # TODO: Updating to the same state is a waste of bw.
        if from_state and from_state == to_state:
            return ('Updating to the same state is a waste of BW: %s->%s' %
                    (from_state, to_state))

        if (from_state in self.expected_transitions and
                to_state in self.expected_transitions[from_state]):
            self.expected_transitions[from_state].remove(to_state)
            self.visited_states.append(to_state)
            return None
        return (from_state, to_state)
| |
| |
class SingleJobHostSMP(StateMachineParser):
    """State machine parser for host status updates seen for a single job."""

    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {
            'Ready': ['Resetting', 'Verifying', 'Pending', 'Provisioning'],
            'Resetting': ['Ready', 'Provisioning'],
            'Pending': ['Running'],
            'Provisioning': ['Repairing'],
            'Running': ['Ready']
        }


    def check_transitions(self, hostline):
        """Feed a parsed host line's 'status' update into the state machine.

        @param hostline: A line object whose line_info holds the parsed
            'state', 'field' and 'value' entries, if any were found.
        """
        line_info = hostline.line_info
        # A host line can match without carrying an "updating {...}" clause,
        # in which case 'field' is absent; skip it instead of a KeyError.
        if line_info.get('field') != 'status':
            return
        # NOTE(review): the result of advance_state is discarded, so
        # unexpected host transitions are currently not reported.
        self.advance_state(line_info['state'], line_info['value'])
| |
| |
class SingleJobHqeSMP(StateMachineParser):
    """State machine parser for HQE status changes of a single job."""

    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {
            'Queued': ['Starting', 'Resetting', 'Aborted'],
            'Resetting': ['Pending', 'Provisioning'],
            'Provisioning': ['Pending', 'Queued', 'Repairing'],
            'Pending': ['Starting'],
            'Starting': ['Running'],
            'Running': ['Gathering', 'Parsing'],
            'Gathering': ['Parsing'],
            'Parsing': ['Completed', 'Aborted']
        }


    def check_transitions(self, hqeline):
        """Feed a parsed HQE line's status change into the state machine.

        @param hqeline: A line object whose line_info holds the parsed
            'from_state' and 'to_state' entries, if any were found.

        @raises StateMachineViolation: If the entry unexpectedly leaves
            'Queued' after it has already been 'Running'.
        """
        from_state = hqeline.line_info.get('from_state')
        if from_state is None:
            # The HQE line matched but held no "status:X -> Y" transition.
            return
        invalid_states = self.advance_state(
                from_state, hqeline.line_info.get('to_state'))
        # advance_state returns None for an expected transition and a
        # message string for a same-state update; neither is a violation.
        if not isinstance(invalid_states, tuple):
            return

        # Deal with repair.
        # NOTE(review): only an unexpected transition out of 'Queued' after
        # the entry was already 'Running' raises; every other unexpected
        # transition is silently ignored. Confirm this is intended.
        if (invalid_states[0] == 'Queued' and
                'Running' in self.visited_states):
            raise StateMachineViolation('Unrecognized state transition '
                    '%s->%s, expected transitions are %s' %
                    (invalid_states[0], invalid_states[1],
                     self.expected_transitions))
| |
| |
class LogLine(object):
    """Line objects.

    All classes inheriting from LogLine represent a line of some sort.
    A line is responsible for parsing itself, and invoking an SMP to
    validate state transitions. A line can be part of several state machines.
    """
    # %-format string rendered against line_info by format_line(). The
    # default shows the raw line text; subclasses override it. A bare '%s'
    # would render the repr of the whole line_info dict.
    line_format = '%(line)s'


    def __init__(self, state_machine_parsers):
        """
        @param state_machine_parsers: A list of smp objects used to validate
            state changes on these types of lines.
        """
        self.smps = state_machine_parsers

        # Parsed fields of the current line; a dict so flush() can reset it.
        self.line_info = {}


    def parse_line(self, line):
        """Apply a line regex and save any information the parsed line contains.

        @param line: A string representing a line.
        """
        # Captures everything up to the first newline.
        line_rgx = '(.*)'
        parsed_line = apply_regex(line_rgx, line)
        if parsed_line:
            self.line_info['line'] = parsed_line[0]


    def flush(self):
        """Run every state machine parser on this line, then reset line_info.
        """
        for smp in self.smps:
            smp.check_transitions(self)
        # TODO: persist this?
        self.line_info = {}


    def format_line(self):
        """Render line_info through line_format.

        @return: The formatted string, or the raw line_info['line'] when
            line_format references a key that was not parsed.
        """
        try:
            return self.line_format % self.line_info
        except KeyError:
            return self.line_info['line']
| |
| |
class TimeLine(LogLine):
    """Splits the leading timestamp off scheduler log lines.

    The timestamp, including any trailing space, goes to line_info['time'];
    the rest of the text replaces line_info['line'].
    """

    def parse_line(self, line):
        super(TimeLine, self).parse_line(line)

        # A leading run of digits, '/', ':', '.', ',' and spaces, eg:
        # 02/16 16:04:36.573 INFO |scheduler_:0574|...
        timestamp_rgx = '([0-9,/,:,., ]+)(.*)'
        pieces = apply_regex(timestamp_rgx, self.line_info['line'])
        if pieces is None:
            return
        self.line_info['time'], self.line_info['line'] = pieces
| |
| |
class HostLine(TimeLine):
    """Parses scheduler log lines that report host updates."""
    line_format = (' \t\t %(time)s %(host)s, currently in %(state)s, '
                   'updated %(field)s->%(value)s')


    def record_state_transition(self, line):
        """Store the current state, updated field and new value of a host.

        Nothing is stored if the line carries no "updating {...}" clause.

        @param line: The line we're expecting to contain a state transition.
        """
        found = apply_regex(
                ".* ([a-zA-Z]+) updating {'([a-zA-Z]+)': ('[a-zA-Z]+'|[0-9])}.*",
                line)
        if found is None:
            return
        current_state, field_name, new_value = found
        self.line_info['state'] = current_state
        self.line_info['field'] = field_name
        # Quoted string values keep their quotes in the capture; drop them.
        self.line_info['value'] = new_value.replace("'", "")


    def parse_line(self, line):
        """Parse a host line.

        @param line: A raw scheduler log line.

        @return: The formatted line if it mentions a Host, otherwise None.
        """
        super(HostLine, self).parse_line(line)

        # Keep the text starting at the host name, eg:
        # Host 172.22.4 in Running updating {'status': 'Running'}
        found = apply_regex('.*Host (([0-9,.,a-z,-]+).*)',
                            self.line_info['line'])
        if not found:
            return None
        self.line_info['line'], self.line_info['host'] = found
        self.record_state_transition(self.line_info['line'])
        return self.format_line()
| |
| |
class HQELine(TimeLine):
    """Parses scheduler log lines that report HQE status changes."""
    line_format = ('%(time)s %(hqe)s, currently in %(from_state)s, '
                   'updated to %(to_state)s. Flags: %(flags)s')


    def record_state_transition(self, line):
        """Store the from/to states and optional flags of an HQE change.

        Nothing is stored if the line carries no "status:X -> Y" text.

        @param line: The line we're expecting to contain a state transition.
        """
        # Eg: status:Running [active] -> Gathering
        found = apply_regex(
                r".*status:([a-zA-Z]+)( \[[a-z\,]+\])? -> ([a-zA-Z]+)", line)
        if found is None:
            return
        # flags is None when the optional " [...]" group is absent.
        (self.line_info['from_state'], self.line_info['flags'],
         self.line_info['to_state']) = found


    def parse_line(self, line):
        """Parse an HQE line.

        @param line: A raw scheduler log line.

        @return: The formatted line if it contains "| HQE: <id>", otherwise
            None.
        """
        super(HQELine, self).parse_line(line)
        found = apply_regex(r'.*\| HQE: (([0-9]+).*)', self.line_info['line'])
        if not found:
            return None
        self.line_info['line'], self.line_info['hqe'] = found
        self.record_state_transition(self.line_info['line'])
        return self.format_line()
| |
| |
class LogCrawler(object):
    """Crawl logs.

    Log crawlers are meant to apply some basic preprocessing to a log, and crawl
    the output validating state changes. They manage line and state machine
    creation. The initial filtering applied to the log needs to grab all lines
    that match an action, such as the running of a job.
    """

    def __init__(self, log_name):
        """
        @param log_name: Path of the log file to crawl.
        """
        try:
            from shlex import quote as shell_quote
        except ImportError:
            # Python 2 only provides quote() in the pipes module.
            from pipes import quote as shell_quote
        self.log = log_name
        # filter_command is run through a shell (and extended with pipes by
        # subclasses), so quote the path so that names containing spaces or
        # shell metacharacters reach `cat` as one literal argument.
        self.filter_command = 'cat %s' % shell_quote(log_name)


    def preprocess_log(self):
        """Apply some basic filtering to the log.

        @return: The standard output of filter_command.
        """
        proc = subprocess.Popen(self.filter_command,
                                shell=True, stdout=subprocess.PIPE)
        out, _ = proc.communicate()
        return out
| |
| |
class SchedulerLogCrawler(LogCrawler):
    """A log crawler for the scheduler logs.

    This crawler is only capable of processing information about a single job.
    """

    def __init__(self, log_name, **kwargs):
        """
        @param log_name: Path of the scheduler log to crawl.
        @param kwargs: Must contain 'job_id', the id of the job whose lines
            are kept.
        """
        super(SchedulerLogCrawler, self).__init__(log_name)
        self.job_id = kwargs['job_id']
        self.line_processors = [HostLine([SingleJobHostSMP()]),
                                HQELine([SingleJobHqeSMP()])]
        # A plain substring grep for "for job: 12" would also keep the lines
        # of jobs 123, 1200, ...; require a non-digit or end of line after
        # the id. NOTE(review): job_id is interpolated into a shell command
        # and is assumed to be an integer id.
        self.filter_command = ("%s | grep -E 'for job: %s([^0-9]|$)'" %
                               (self.filter_command, self.job_id))


    def parse_log(self):
        """Run each preprocessed log line through the line processors.

        A processor that recognizes a line returns its formatted form and is
        flushed, which feeds the parsed fields to its state machine parsers.
        Lines no processor recognizes are kept unchanged.

        @return: A list with one (formatted or raw) string per output line.

        @raises LogLineException: If more than one processor claims a line.
        @raises StateMachineViolation: If a state machine parser rejects a
            transition. Its argument is the output collected so far, ending
            with the violation message.
        """
        out = self.preprocess_log()
        response = []
        for job_line in out.split('\n'):
            parsed_line = None
            for processor in self.line_processors:
                line = processor.parse_line(job_line)
                if not line:
                    continue
                if parsed_line:
                    raise LogLineException('Multiple Parsers claiming the line %s: '
                            'previous parsing: %s, current parsing: %s ' %
                            (job_line, parsed_line, line))
                parsed_line = line
                try:
                    processor.flush()
                except StateMachineViolation as e:
                    response.append(str(e))
                    raise StateMachineViolation(response)
            response.append(parsed_line if parsed_line else job_line)
        return response
| |
| |
def process_logs():
    """Print the distilled scheduler log lines of a job or of a suite.

    Reads the job id from sys.argv[1] and an optional log path from
    sys.argv[2], which defaults to LOGFILE. A job without a parent job is
    treated as a suite and its child jobs are processed. If it has no child
    jobs, the job itself is processed.
    """
    if len(sys.argv) < 2:
        print('Usage: ./cron_scripts/log_distiller.py job_id [path_to_logfile]\n'
              'If the job_id is a suite all of its subjobs are processed. '
              'The job_id needs to be in the afe database.')
        sys.exit(1)

    job_id = int(sys.argv[1])
    logfile = sys.argv[2] if len(sys.argv) > 2 else LOGFILE

    rpc = frontend.AFE()
    jobs = rpc.run('get_jobs', id=job_id)
    if not jobs:
        print('Job %s was not found in the afe database.' % job_id)
        sys.exit(1)
    if not jobs[0]['parent_job']:
        # A top-level job may be a suite: process its children if it has any,
        # otherwise fall back to the job itself.
        child_jobs = rpc.run('get_jobs', parent_job=job_id)
        if child_jobs:
            jobs = child_jobs

    for job in jobs:
        log_crawler = SchedulerLogCrawler(logfile, job_id=job['id'])
        for line in log_crawler.parse_log():
            print(line)


if __name__ == '__main__':
    process_logs()