Prashanth B | 22243eb | 2014-04-23 08:53:15 -0700 | [diff] [blame] | 1 | #!/usr/bin/python |
| 2 | """ |
| 3 | Usage: ./cron_scripts/log_distiller.py job_id path_to_logfile |
| 4 | If the job_id is a suite it will find all subjobs. |
| 5 | You need to change the location of the log it will parse. |
| 6 | The job_id needs to be in the afe database. |
| 7 | """ |
| 8 | import abc |
| 9 | import datetime |
| 10 | import os |
| 11 | import re |
| 12 | import pprint |
| 13 | import subprocess |
| 14 | import sys |
| 15 | import time |
| 16 | |
| 17 | import common |
| 18 | from autotest_lib.server import frontend |
| 19 | |
| 20 | |
# Default scheduler log to parse when no path is given on the command line.
# logfile name format: scheduler.log.2014-02-14-18.10.56
LOGFILE = './logs/scheduler.log.2014-04-17-16.51.47'
# Backward-compatible alias: the constant was originally misspelled LOGFIE
# and other code may still reference that name.
LOGFIE = LOGFILE
time_format = '%Y-%m-%d-%H.%M.%S'
logfile_regex = r'scheduler.log.([0-9,.,-]+)'
logdir = os.path.join('/usr/local/autotest', 'logs')
| 26 | |
class StateMachineViolation(Exception):
    """Raised when a log records a state transition the SMP did not expect."""
| 29 | |
| 30 | |
class LogLineException(Exception):
    """Raised when a log line cannot be attributed to exactly one parser."""
| 33 | |
| 34 | |
def should_process_log(time_str, time_format, cutoff_days=7):
    """Decide whether a log is old enough to process.

    @param time_str: Timestamp string, eg: 2014-02-14-18.10.56.
    @param time_format: strptime format string describing time_str.
        ref: http://docs.python.org/2/library/datetime.html#strftime-strptime-behavior
    @param cutoff_days: Int age threshold, in days.

    @return: True if time_str is more than cutoff_days in the past.
    """
    parsed = datetime.datetime.strptime(time_str, time_format)
    # Round the current time through the same format so both sides of the
    # comparison carry identical (second-level) precision.
    current = datetime.datetime.strptime(time.strftime(time_format),
                                         time_format)
    return parsed < current - datetime.timedelta(days=cutoff_days)
| 50 | |
| 51 | |
def apply_regex(regex, line):
    """Match a regex against the start of a line.

    @param regex: Regex to apply.
    @param line: The line to apply regex on.

    @return: A tuple of the captured groups on a match, None otherwise.
    """
    matched = re.match(regex, line)
    return matched.groups() if matched else None
| 63 | |
| 64 | |
class StateMachineParser(object):
    """Abstract class that enforces state transition ordering.

    Classes inheriting from StateMachineParser need to define an
    expected_transitions dictionary. The SMP will pop 'to' states
    from the dictionary as they occur, so you cannot repeat a state
    transition unless you specify 2 of them.
    """
    __metaclass__ = abc.ABCMeta


    @abc.abstractmethod
    def __init__(self):
        # Subclasses must populate both of these in their own __init__.
        self.visited_states = []
        self.expected_transitions = {}


    def advance_state(self, from_state, to_state):
        """Checks that a transition is valid.

        @param from_state: A string representing the state the host is leaving.
        @param to_state: The state the host is going to, represented as a string.

        @return: None if the transition was valid and consumed; a warning
                string for a same-state update; or a (from_state, to_state)
                tuple when the transition was not in expected_transitions.
        """
        # TODO: Updating to the same state is a waste of bw.
        if from_state and from_state == to_state:
            return ('Updating to the same state is a waste of BW: %s->%s' %
                    (from_state, to_state))

        if (from_state in self.expected_transitions and
            to_state in self.expected_transitions[from_state]):
            # Consume the expected transition so it cannot be re-used.
            self.expected_transitions[from_state].remove(to_state)
            self.visited_states.append(to_state)
        else:
            return (from_state, to_state)
| 103 | |
| 104 | |
class SingleJobHostSMP(StateMachineParser):
    """State machine for the host status transitions of a single job."""

    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {
            'Ready': ['Resetting', 'Verifying', 'Pending', 'Provisioning'],
            'Resetting': ['Ready', 'Provisioning'],
            'Pending': ['Running'],
            'Provisioning': ['Repairing'],
            'Running': ['Ready']
        }


    def check_transitions(self, hostline):
        """Advance the state machine if the line updated the host status.

        @param hostline: A HostLine whose line_info may describe a transition.
        """
        # Use .get: flush() runs for every line, and a line that did not
        # match the host transition regex never sets these keys, which
        # previously raised KeyError here.
        if hostline.line_info.get('field') == 'status':
            self.advance_state(hostline.line_info.get('state'),
                               hostline.line_info.get('value'))
| 121 | |
| 122 | |
class SingleJobHqeSMP(StateMachineParser):
    """State machine for the HQE status transitions of a single job."""

    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {
            'Queued': ['Starting', 'Resetting', 'Aborted'],
            'Resetting': ['Pending', 'Provisioning'],
            'Provisioning': ['Pending', 'Queued', 'Repairing'],
            'Pending': ['Starting'],
            'Starting': ['Running'],
            'Running': ['Gathering', 'Parsing'],
            'Gathering': ['Parsing'],
            'Parsing': ['Completed', 'Aborted']
        }


    def check_transitions(self, hqeline):
        """Validate an HQE transition, tolerating repair requeues.

        @param hqeline: An HQELine whose line_info may describe a transition.

        @raises StateMachineViolation: If the HQE transitioned back to
                Queued after it had already been Running.
        """
        # Use .get: flush() runs for every line, and a line that did not
        # match the HQE transition regex never sets these keys, which
        # previously raised KeyError here.
        invalid_states = self.advance_state(
            hqeline.line_info.get('from_state'),
            hqeline.line_info.get('to_state'))
        if not invalid_states:
            return

        # Deal with repair.
        if (invalid_states[0] == 'Queued' and
            'Running' in self.visited_states):
            raise StateMachineViolation('Unrecognized state transition '
                                        '%s->%s, expected transitions are %s' %
                                        (invalid_states[0], invalid_states[1],
                                         self.expected_transitions))
| 151 | |
| 152 | |
class LogLine(object):
    """Line objects.

    All classes inheriting from LogLine represent a line of some sort.
    A line is responsible for parsing itself, and invoking an SMP to
    validate state transitions. A line can be part of several state machines.
    """
    line_format = '%s'


    def __init__(self, state_machine_parsers):
        """
        @param state_machine_parsers: A list of smp objects used to validate
                state changes on these types of lines.
        """
        self.smps = state_machine_parsers
        # Parsed fields live in a dict so a flush is a single reassignment.
        self.line_info = {}


    def parse_line(self, line):
        """Apply a line regex and save any information the parsed line contains.

        @param line: A string representing a line.
        """
        # Capture everything; subclasses narrow the captured text down.
        captured = apply_regex('(.*)', line)
        if captured:
            self.line_info['line'] = captured[0]


    def flush(self):
        """Run every state machine parser over this line, then reset it."""
        for parser in self.smps:
            parser.check_transitions(self)
        # TODO: persist this?
        self.line_info = {}


    def format_line(self):
        """Render the parsed fields, falling back to the raw line text."""
        try:
            return self.line_format % self.line_info
        except KeyError:
            return self.line_info['line']
| 200 | |
| 201 | |
class TimeLine(LogLine):
    """Filters timestamps for scheduler logs.
    """

    def parse_line(self, line):
        """Split the leading timestamp off a scheduler log line.

        @param line: A string representing a line.
        """
        super(TimeLine, self).parse_line(line)

        # Isolate the date and time prefix from scheduler logs, eg:
        # 02/16 16:04:36.573 INFO |scheduler_:0574|...
        captured = apply_regex('([0-9,/,:,., ]+)(.*)', self.line_info['line'])
        if captured:
            self.line_info['time'], self.line_info['line'] = captured
| 216 | |
| 217 | |
class HostLine(TimeLine):
    """Manages hosts line parsing.
    """
    line_format = (' \t\t %(time)s %(host)s, currently in %(state)s, '
                   'updated %(field)s->%(value)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        @param line: The line we're expecting to contain a state transition.
        """
        state_transition_rgx = (".* ([a-zA-Z]+) updating "
                                "{'([a-zA-Z]+)': ('[a-zA-Z]+'|[0-9])}.*")
        groups = apply_regex(state_transition_rgx, line)
        if not groups:
            return
        self.line_info['state'] = groups[0]
        self.line_info['field'] = groups[1]
        # Strip the quoting so string values match unquoted numeric ones.
        self.line_info['value'] = groups[2].replace("'", "")


    def parse_line(self, line):
        """Parse a host line; return the formatted result on a match.

        @param line: A string representing a line.
        """
        super(HostLine, self).parse_line(line)

        # Regex for getting host status. Eg:
        # 172.22.4 in Running updating {'status': 'Running'}
        captured = apply_regex('.*Host (([0-9,.,a-z,-]+).*)',
                               self.line_info['line'])
        if captured:
            self.line_info['line'] = captured[0]
            self.line_info['host'] = captured[1]
            self.record_state_transition(self.line_info['line'])
            return self.format_line()
| 250 | |
| 251 | |
class HQELine(TimeLine):
    """Manages HQE line parsing.
    """
    line_format = ('%(time)s %(hqe)s, currently in %(from_state)s, '
                   'updated to %(to_state)s. Flags: %(flags)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        @param line: The line we're expecting to contain a state transition.
        """
        # Regex for getting hqe status. Eg:
        # status:Running [active] -> Gathering
        groups = apply_regex(
            ".*status:([a-zA-Z]+)( \[[a-z\,]+\])? -> ([a-zA-Z]+)", line)
        if groups:
            (self.line_info['from_state'],
             self.line_info['flags'],
             self.line_info['to_state']) = groups


    def parse_line(self, line):
        """Parse an HQE line; return the formatted result on a match.

        @param line: A string representing a line.
        """
        super(HQELine, self).parse_line(line)
        captured = apply_regex(r'.*\| HQE: (([0-9]+).*)',
                               self.line_info['line'])
        if captured:
            self.line_info['line'] = captured[0]
            self.line_info['hqe'] = captured[1]
            self.record_state_transition(self.line_info['line'])
            return self.format_line()
| 283 | |
| 284 | |
class LogCrawler(object):
    """Crawl logs.

    Log crawlers are meant to apply some basic preprocessing to a log, and
    crawl the output validating state changes. They manage line and state
    machine creation. The initial filtering applied to the log needs to grab
    all lines that match an action, such as the running of a job.
    """

    def __init__(self, log_name):
        self.log = log_name
        # Subclasses append their own filters onto this shell pipeline.
        self.filter_command = 'cat %s' % log_name


    def preprocess_log(self):
        """Run the filter pipeline and return its stdout."""
        pipeline = subprocess.Popen(self.filter_command,
                                    shell=True, stdout=subprocess.PIPE)
        stdout, _ = pipeline.communicate()
        return stdout
| 306 | |
| 307 | |
class SchedulerLogCrawler(LogCrawler):
    """A log crawler for the scheduler logs.

    This crawler is only capable of processing information about a single job.
    """

    def __init__(self, log_name, **kwargs):
        super(SchedulerLogCrawler, self).__init__(log_name)
        self.job_id = kwargs['job_id']
        self.line_processors = [HostLine([SingleJobHostSMP()]),
                                HQELine([SingleJobHqeSMP()])]
        # Narrow the base pipeline down to lines about this one job.
        self.filter_command = ('%s | grep "for job: %s"' %
                               (self.filter_command, self.job_id))


    def parse_log(self):
        """Parse each line of the preprocessed log output.

        Each line is offered to every line_processor; the one that matches
        populates itself. Flushing a processor then walks its state machines
        one step forward.

        @raises LogLineException: If two processors both claim a line.
        @raises StateMachineViolation: If an SMP detected a bad transition;
                the lines parsed so far are attached to the exception.
        """
        response = []
        for raw_line in self.preprocess_log().split('\n'):
            claimed = None
            for processor in self.line_processors:
                candidate = processor.parse_line(raw_line)
                if candidate and claimed:
                    raise LogLineException('Multiple Parsers claiming the line %s: '
                                           'previous parsing: %s, current parsing: %s ' %
                                           (raw_line, claimed, candidate))
                elif candidate:
                    claimed = candidate
                try:
                    processor.flush()
                except StateMachineViolation as e:
                    response.append(str(e))
                    raise StateMachineViolation(response)
            response.append(claimed if claimed else raw_line)
        return response
| 349 | |
| 350 | |
def process_logs():
    """Distill scheduler logs for a job, or for all subjobs of a suite.

    Expects sys.argv[1] to be a job id present in the afe database and an
    optional sys.argv[2] naming the logfile to parse; without the latter the
    module default log is used.
    """
    if len(sys.argv) < 2:
        print('Usage: ./cron_scripts/log_distiller.py 0 8415620 '
              'You need to change the location of the log it will parse.'
              'The job_id needs to be in the afe database.')
        sys.exit(1)

    job_id = int(sys.argv[1])
    rpc = frontend.AFE()
    suite_jobs = rpc.run('get_jobs', id=job_id)
    # A job without a parent is treated as a suite; expand to its children.
    if not suite_jobs[0]['parent_job']:
        suite_jobs = rpc.run('get_jobs', parent_job=job_id)
    # Explicit length check instead of catching a broad Exception around the
    # argv access. Fall back to the module default log (the constant is
    # spelled LOGFIE in this module; the bare name LOGFILE was undefined
    # and raised NameError here).
    logfile = sys.argv[2] if len(sys.argv) > 2 else LOGFIE

    for job in suite_jobs:
        log_crawler = SchedulerLogCrawler(logfile, job_id=job['id'])
        for line in log_crawler.parse_log():
            print(line)
    return
| 373 | |
| 374 | |
# Script entry point: distill the scheduler log for the job id given in argv.
if __name__ == '__main__':
    process_logs()