#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""

import os

from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils
from autotest_lib.site_utils.graphite import stats


_drone_manager = drone_manager.instance()
_parser_path = autoserv_utils._parser_path_func(
        autoserv_utils.AUTOTEST_INSTALL_DIR)

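# A rough map of how these tasks line up with host queue entry (HQE)
# statuses, inferred from the prolog() checks further down; this is an
# orienting sketch, not an exhaustive state machine:
#
#     Status.GATHERING -> GatherLogsTask      (collect crashinfo, copy logs)
#     Status.PARSING   -> FinalReparseTask    (run the results parser)
#     Status.ARCHIVING -> ArchiveResultsTask  (archive the results)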

class PostJobTask(agent_task.AgentTask):
    """Base class for tasks that run after a job's main autoserv process."""

    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None:  # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted):  # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED

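    # For illustration: an aborted HQE always maps to ABORTED; autoserv
    # exiting 0 maps to COMPLETED; anything else, including a lost process
    # where exit_code() returns None, maps to FAILED.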

    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing because we don't want them to hold up
    tests. At the same time, we don't want to build up a backlog that
    takes forever to parse.
    """
    _num_running_processes = 0
    # Last known limit of max processes, used to check whether
    # the max processes config has been changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notify that the process limit
    # has been hit.
    _notification_on = True
    # Once the process limit is hit, an email will be sent. To prevent
    # spam, do not send another email until the running-process count
    # drops below the fraction of the limit defined below.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
                                      cls._num_running_processes)


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
                                      cls._num_running_processes)


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit has been hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) The max processes configuration has changed.
        2) _notification_on is False and the number of running processes
           has dropped below the level defined by
           REVIVE_NOTIFICATION_THRESHOLD.

        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True

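    # Worked example of the re-arm threshold, assuming a configured limit of
    # 5 processes: the warning email is sent once when a task tries to start
    # while 5 are already running, and _notification_on stays False until the
    # running count drops below 0.80 * 5 = 4, after which the next time the
    # limit is hit another email goes out.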

    def _try_starting_process(self):
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()

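# A minimal sketch of what a throttled post-job task has to supply (the class
# name and numbers are hypothetical; FinalReparseTask and ArchiveResultsTask
# below are the real implementations):
#
#     class ExampleThrottledTask(SelfThrottledPostJobTask):
#         def _generate_command(self, results_dir):
#             # command list run against the results directory
#             return ['/bin/true', results_dir]
#
#         def _pidfile_name(self):
#             return '.example_execute'
#
#         @classmethod
#         def _max_processes(cls):
#             return 5
#
# run() and tick() then defer starting the process until fewer than
# _max_processes() instances of the class are running.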

class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]

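    # For illustration, with two hosts the generated command looks roughly
    # like the following (the autoserv path and pidfile label depend on the
    # install and on drone_manager.CRASHINFO_PID_FILE):
    #
    #     autoserv -p --pidfile-label=collect_crashinfo
    #         --use-existing-results --collect-crashinfo
    #         -m host1,host2 -r <results_dir>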

    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)

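    # Worked example of the do_reboot decision above: with
    # reboot_after == RebootAfter.IF_ALL_TESTS_PASSED, a job that completed
    # with zero test failures schedules a cleanup, and so does any job with
    # failing tests (the num_tests_failed > 0 clause); a job whose autoserv
    # never produced a process only reboots if it was aborted or if
    # reboot_after is ALWAYS, otherwise the host goes straight to READY.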

    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)

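    # Note on the check in run() above: exit_code() is treated as a raw wait
    # status, so os.WIFSIGNALED() is true when autoserv was killed by a
    # signal (e.g. SIGKILL) and false for a normal exit; a missing exit code
    # (None) is treated the same as a signal, so crashinfo collection still
    # happens when the process was lost.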

class FinalReparseTask(SelfThrottledPostJobTask):
    """Throttled task that runs the results parser over the job's results."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Throttled task that archives the job's results."""

    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            _drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())