#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""

import os

from autotest_lib.client.common_lib.cros.graphite import stats
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils


_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')


class PostJobTask(agent_task.AgentTask):
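    """Base class for tasks that run after a job's autoserv process exits."""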
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                self._drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None:  # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted):  # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task.  post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing because we don't want them to
    hold up tests. At the same time we don't wish to build up load
    that will take forever to parse.
    """
    _num_running_processes = 0
    # Last known limit of max processes, used to check whether
    # the max processes config has been changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notify that the process limit has
    # been hit.
    _notification_on = True
    # Once the process limit is hit, an email will be sent.
    # To prevent spam, do not send another email until the process count
    # drops below the following fraction of the limit.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
                                      cls._num_running_processes)


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
                                      cls._num_running_processes)


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # Override tick to keep trying to start until the process count goes
        # down and we can run; at that point we revert to default behavior.
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # Override run() to not actually run unless we can.
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit has been hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) The max processes configuration limit has changed.
        2) _notification_on is False and the number of running processes
           has dropped below the level defined by REVIVE_NOTIFICATION_THRESHOLD.

        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
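    """Runs the TKO results parser (tko/parse) over a job's results."""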
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
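    """Archives a job's results by running the archive_results.control.srv
    control file through autoserv."""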
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            self._drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())