| """Autotest AFE Cleanup used by the scheduler""" |
| |
| import contextlib |
| import logging |
| import random |
| import time |
| |
| from autotest_lib.client.common_lib import utils |
| from autotest_lib.frontend.afe import models |
| from autotest_lib.scheduler import scheduler_config |
| from autotest_lib.client.common_lib import global_config |
| from autotest_lib.client.common_lib import host_protections |
| |
| try: |
| from chromite.lib import metrics |
| except ImportError: |
| metrics = utils.metrics_mock |
| |
| |
| _METRICS_PREFIX = 'chromeos/autotest/scheduler/cleanup' |
| |
| |
class PeriodicCleanup(object):
    """Base class for cleanup work that runs on a fixed schedule.

    Subclasses implement _cleanup(); the scheduler drives the timing by
    calling initialize() once at startup and run_cleanup_maybe() on every
    tick.
    """

    def __init__(self, db, clean_interval_minutes, run_at_initialize=False):
        """
        @param db: Database connection object handed to subclasses.
        @param clean_interval_minutes: Minutes to wait between cleanup runs.
        @param run_at_initialize: If True, run a cleanup pass as soon as
                initialize() is called.
        """
        self._db = db
        self.clean_interval_minutes = clean_interval_minutes
        self._last_clean_time = time.time()
        self._run_at_initialize = run_at_initialize


    def initialize(self):
        """Method called by scheduler at the startup.
        """
        if not self._run_at_initialize:
            return
        self._cleanup()


    def run_cleanup_maybe(self):
        """Run the cleanup if the configured interval has fully elapsed.
        """
        deadline = self._last_clean_time + 60 * self.clean_interval_minutes
        if time.time() <= deadline:
            return
        self._cleanup()
        # Reset the clock only after a cleanup actually ran.
        self._last_clean_time = time.time()


    def _cleanup(self):
        """Abstract cleanup method; subclasses must override."""
        raise NotImplementedError
| |
| |
class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
    clean_interval_minutes in the SCHEDULER section.
    """

    def __init__(self, db, clean_interval_minutes):
        """
        @param db: Database connection used for the raw-SQL cleanup steps.
        @param clean_interval_minutes: Minutes between cleanup passes.
        """
        super(UserCleanup, self).__init__(db, clean_interval_minutes)
        # Dead-host reverification runs on its own period
        # (scheduler_config reverify_period_minutes), tracked separately
        # from the generic cleanup interval.
        self._last_reverify_time = time.time()


    @metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/user/durations')
    def _cleanup(self):
        """Run one full periodic cleanup pass (timed via the decorator)."""
        logging.info('Running periodic cleanup')
        self._abort_timed_out_jobs()
        self._abort_jobs_past_max_runtime()
        self._clear_inactive_blocks()
        self._check_for_db_inconsistencies()
        self._reverify_dead_hosts()
        self._django_session_cleanup()


    def _abort_timed_out_jobs(self):
        """Abort incomplete jobs whose timeout_mins has elapsed.

        The timeout comparison is evaluated inside the database via the raw
        WHERE clause (created_on + timeout_mins < NOW()).
        """
        logging.info(
                'Aborting all jobs that have timed out and are not complete')
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
        jobs = query.distinct()
        if not jobs:
            return

        with _cleanup_warning_banner('timed out jobs', len(jobs)):
            for job in jobs:
                logging.warning('Aborting job %d due to job timeout', job.id)
                job.abort()
        _report_detected_errors('jobs_timed_out', len(jobs))


    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
        # Find running (not complete, not aborted) HQEs whose owning job's
        # max_runtime_mins has elapsed since the HQE started.
        rows = self._db.execute("""
            SELECT hqe.id FROM afe_host_queue_entries AS hqe
            WHERE NOT hqe.complete AND NOT hqe.aborted AND EXISTS
            (select * from afe_jobs where hqe.job_id=afe_jobs.id and
             hqe.started_on + INTERVAL afe_jobs.max_runtime_mins MINUTE < NOW())
            """)
        query = models.HostQueueEntry.objects.filter(
            id__in=[row[0] for row in rows])
        hqes = query.distinct()
        if not hqes:
            return

        with _cleanup_warning_banner('hqes past max runtime', len(hqes)):
            for queue_entry in hqes:
                logging.warning('Aborting entry %s due to max runtime',
                                queue_entry)
                queue_entry.abort()
        _report_detected_errors('hqes_past_max_runtime', len(hqes))


    def _check_for_db_inconsistencies(self):
        """Detect and (where possible) repair DB inconsistencies."""
        logging.info('Cleaning db inconsistencies')
        self._check_all_invalid_related_objects()


    def _check_invalid_related_objects_one_way(self, invalid_model,
                                               relation_field, valid_model):
        """Clear relations from invalid rows of one model to another model.

        Rows marked invalid=True should not remain related to valid rows;
        any such relations found are logged, cleared, and reported as
        metrics.

        @param invalid_model: Model whose invalid rows are inspected.
        @param relation_field: Name of the relation field on invalid_model
                pointing at valid_model.
        @param valid_model: Model on the other side of the relation.
        """
        # Models without an 'invalid' flag cannot have invalid rows; skip.
        if 'invalid' not in invalid_model.get_field_dict():
            return

        invalid_objects = list(invalid_model.objects.filter(invalid=True))
        invalid_model.objects.populate_relationships(
                invalid_objects, valid_model, 'related_objects')
        if not invalid_objects:
            return

        num_objects_with_invalid_relations = 0
        errors = []
        for invalid_object in invalid_objects:
            if invalid_object.related_objects:
                related_objects = invalid_object.related_objects
                related_list = ', '.join(str(x) for x in related_objects)
                num_objects_with_invalid_relations += 1
                errors.append('Invalid %s is related to: %s' %
                              (invalid_object, related_list))
                # Sever the stale relations so they do not reappear.
                related_manager = getattr(invalid_object, relation_field)
                related_manager.clear()

        # Only log warnings after we're sure we've seen at least one invalid
        # model with some valid relations to avoid empty banners from getting
        # printed.
        if errors:
            invalid_model_name = invalid_model.__name__
            valid_model_name = valid_model.__name__
            banner = 'invalid %s related to valid %s' % (invalid_model_name,
                                                         valid_model_name)
            with _cleanup_warning_banner(banner, len(errors)):
                for error in errors:
                    logging.warning(error)
            _report_detected_errors(
                    'invalid_related_objects',
                    num_objects_with_invalid_relations,
                    fields={'invalid_model': invalid_model_name,
                            'valid_model': valid_model_name})
            _report_detected_errors(
                    'invalid_related_objects_relations',
                    len(errors),
                    fields={'invalid_model': invalid_model_name,
                            'valid_model': valid_model_name})


    def _check_invalid_related_objects(self, first_model, first_field,
                                       second_model, second_field):
        """Check both directions of a relation pair for invalid-row leakage."""
        self._check_invalid_related_objects_one_way(
                first_model,
                first_field,
                second_model,
        )
        self._check_invalid_related_objects_one_way(
                second_model,
                second_field,
                first_model,
        )


    def _check_all_invalid_related_objects(self):
        """Check every known relation pair for invalid-row leakage."""
        model_pairs = ((models.Host, 'labels', models.Label, 'host_set'),
                       (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
                       (models.AclGroup, 'users', models.User, 'aclgroup_set'),
                       (models.Test, 'dependency_labels', models.Label,
                        'test_set'))
        for first_model, first_field, second_model, second_field in model_pairs:
            self._check_invalid_related_objects(
                    first_model,
                    first_field,
                    second_model,
                    second_field,
            )


    def _clear_inactive_blocks(self):
        """Delete ineligible-host rows for jobs with no incomplete HQEs."""
        logging.info('Clear out blocks for all completed jobs.')
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        self._db.execute("""
            DELETE ihq FROM afe_ineligible_host_queues ihq
            WHERE NOT EXISTS
                (SELECT job_id FROM afe_host_queue_entries hqe
                 WHERE NOT hqe.complete AND hqe.job_id = ihq.job_id)""")


    def _should_reverify_hosts_now(self):
        """Return True if the reverify period has elapsed (0 disables it)."""
        reverify_period_sec = (scheduler_config.config.reverify_period_minutes
                               * 60)
        if reverify_period_sec == 0:
            return False
        return (self._last_reverify_time + reverify_period_sec) <= time.time()


    def _choose_subset_of_hosts_to_reverify(self, hosts):
        """Given hosts needing verification, return a subset to reverify."""
        max_at_once = scheduler_config.config.reverify_max_hosts_at_once
        if (max_at_once > 0 and len(hosts) > max_at_once):
            # NOTE: the capped subset comes back in random.sample's
            # selection order, i.e. it is not sorted like the full list.
            return random.sample(hosts, max_at_once)
        return sorted(hosts)


    def _reverify_dead_hosts(self):
        """Schedule VERIFY special tasks for repair-failed hosts.

        Candidates are unlocked, valid hosts in REPAIR_FAILED whose
        protection level is not DO_NOT_VERIFY; at most
        reverify_max_hosts_at_once are scheduled per reverify period.
        """
        if not self._should_reverify_hosts_now():
            return

        self._last_reverify_time = time.time()
        logging.info('Checking for dead hosts to reverify')
        hosts = models.Host.objects.filter(
                status=models.Host.Status.REPAIR_FAILED,
                locked=False,
                invalid=False)
        hosts = hosts.exclude(
                protection=host_protections.Protection.DO_NOT_VERIFY)
        if not hosts:
            return

        hosts = list(hosts)
        total_hosts = len(hosts)
        hosts = self._choose_subset_of_hosts_to_reverify(hosts)
        logging.info('Reverifying dead hosts (%d of %d)', len(hosts),
                     total_hosts)
        with _cleanup_warning_banner('reverify dead hosts', len(hosts)):
            for host in hosts:
                logging.warning(host.hostname)
        _report_detected_errors('dead_hosts_triggered_reverify', len(hosts))
        _report_detected_errors('dead_hosts_require_reverify', total_hosts)
        for host in hosts:
            models.SpecialTask.schedule_special_task(
                    host=host, task=models.SpecialTask.Task.VERIFY)


    def _django_session_cleanup(self):
        """Clean up django_session since django doesn't for us.
        http://www.djangoproject.com/documentation/0.96/sessions/
        """
        logging.info('Deleting old sessions from django_session')
        # NOTE(review): TRUNCATE removes *all* session rows, not just old
        # ones, so every logged-in user is logged out each cleanup pass —
        # confirm this is intended despite the 'old sessions' log message.
        sql = 'TRUNCATE TABLE django_session'
        self._db.execute(sql)
| |
| |
class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at the startup of monitor_db and every subsequent
    twenty four hours.
    """


    def __init__(self, db, drone_manager, run_at_initialize=True):
        """Initialize TwentyFourHourUpkeep.

        @param db: Database connection object.
        @param drone_manager: DroneManager to access drones.
        @param run_at_initialize: True to run cleanup when scheduler starts.
                                  Default is set to True.

        """
        self.drone_manager = drone_manager
        clean_interval_minutes = 24 * 60  # 24 hours
        super(TwentyFourHourUpkeep, self).__init__(
            db, clean_interval_minutes, run_at_initialize=run_at_initialize)


    @metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/daily/durations')
    def _cleanup(self):
        """Run the daily upkeep pass (timed via the metrics decorator)."""
        logging.info('Running 24 hour clean up')
        self._check_for_uncleanable_db_inconsistencies()
        self._cleanup_orphaned_containers()


    def _check_for_uncleanable_db_inconsistencies(self):
        """Log DB inconsistencies that cannot be repaired automatically."""
        logging.info('Checking for uncleanable DB inconsistencies')
        self._check_for_active_and_complete_queue_entries()
        self._check_for_multiple_platform_hosts()
        self._check_for_no_platform_hosts()


    def _check_for_active_and_complete_queue_entries(self):
        """Find HQEs that are simultaneously active and complete.

        Aborted entries are repaired by flipping active to False; anything
        else is only logged, since no safe automatic recovery is known.
        """
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        num_bad_hqes = query.count()
        if num_bad_hqes == 0:
            return

        num_aborted = 0
        logging.warning('%d queue entries found with active=complete=1',
                        num_bad_hqes)
        with _cleanup_warning_banner('active and complete hqes', num_bad_hqes):
            for entry in query:
                if entry.status == 'Aborted':
                    entry.active = False
                    entry.save()
                    recovery_path = 'was also aborted, set active to False'
                    num_aborted += 1
                else:
                    recovery_path = 'can not recover'
                logging.warning('%s (recovery: %s)', entry.get_object_dict(),
                                recovery_path)
        _report_detected_errors('hqes_active_and_complete', num_bad_hqes)
        _report_detected_errors('hqes_aborted_set_to_inactive', num_aborted)


    def _check_for_multiple_platform_hosts(self):
        """Log hosts that carry more than one platform label."""
        rows = self._db.execute("""
            SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count,
                   GROUP_CONCAT(afe_labels.name)
            FROM afe_hosts
            INNER JOIN afe_hosts_labels ON
                    afe_hosts.id = afe_hosts_labels.host_id
            INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
            WHERE afe_labels.platform
            GROUP BY afe_hosts.id
            HAVING platform_count > 1
            ORDER BY hostname""")

        if rows:
            logging.warning('Cleanup found hosts with multiple platforms')
            with _cleanup_warning_banner('hosts with multiple platforms',
                                         len(rows)):
                for row in rows:
                    logging.warning(' '.join(str(item) for item in row))
            _report_detected_errors('hosts_with_multiple_platforms', len(rows))


    def _check_for_no_platform_hosts(self):
        """Log valid hosts that have no platform label at all."""
        rows = self._db.execute("""
            SELECT hostname
            FROM afe_hosts
            LEFT JOIN afe_hosts_labels
              ON afe_hosts.id = afe_hosts_labels.host_id
              AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels
                                                WHERE platform)
            WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""")
        if rows:
            with _cleanup_warning_banner('hosts with no platform', len(rows)):
                for row in rows:
                    logging.warning(row[0])
            _report_detected_errors('hosts_with_no_platform', len(rows))


    def _cleanup_orphaned_containers(self):
        """Cleanup orphaned containers in each drone.

        The function queues a lxc_cleanup call in each drone without waiting
        for the script to finish, as the cleanup procedure could take minutes
        and the script output is logged.

        """
        ssp_enabled = global_config.global_config.get_config_value(
                'AUTOSERV', 'enable_ssp_container')
        if not ssp_enabled:
            logging.info(
                    'Server-side packaging is not enabled, no need to clean '
                    'up orphaned containers.')
            return
        self.drone_manager.cleanup_orphaned_containers()
| |
| |
def _report_detected_errors(metric_name, count, fields=None):
    """Reports a counter metric for recovered errors.

    NOTE(review): this definition is shadowed by the later function of the
    same name defined below in this module, so at runtime the Gauge
    (errors_detected) version wins and this Counter (errors_recovered)
    version is never called. One of the two should be renamed (e.g. this
    one to _report_recovered_errors) and its call sites updated.

    @param metric_name: Name of the metric to report about.
    @param count: How many "errors" were fixed this cycle.
    @param fields: Optional dict of fields to include with the metric.
    """
    # Normalize here instead of using a mutable default argument ({}),
    # which would be shared across all calls.
    fields = {} if fields is None else fields
    m = '%s/errors_recovered/%s' % (_METRICS_PREFIX, metric_name)
    metrics.Counter(m).increment_by(count, fields=fields)
| |
| |
def _report_detected_errors(metric_name, gauge, fields=None):
    """Reports a gauge metric for errors detected.

    NOTE(review): this definition shadows an earlier function of the same
    name above (which reported a Counter under errors_recovered/); every
    call in this module therefore reports this Gauge. Rename one of the two
    to restore the Counter reporting.

    @param metric_name: Name of the metric to report about.
    @param gauge: Outstanding number of unrecoverable errors of this type.
    @param fields: Optional dict of fields to include with the metric.
    """
    # Normalize here instead of using a mutable default argument ({}),
    # which would be shared across all calls.
    fields = {} if fields is None else fields
    m = '%s/errors_detected/%s' % (_METRICS_PREFIX, metric_name)
    metrics.Gauge(m).set(gauge, fields=fields)
| |
| |
| @contextlib.contextmanager |
| def _cleanup_warning_banner(banner, error_count=None): |
| """Put a clear context in the logs around list of errors |
| |
| @param: banner: The identifying header to print for context. |
| @param: error_count: If not None, the number of errors detected. |
| """ |
| if error_count is not None: |
| banner += ' (total: %d)' % error_count |
| logging.warning('#### START: %s ####', banner) |
| try: |
| yield |
| finally: |
| logging.warning('#### END: %s ####', banner) |