#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""

import os

from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils

from chromite.lib import metrics

_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')


class PostJobTask(agent_task.AgentTask):
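    """Base class for tasks that run after a job's autoserv process exits.

    Subclasses (log gathering, final reparse, archiving) supply the command
    to run via _generate_command().
    """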
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                self._drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
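        """Return the command to run for this task, as an argv list."""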
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
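        """Return True if any of this job's queue entries were aborted.

        If the entries disagree, a notification email is sent and True is
        assumed.
        """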
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
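        """Return the final HQE status: ABORTED, COMPLETED or FAILED."""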
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
103 """
104 PostJobTask that maintains its own process limit.
105
106 We throttle tasks like parsing because we don't want them to
107 hold up tests. At the same time we don't wish to build up load
108 that will take forever to parse.
109 """
    _num_running_processes = 0
    # Last known limit of max processes, used to check whether
    # max processes config has been changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notify that the process limit has
    # been hit.
    _notification_on = True
    # Once the process limit is hit, an email will be sent.
    # To prevent spam, do not send another email until the number of running
    # processes drops below the following fraction of the limit.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80

    @classmethod
    def _gauge_metrics(cls):
        """Report to monarch the number of running processes."""
        m = metrics.Gauge('chromeos/autotest/scheduler/postjob_tasks')
        m.set(cls._num_running_processes, fields={'task_name': cls.__name__})


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        cls._gauge_metrics()


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        cls._gauge_metrics()


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
170 """Send an email to notify that process limit is hit."""
171 if cls._notification_on:
172 subject = '%s: hitting max process limit.' % cls.__name__
173 message = ('Running processes/Max processes: %d/%d'
174 % (cls._num_running_processes, cls._max_processes()))
175 email_manager.manager.enqueue_notify_email(subject, message)
176 cls._notification_on = False
177
178
179 @classmethod
180 def _reset_notification_switch_if_necessary(cls):
181 """Reset _notification_on if necessary.
182
183 Set _notification_on to True on the following cases:
184 1) If the limit of max processes configuration changes;
185 2) If _notification_on is False and the number of running processes
186 drops to lower than a level defined in REVIVE_NOTIFICATION_THRESHOLD.
187
188 """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
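        """Run the task only if the process limit allows it.

        If the limit has been reached, send a (rate-limited) notification
        email instead.
        """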
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
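        """Schedule cleanup (reboot) tasks for the hosts, or mark them Ready.

        Whether to reboot depends on the job's reboot_after setting, the
        final status and the number of failed tests.
        """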
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
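    """Task that runs the TKO parser (tko/parse) over a job's results."""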
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '--record-duration',
                '-l', '2', '-r', '-o', results_dir]


    @property
    def num_processes(self):
        return 0 # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
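    """Task that archives a job's results.

    It runs the archive_results.control.srv control file through autoserv
    against the existing results directory.
    """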
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            self._drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())