blob: 6122a45cdcb0706547bd0b3fe815b5dfd912b2f8 [file] [log] [blame]
Mike Frysingerd03e6b52019-08-03 12:49:01 -04001#!/usr/bin/python2
Prashanth B22243eb2014-04-23 08:53:15 -07002"""
3Usage: ./cron_scripts/log_distiller.py job_id path_to_logfile
4 If the job_id is a suite it will find all subjobs.
5You need to change the location of the log it will parse.
6The job_id needs to be in the afe database.
7"""
8import abc
9import datetime
10import os
11import re
12import pprint
13import subprocess
14import sys
15import time
16
17import common
18from autotest_lib.server import frontend
19
20
# Default scheduler log to parse when no logfile argument is given on the
# command line (see process_logs). NOTE: was previously misspelled 'LOGFIE',
# which made the fallback path in process_logs raise NameError.
# logfile name format: scheduler.log.2014-02-14-18.10.56
LOGFILE = './logs/scheduler.log.2014-04-17-16.51.47'
time_format = '%Y-%m-%d-%H.%M.%S'
logfile_regex = r'scheduler.log.([0-9,.,-]+)'
logdir = os.path.join('/usr/local/autotest', 'logs')
26
class StateMachineViolation(Exception):
    """Raised when a log records an invalid state machine transition."""
29
30
class LogLineException(Exception):
    """Raised when a log line cannot be processed unambiguously."""
33
34
def should_process_log(time_str, time_format, cutoff_days=7):
    """Returns True if the log is older than cutoff_days.

    Note: despite the name, a True return means the log has *aged past* the
    cutoff (the previous docstring stated the opposite).

    @param time_str: A string representing the time.
        eg: 2014-02-14-18.10.56
    @param time_format: A string representing the format of the time string.
        ref: http://docs.python.org/2/library/datetime.html#strftime-strptime-behavior
    @param cutoff_days: Int representing the cutoff in days.

    @return: True if time_str is more than cutoff_days in the past.
    """
    log_time = datetime.datetime.strptime(time_str, time_format)
    # Round 'now' through time_format so both sides of the comparison carry
    # the same (second-level) granularity.
    now = datetime.datetime.strptime(time.strftime(time_format), time_format)
    cutoff = now - datetime.timedelta(days=cutoff_days)
    return log_time < cutoff
50
51
def apply_regex(regex, line):
    """Simple regex applicator.

    @param regex: Regex to apply.
    @param line: The line to apply regex on.

    @return: A tuple with the matching groups if the regex matched at the
        start of the line, otherwise None.
    """
    match = re.match(regex, line)
    return match.groups() if match else None
63
64
class StateMachineParser(object):
    """Abstract class that enforces state transition ordering.

    Classes inheriting from StateMachineParser need to define an
    expected_transitions dictionary. The SMP will pop 'to' states
    from the dictionary as they occur, so a transition can only happen as
    many times as it is listed (specify the same transition twice to allow
    it to occur twice).
    """
    __metaclass__ = abc.ABCMeta


    @abc.abstractmethod
    def __init__(self):
        # States successfully transitioned into so far, in order.
        self.visited_states = []
        # Map of from_state -> list of still-allowed to_states.
        self.expected_transitions = {}


    def advance_state(self, from_state, to_state):
        """Checks that a transition is valid and records it.

        @param from_state: A string representing the state the host is leaving.
        @param to_state: The state the host is going to, represented as a string.

        @return: An error message string for a wasteful same-state update,
            a (from_state, to_state) tuple for an unexpected transition,
            or None if the transition was expected.
        """
        # TODO: Updating to the same state is a waste of bw.
        if from_state and from_state == to_state:
            return ('Updating to the same state is a waste of BW: %s->%s' %
                    (from_state, to_state))
            # (A stray unreachable 'return' that followed here was removed.)

        if (from_state in self.expected_transitions and
            to_state in self.expected_transitions[from_state]):
            # Consume the expected transition so it cannot be matched again.
            self.expected_transitions[from_state].remove(to_state)
            self.visited_states.append(to_state)
        else:
            return (from_state, to_state)
103
104
class SingleJobHostSMP(StateMachineParser):
    """State machine parser for a single job's host status transitions."""

    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {
            'Ready': ['Resetting', 'Verifying', 'Pending', 'Provisioning'],
            'Resetting': ['Ready', 'Provisioning'],
            'Pending': ['Running'],
            'Provisioning': ['Repairing'],
            'Running': ['Ready'],
        }


    def check_transitions(self, hostline):
        """Advance the host state machine when the line updates 'status'."""
        info = hostline.line_info
        if info['field'] == 'status':
            self.advance_state(info['state'], info['value'])
121
122
class SingleJobHqeSMP(StateMachineParser):
    # State machine parser for a single job's HQE status transitions.

    def __init__(self):
        self.visited_states = []
        self.expected_transitions = {
            'Queued': ['Starting', 'Resetting', 'Aborted'],
            'Resetting': ['Pending', 'Provisioning'],
            'Provisioning': ['Pending', 'Queued', 'Repairing'],
            'Pending': ['Starting'],
            'Starting': ['Running'],
            'Running': ['Gathering', 'Parsing'],
            'Gathering': ['Parsing'],
            'Parsing': ['Completed', 'Aborted']
        }


    def check_transitions(self, hqeline):
        """Validate the HQE status transition recorded on a parsed line.

        @param hqeline: An HQELine whose line_info carries 'from_state' and
            'to_state' keys (populated by HQELine.record_state_transition).

        @raises StateMachineViolation: For an invalid transition out of
            'Queued' after 'Running' has already been visited.
        """
        invalid_states = self.advance_state(
            hqeline.line_info['from_state'], hqeline.line_info['to_state'])
        if not invalid_states:
            return

        # Deal with repair.
        # NOTE(review): only invalid transitions out of 'Queued' after the
        # job was seen 'Running' raise; every other invalid transition is
        # silently ignored. Presumably this is meant to tolerate
        # repair/requeue cycles — confirm the condition isn't inverted.
        if (invalid_states[0] == 'Queued' and
            'Running' in self.visited_states):
            raise StateMachineViolation('Unrecognized state transition '
                    '%s->%s, expected transitions are %s' %
                    (invalid_states[0], invalid_states[1],
                    self.expected_transitions))
151
152
class LogLine(object):
    """Line objects.

    All classes inheriting from LogLine represent a line of some sort.
    A line is responsible for parsing itself, and invoking an SMP to
    validate state transitions. A line can be part of several state machines.
    """
    line_format = '%s'


    def __init__(self, state_machine_parsers):
        """
        @param state_machine_parsers: A list of smp objects used to validate
            state changes on these types of lines.
        """
        self.smps = state_machine_parsers

        # Fields parsed off the current line; cleared on every flush().
        self.line_info = {}


    def parse_line(self, line):
        """Apply a line regex and save any information the parsed line contains.

        @param line: A string representing a line.
        """
        # Capture the entire line.
        groups = apply_regex('(.*)', line)
        if groups:
            self.line_info['line'] = groups[0]


    def flush(self):
        """Run every state machine parser over this line, then reset it."""
        for parser in self.smps:
            parser.check_transitions(self)
        # TODO: persist this?
        self.line_info = {}


    def format_line(self):
        """Render line_info via line_format, falling back to the raw line."""
        try:
            return self.line_format % self.line_info
        except KeyError:
            return self.line_info['line']
200
201
class TimeLine(LogLine):
    """Filters timestamps for scheduler logs."""

    def parse_line(self, line):
        """Split the line into its leading timestamp and the remainder.

        @param line: A string representing a line.
        """
        super(TimeLine, self).parse_line(line)

        # Regex for isolating the date and time from scheduler logs, eg:
        # 02/16 16:04:36.573 INFO |scheduler_:0574|...
        groups = apply_regex('([0-9,/,:,., ]+)(.*)', self.line_info['line'])
        if groups:
            self.line_info['time'], self.line_info['line'] = groups
216
217
class HostLine(TimeLine):
    """Manages hosts line parsing."""

    line_format = (' \t\t %(time)s %(host)s, currently in %(state)s, '
                   'updated %(field)s->%(value)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        @param line: The line we're expecting to contain a state transition.
        """
        match = apply_regex(
            ".* ([a-zA-Z]+) updating {'([a-zA-Z]+)': ('[a-zA-Z]+'|[0-9])}.*",
            line)
        if match:
            state, field, value = match
            self.line_info['state'] = state
            self.line_info['field'] = field
            # Strip the quoting around the new value.
            self.line_info['value'] = value.replace("'", "")


    def parse_line(self, line):
        """Parse a host line; returns the formatted line on a match.

        @param line: A string representing a line.
        """
        super(HostLine, self).parse_line(line)

        # Regex for getting host status. Eg:
        # 172.22.4 in Running updating {'status': 'Running'}
        groups = apply_regex('.*Host (([0-9,.,a-z,-]+).*)',
                             self.line_info['line'])
        if groups:
            self.line_info['line'], self.line_info['host'] = groups
            self.record_state_transition(self.line_info['line'])
            return self.format_line()
250
251
class HQELine(TimeLine):
    """Manages HQE line parsing."""

    line_format = ('%(time)s %(hqe)s, currently in %(from_state)s, '
                   'updated to %(to_state)s. Flags: %(flags)s')


    def record_state_transition(self, line):
        """Apply the state_transition_rgx to a line and record state changes.

        @param line: The line we're expecting to contain a state transition.
        """
        # Regex for getting hqe status. Eg:
        # status:Running [active] -> Gathering
        match = apply_regex(
            ".*status:([a-zA-Z]+)( \[[a-z\,]+\])? -> ([a-zA-Z]+)", line)
        if match:
            from_state, flags, to_state = match
            self.line_info['from_state'] = from_state
            self.line_info['flags'] = flags
            self.line_info['to_state'] = to_state


    def parse_line(self, line):
        """Parse an HQE line; returns the formatted line on a match.

        @param line: A string representing a line.
        """
        super(HQELine, self).parse_line(line)
        groups = apply_regex(r'.*\| HQE: (([0-9]+).*)',
                             self.line_info['line'])
        if groups:
            self.line_info['line'], self.line_info['hqe'] = groups
            self.record_state_transition(self.line_info['line'])
            return self.format_line()
283
284
class LogCrawler(object):
    """Crawl logs.

    Log crawlers are meant to apply some basic preprocessing to a log, and crawl
    the output validating state changes. They manage line and state machine
    creation. The initial filtering applied to the log needs to be grab all lines
    that match an action, such as the running of a job.
    """

    def __init__(self, log_name):
        self.log = log_name
        # Subclasses extend this shell pipeline to narrow the output.
        self.filter_command = 'cat %s' % log_name


    def preprocess_log(self):
        """Run filter_command over the log and return its stdout."""
        process = subprocess.Popen(self.filter_command, shell=True,
                                   stdout=subprocess.PIPE)
        stdout, _ = process.communicate()
        return stdout
306
307
class SchedulerLogCrawler(LogCrawler):
    """A log crawler for the scheduler logs.

    This crawler is only capable of processing information about a single job.
    """

    def __init__(self, log_name, **kwargs):
        # @param log_name: Path of the scheduler log to crawl.
        # @param kwargs: Must contain 'job_id'; only lines containing
        #     'for job: <job_id>' survive the grep filter below.
        super(SchedulerLogCrawler, self).__init__(log_name)
        self.job_id = kwargs['job_id']
        # One parser per recognized line type; each owns its state machines.
        self.line_processors = [HostLine([SingleJobHostSMP()]),
                                HQELine([SingleJobHqeSMP()])]
        self.filter_command = ('%s | grep "for job: %s"' %
                               (self.filter_command, self.job_id))


    def parse_log(self):
        """Parse each line of the preprocessed log output.

        Pass each line through each possible line_processor. The one that matches
        will populate itself, call flush, this will walk the state machine of that
        line to the next step.
        """
        out = self.preprocess_log()
        response = []
        # NOTE(review): under python3 preprocess_log returns bytes, which
        # would break this split; this file targets python2 (see shebang).
        for job_line in out.split('\n'):
            parsed_line = None
            for processor in self.line_processors:
                line = processor.parse_line(job_line)
                # A line must be claimed by at most one processor.
                if line and parsed_line:
                    raise LogLineException('Multiple Parsers claiming the line %s: '
                            'previous parsing: %s, current parsing: %s ' %
                            (job_line, parsed_line, line))
                elif line:
                    parsed_line = line
                    try:
                        # Walk the matching processor's state machines and
                        # reset its per-line state.
                        processor.flush()
                    except StateMachineViolation as e:
                        response.append(str(e))
                        # Re-raise with everything parsed so far for context.
                        raise StateMachineViolation(response)
            # Unclaimed lines are passed through verbatim.
            response.append(parsed_line if parsed_line else job_line)
        return response
349
350
def process_logs():
    """Parse scheduler logs for a job (or each child job of a suite).

    Reads the job id from sys.argv[1] and an optional logfile path from
    sys.argv[2] (falling back to the LOGFILE module constant). Prints each
    distilled log line for every job found.
    """
    if len(sys.argv) < 2:
        print ('Usage: ./cron_scripts/log_distiller.py 0 8415620 '
               'You need to change the location of the log it will parse.'
               'The job_id needs to be in the afe database.')
        sys.exit(1)

    job_id = int(sys.argv[1])
    rpc = frontend.AFE()
    suite_jobs = rpc.run('get_jobs', id=job_id)
    # If the job has no parent it is a suite; expand to its child jobs.
    if not suite_jobs[0]['parent_job']:
        suite_jobs = rpc.run('get_jobs', parent_job=job_id)
    try:
        logfile = sys.argv[2]
    except IndexError:
        # No logfile argument given; fall back to the hard-coded default.
        # (Narrowed from 'except Exception', which hid the former
        # LOGFIE/LOGFILE NameError.)
        logfile = LOGFILE

    for job in suite_jobs:
        log_crawler = SchedulerLogCrawler(logfile, job_id=job['id'])
        for line in log_crawler.parse_log():
            print(line)
    return
373
374
# Script entry point: distill scheduler logs for the job id on the command line.
if __name__ == '__main__':
    process_logs()