#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
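
The concrete tasks defined here are GatherLogsTask, FinalReparseTask and
ArchiveResultsTask, which handle the gathering, parsing and archiving
stages of a finished job.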
9"""
10
11import os
12
13from autotest_lib.frontend.afe import models, model_attributes
14from autotest_lib.scheduler import agent_task, drones, drone_manager
15from autotest_lib.scheduler import email_manager, pidfile_monitor
16from autotest_lib.scheduler import scheduler_config
17from autotest_lib.server import autoserv_utils
18from autotest_lib.site_utils.graphite import stats
19
20
21_drone_manager = drone_manager.instance()
22_parser_path = autoserv_utils._parser_path_func(
23 autoserv_utils.AUTOTEST_INSTALL_DIR)
24
25
26class PostJobTask(agent_task.AgentTask):
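    """Base class for tasks that run after a job's main autoserv process.

    A PostJobTask attaches a PidfileRunMonitor to the existing autoserv
    results so it can compute and set the final status of its queue entries.
    """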
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
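        """Return whether this job was aborted, based on its queue entries.

        If the entries disagree about the abort state, a notification email
        is sent and the job is treated as aborted.
        """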
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
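        """Return the final HQE status: ABORTED, COMPLETED or FAILED."""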
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own limit on concurrent processes.

    We throttle tasks like parsing because we don't want them to
    hold up tests. At the same time we don't want a backlog to build
    up that will take forever to parse.
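
    Subclasses hook into the throttle by overriding _max_processes(); as a
    minimal sketch (mirroring FinalReparseTask below):

        @classmethod
        def _max_processes(cls):
            return scheduler_config.config.max_parse_processes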
111 """
112 _num_running_processes = 0
113 # Last known limit of max processes, used to check whether
114 # max processes config has been changed.
115 _last_known_max_processes = 0
116 # Whether an email should be sent to notifiy process limit being hit.
117 _notification_on = True
118 # Once process limit is hit, an email will be sent.
119 # To prevent spams, do not send another email until
120 # it drops to lower than the following level.
121 REVIVE_NOTIFICATION_THRESHOLD = 0.80


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
                                      cls._num_running_processes)


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
                                      cls._num_running_processes)


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit has been hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) The max processes configuration limit has changed;
        2) _notification_on is False and the number of running processes
           has dropped below the fraction of the limit defined by
           REVIVE_NOTIFICATION_THRESHOLD.

        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
                percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
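        """Run the task only if the process limit allows it.

        If the limit has been hit, send (at most) one notification email
        instead of starting.
        """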
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
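        """Schedule host cleanup (reboot) or mark hosts Ready.

        A CLEANUP special task is created when the job was aborted, when
        reboot_after is ALWAYS, when reboot_after is IF_ALL_TESTS_PASSED and
        everything passed, or when any test failed; otherwise the host is
        set to Ready.
        """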
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
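    """Throttled task that runs the results parser over the job's results.

    Concurrency is capped by scheduler_config.config.max_parse_processes.
    """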
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        return 0 # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
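    """Throttled task that archives results via archive_results.control.srv.

    Concurrency is capped by scheduler_config.config.max_transfer_processes;
    on failure, _ARCHIVING_FAILED_FILE is written to the results directory.
    """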
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            _drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())