| # Copyright 2017 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Event handlers.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import datetime |
| import logging |
| import time |
| |
| from lucifer import autotest |
| from lucifer import jobx |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class EventHandler(object): |
| """Event handling dispatcher. |
| |
| Event handlers are implemented as methods named _handle_<event value>. |
| |
| Each handler method must handle its exceptions accordingly. If an |
| exception escapes, the job dies on the spot. |
| |
| Instances have one public attribute completed. completed is set to |
| True once the final COMPLETED event is received and the handler |
| finishes. |
| """ |
| |
| def __init__(self, metrics, job, autoserv_exit, results_dir): |
| """Initialize instance. |
| |
| @param metrics: Metrics instance |
| @param job: frontend.afe.models.Job instance to own |
| @param hqes: list of HostQueueEntry instances for the job |
| @param autoserv_exit: autoserv exit status |
| @param results_dir: Job results directory |
| """ |
| self.completed = False |
| self._metrics = metrics |
| self._job = job |
| # TODO(crbug.com/748234): autoserv not implemented yet. |
| self._autoserv_exit = autoserv_exit |
| self._results_dir = results_dir |
| |
| def __call__(self, event, msg): |
| logger.debug('Received event %r with message %r', event.name, msg) |
| method_name = '_handle_%s' % event.value |
| try: |
| handler = getattr(self, method_name) |
| except AttributeError: |
| raise NotImplementedError('%s is not implemented for handling %s', |
| method_name, event.name) |
| _retry_db_errors(lambda: handler(msg)) |
| |
| def _handle_starting(self, msg): |
| # TODO(crbug.com/748234): No event update needed yet. |
| pass |
| |
| def _handle_running(self, _msg): |
| models = autotest.load('frontend.afe.models') |
| self._job.hostqueueentry_set.all().update( |
| status=models.HostQueueEntry.Status.RUNNING, |
| started_on=datetime.datetime.now()) |
| |
| def _handle_gathering(self, _msg): |
| models = autotest.load('frontend.afe.models') |
| self._job.hostqueueentry_set.all().update( |
| status=models.HostQueueEntry.Status.GATHERING) |
| |
| def _handle_parsing(self, _msg): |
| models = autotest.load('frontend.afe.models') |
| self._job.hostqueueentry_set.all().update( |
| status=models.HostQueueEntry.Status.PARSING) |
| |
| def _handle_aborted(self, _msg): |
| for hqe in self._job.hostqueueentry_set.all().prefetch_related('host'): |
| _mark_hqe_aborted(hqe) |
| jobx.write_aborted_keyvals_and_status(self._job, self._results_dir) |
| |
| def _handle_completed(self, _msg): |
| self._mark_job_complete() |
| self.completed = True |
| |
| def _handle_test_passed(self, msg): |
| if msg == 'autoserv': |
| self._autoserv_exit = 0 |
| |
| def _handle_test_failed(self, msg): |
| if msg == 'autoserv': |
| self._autoserv_exit = 1 |
| |
| def _handle_host_running(self, msg): |
| models = autotest.load('frontend.afe.models') |
| host = models.Host.objects.get(hostname=msg) |
| host.status = models.Host.Status.RUNNING |
| host.dirty = 1 |
| host.save(update_fields=['status', 'dirty']) |
| self._metrics.send_host_status(host) |
| |
| def _handle_host_ready(self, msg): |
| models = autotest.load('frontend.afe.models') |
| host = models.Host.objects.get(hostname=msg) |
| host.status = models.Host.Status.READY |
| host.save(update_fields=['status']) |
| self._metrics.send_host_status(host) |
| |
| def _handle_host_needs_cleanup(self, msg): |
| models = autotest.load('frontend.afe.models') |
| host = models.Host.objects.get(hostname=msg) |
| models.SpecialTask.objects.create( |
| host_id=host.id, |
| task=models.SpecialTask.Task.CLEANUP, |
| requested_by=models.User.objects.get(login=self._job.owner)) |
| |
| def _handle_host_needs_reset(self, msg): |
| models = autotest.load('frontend.afe.models') |
| host = models.Host.objects.get(hostname=msg) |
| models.SpecialTask.objects.create( |
| host_id=host.id, |
| task=models.SpecialTask.Task.RESET, |
| requested_by=models.User.objects.get(login=self._job.owner)) |
| |
| def _handle_x_tests_done(self, msg): |
| """Taken from GatherLogsTask.epilog.""" |
| autoserv_exit, failures = (int(x) for x in msg.split(',')) |
| logger.debug('Got autoserv_exit=%d, failures=%d', |
| autoserv_exit, failures) |
| success = (autoserv_exit == 0 and failures == 0) |
| reset_after_failure = not self._job.run_reset and not success |
| hqes = self._job.hostqueueentry_set.all().prefetch_related('host') |
| if self._should_reboot_duts(autoserv_exit, failures, |
| reset_after_failure): |
| logger.debug('Creating cleanup jobs for hosts') |
| for entry in hqes: |
| self._handle_host_needs_cleanup(entry.host.hostname) |
| else: |
| logger.debug('Not creating cleanup jobs for hosts') |
| for entry in hqes: |
| self._handle_host_ready(entry.host.hostname) |
| if not reset_after_failure: |
| logger.debug('Skipping reset because reset_after_failure is False') |
| return |
| logger.debug('Creating reset jobs for hosts') |
| self._metrics.send_reset_after_failure(autoserv_exit, failures) |
| for entry in hqes: |
| self._handle_host_needs_reset(entry.host.hostname) |
| |
| def _should_reboot_duts(self, autoserv_exit, failures, reset_after_failure): |
| models = autotest.load('frontend.afe.models') |
| reboot_after = self._job.reboot_after |
| if self._final_status() == models.HostQueueEntry.Status.ABORTED: |
| logger.debug('Should reboot because reboot_after=ABORTED') |
| return True |
| elif reboot_after == models.Job.RebootAfter.ALWAYS: |
| logger.debug('Should reboot because reboot_after=ALWAYS') |
| return True |
| elif (reboot_after == models.Job.RebootAfter.IF_ALL_TESTS_PASSED |
| and autoserv_exit == 0 and failures == 0): |
| logger.debug('Should reboot because' |
| ' reboot_after=IF_ALL_TESTS_PASSED') |
| return True |
| else: |
| return failures > 0 and not reset_after_failure |
| |
| def _mark_job_complete(self): |
| """Perform Autotest operations needed for job completion.""" |
| final_status = self._final_status() |
| self._mark_hqes_complete(final_status) |
| self._stop_job_if_necessary(final_status) |
| self._release_job_if_sharded() |
| |
| def _mark_hqes_complete(self, final_status): |
| """Perform Autotest HQE operations needed for job completion.""" |
| for hqe in self._job.hostqueueentry_set.all(): |
| self._set_completed_status(hqe, final_status) |
| |
| def _stop_job_if_necessary(self, final_status): |
| """Equivalent to scheduler.modes.Job.stop_if_necessary(). |
| |
| The name isn't informative, but this will stop pre-job tasks as |
| necessary. |
| """ |
| models = autotest.load('frontend.afe.models') |
| if final_status is not models.HostQueueEntry.Status.ABORTED: |
| _stop_prejob_hqes(self._job) |
| |
| def _release_job_if_sharded(self): |
| if self._job.shard_id is not None: |
| # If shard_id is None, the job will be synced back to the master |
| self._job.shard_id = None |
| self._job.save(update_fields=['shard_id']) |
| |
| def _final_status(self): |
| models = autotest.load('frontend.afe.models') |
| Status = models.HostQueueEntry.Status |
| if jobx.is_aborted(self._job): |
| return Status.ABORTED |
| if self._autoserv_exit == 0: |
| return Status.COMPLETED |
| return Status.FAILED |
| |
| def _set_completed_status(self, hqe, status): |
| """Set completed status of HQE. |
| |
| This is a cleaned up version of the one in scheduler_models to work |
| with Django models. |
| """ |
| hqe.status = status |
| hqe.active = False |
| hqe.complete = True |
| if hqe.started_on: |
| hqe.finished_on = datetime.datetime.now() |
| hqe.save(update_fields=['status', 'active', 'complete', 'finished_on']) |
| self._metrics.send_hqe_completion(hqe) |
| self._metrics.send_hqe_duration(hqe) |
| |
| |
| class Metrics(object): |
| |
| """Class for sending job metrics.""" |
| |
| def __init__(self): |
| # Metrics |
| metrics = autotest.chromite_load('metrics') |
| self._hqe_completion_metric = metrics.Counter( |
| 'chromeos/autotest/scheduler/hqe_completion_count') |
| self._reset_after_failure_metric = metrics.Counter( |
| 'chromeos/autotest/scheduler/postjob_tasks/' |
| 'reset_after_failure') |
| self._host_status_metric = metrics.Boolean( |
| 'chromeos/autotest/dut_status', reset_after=True) |
| |
| def send_host_status(self, host): |
| """Send ts_mon metrics for host status. |
| |
| @param host: frontend.afe.models.Host instance |
| """ |
| labellib = autotest.load('utils.labellib') |
| labels = labellib.LabelsMapping.from_host(host) |
| fields = { |
| 'dut_host_name': host.hostname, |
| 'board': labels['board'], |
| 'model': labels['model'], |
| } |
| # As each device switches state, indicate that it is not in any |
| # other state. This allows Monarch queries to avoid double counting |
| # when additional points are added by the Window Align operation. |
| for s in host.Status.names: |
| fields['status'] = s |
| self._host_status_metric.set(s == host.status, fields=fields) |
| |
| def send_hqe_completion(self, hqe): |
| """Send ts_mon metrics for HQE completion.""" |
| fields = { |
| 'status': hqe.status.lower(), |
| 'board': 'NO_HOST', |
| 'pool': 'NO_HOST', |
| } |
| if hqe.host: |
| labellib = autotest.load('utils.labellib') |
| labels = labellib.LabelsMapping.from_host(hqe.host) |
| fields['board'] = labels.get('board', '') |
| fields['pool'] = labels.get('pool', '') |
| self._hqe_completion_metric.increment(fields=fields) |
| |
| def send_hqe_duration(self, hqe): |
| """Send CloudTrace metrics for HQE duration.""" |
| if not (hqe.started_on and hqe.finished_on): |
| return |
| scheduler_models = autotest.load('scheduler.scheduler_models') |
| cloud_trace = autotest.chromite_load('cloud_trace') |
| types = autotest.deps_load('google.protobuf.internal.well_known_types') |
| hqe_trace_id = scheduler_models.hqe_trace_id |
| |
| span = cloud_trace.Span( |
| 'HQE', spanId='0', traceId=hqe_trace_id(hqe.id)) |
| span.startTime = types.Timestamp() |
| span.startTime.FromDatetime(hqe.started_on) |
| span.endTime = types.Timestamp() |
| span.endTime.FromDatetime(hqe.finished_on) |
| cloud_trace.LogSpan(span) |
| |
| def send_reset_after_failure(self, autoserv_exit, failures): |
| """Send reset_after_failure metric.""" |
| self._reset_after_failure_metric.increment( |
| fields={'autoserv_process_success': autoserv_exit == 0, |
| # Yes, this is a boolean |
| 'num_tests_failed': failures > 0}) |
| |
| |
| def _mark_hqe_aborted(hqe): |
| """Perform Autotest operations needed for HQE abortion. |
| |
| This also operates on the HQE's host, so prefetch it when possible. |
| |
| This logic is from scheduler_models.HostQueueEntry.abort(). |
| """ |
| models = autotest.load('frontend.afe.models') |
| transaction = autotest.deps_load('django.db.transaction') |
| Status = models.HostQueueEntry.Status |
| with transaction.commit_on_success(): |
| if hqe.status in (Status.GATHERING, Status.PARSING): |
| return |
| if hqe.status in (Status.STARTING, Status.PENDING, Status.RUNNING): |
| if hqe.host is None: |
| return |
| hqe.host.status = models.Host.Status.READY |
| hqe.host.save(update_fields=['status']) |
| hqe.status = Status.ABORTED |
| hqe.save(update_fields=['status']) |
| |
| |
| def _stop_prejob_hqes(job): |
| """Stop pending HQEs for a job (for synch_count).""" |
| models = autotest.load('frontend.afe.models') |
| HQEStatus = models.HostQueueEntry.Status |
| HostStatus = models.Host.Status |
| not_yet_run = _get_prejob_hqes(job) |
| if not_yet_run.count() == job.synch_count: |
| return |
| entries_to_stop = _get_prejob_hqes(job, include_active=False) |
| for hqe in entries_to_stop: |
| if hqe.status == HQEStatus.PENDING: |
| hqe.host.status = HostStatus.READY |
| hqe.host.save(update_fields=['status']) |
| hqe.status = HQEStatus.STOPPED |
| hqe.save(update_fields=['status']) |
| |
| |
| def _get_prejob_hqes(job, include_active=True): |
| """Return a queryset of not run HQEs for the job (for synch_count).""" |
| models = autotest.load('frontend.afe.models') |
| if include_active: |
| statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES) |
| else: |
| statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES) |
| return models.HostQueueEntry.objects.filter( |
| job=job, status__in=statuses) |
| |
| |
| def _retry_db_errors(func): |
| """Call func, retrying multiple times if database errors are raised. |
| |
| crbug.com/863504 |
| """ |
| django = autotest.deps_load('django') |
| MySQLdb = autotest.deps_load('MySQLdb') |
| max_retries = 10 |
| # n ... 0 means n + 1 tries, or 1 try plus n retries |
| for i in xrange(max_retries, -1, -1): |
| try: |
| func() |
| except (django.db.utils.DatabaseError, MySQLdb.OperationalError) as e: |
| if i == 0: |
| raise |
| logger.debug('Got database error %s, retrying', e) |
| django.db.close_connection() |
| time.sleep(5) |
| else: |
| break |