#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""

import os

from autotest_lib.client.common_lib.cros.graphite import stats
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils


_parser_path = autoserv_utils._parser_path_func(
        autoserv_utils.AUTOTEST_INSTALL_DIR)


class PostJobTask(agent_task.AgentTask):
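    """
    Base class for tasks that run after autoserv has exited for a set of
    queue entries, operating on the existing results directory through a
    PidfileRunMonitor attached to the autoserv process.
    """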
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                self._drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
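        """Return the command to run, as a list of arguments.

        @param results_dir: absolute path to the results directory for this
                task's queue entries.
        """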
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
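        """Whether any of this task's queue entries were aborted.

        If the entries disagree, a notification email is enqueued and the
        job is treated as aborted.
        """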
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
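        """Return the final HostQueueEntry status for this job.

        Aborted jobs map to ABORTED; otherwise the autoserv exit code
        determines COMPLETED or FAILED.
        """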
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing because we don't want them to
    hold up tests. At the same time we don't wish to build up load
    that will take forever to parse.
    """
    _num_running_processes = 0
    # Last known limit of max processes, used to check whether
    # max processes config has been changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notify that the process limit has
    # been hit.
    _notification_on = True
    # Once the process limit is hit, an email will be sent.
    # To prevent spam, do not send another email until the process count
    # drops below the following fraction of the limit.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80


    @classmethod
    def _increment_running_processes(cls):
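        """Increment the class-wide process count and report the new value
        to the 'scheduler' stats gauge."""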
        cls._num_running_processes += 1
        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
                                      cls._num_running_processes)


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
                                      cls._num_running_processes)


    @classmethod
    def _max_processes(cls):
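        """Return the maximum number of processes of this task type that may
        run at once. Subclasses must override this."""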
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit has been hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) The max processes limit in the configuration has changed;
        2) _notification_on is False and the number of running processes has
           dropped below the fraction defined by REVIVE_NOTIFICATION_THRESHOLD.

        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
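        """Start the underlying process only if we are under the process
        limit; otherwise send (at most one) notification email and retry on
        a later tick."""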
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
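        """Release this task's slot in the process count once it finishes."""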
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
            queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
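        """Schedule cleanup (reboot) special tasks for the hosts, or mark
        them Ready, based on the job's reboot_after setting and the test
        results."""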
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
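    """
    Task that runs the results parser over the job's results directory,
    throttled by the max_parse_processes config value.
    """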
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        return 0 # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
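    """
    Task that archives job results by running the archive_results.control.srv
    control file through autoserv, throttled by the max_transfer_processes
    config value. On failure it writes a marker file to the results directory.
    """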
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            self._drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())