#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""

import os

from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')


class PostJobTask(agent_task.AgentTask):
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                self._drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None:  # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted):  # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing because we don't want them to
    hold up tests. At the same time we don't want to build up a
    backlog that will take forever to parse.
    """
    _num_running_processes = 0
    # Last known limit on max processes, used to check whether the
    # max processes config has been changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notify that the process limit was hit.
    _notification_on = True
    # Once the process limit is hit, an email will be sent.
    # To prevent spam, do not send another email until the ratio of running
    # processes to the limit drops below the following level.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80

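    # The concrete throttled tasks below (FinalReparseTask, ArchiveResultsTask)
    # only have to provide a process cap plus the usual PostJobTask hooks.  A
    # minimal sketch of a hypothetical subclass (names and values illustrative):
    #
    #     class ExampleThrottledTask(SelfThrottledPostJobTask):
    #         @classmethod
    #         def _max_processes(cls):
    #             return 4  # illustrative cap; real tasks read scheduler_config
    #
    #         def _pidfile_name(self):
    #             return '.example_execute'
    #
    #         def _generate_command(self, results_dir):
    #             return ['/bin/true', results_dir]
    #
    # run() defers to _try_starting_process(), so the command above only starts
    # once fewer than _max_processes() instances are running.
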
    @classmethod
    def _gauge_metrics(cls):
        """Report to monarch the number of running processes."""
        m = metrics.Gauge('chromeos/autotest/scheduler/postjob_tasks')
        m.set(cls._num_running_processes, fields={'task_name': cls.__name__})


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        cls._gauge_metrics()


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        cls._gauge_metrics()


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that process limit is hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
          1) The max processes configuration limit has changed;
          2) _notification_on is False and the number of running processes
             has dropped below the level defined by REVIVE_NOTIFICATION_THRESHOLD.

        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]
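
    # A sketch of the expanded command (host names and results path are
    # hypothetical), assuming drone_manager.CRASHINFO_PID_FILE is
    # '.collect_crashinfo_execute' so _pidfile_label() yields 'collect_crashinfo':
    #   autoserv -p --pidfile-label=collect_crashinfo --use-existing-results \
    #       --collect-crashinfo -m host1,host2 -r <results_dir>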


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '--record-duration',
                '--suite-report', '-l', '2', '-r', '-o', results_dir]
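
    # Sketch of the resulting parser invocation (results path hypothetical),
    # where _parser_path points at <AUTOTEST_INSTALL_DIR>/tko/parse:
    #   tko/parse --write-pidfile --record-duration --suite-report \
    #       -l 2 -r -o <results_dir>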


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]
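
    # Illustrative expansion (paths hypothetical), assuming
    # drone_manager.ARCHIVER_PID_FILE is '.archiver_execute' so the label is
    # 'archiver':
    #   autoserv -p --pidfile-label=archiver -r <results_dir> \
    #       --use-existing-results --control-filename=control.archive \
    #       <AUTOTEST_INSTALL_DIR>/scheduler/archive_results.control.srv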


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            self._drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())