blob: 59313c0ad5d7365a30d2a943960527fde2cef979 [file] [log] [blame]
"""
Autotest AFE Cleanup used by the scheduler
"""
import datetime, time, logging
import common
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager, scheduler_config
class PeriodicCleanup(object):
    """Base class for cleanup work that runs at most once per interval.

    Subclasses provide the actual work by overriding _cleanup().
    """

    def __init__(self, db, clean_interval, run_at_initialize=False):
        """
        @param db: database handle, kept on the instance as _db.
        @param clean_interval: minimum number of minutes between two runs
                started by run_cleanup_maybe().
        @param run_at_initialize: when true, initialize() performs one
                cleanup right away.
        """
        self._run_at_initialize = run_at_initialize
        self.clean_interval = clean_interval
        self._db = db
        # The interval is measured from construction time, so the first
        # periodic run happens one full interval after the object is created.
        self._last_clean_time = time.time()

    def initialize(self):
        """Run a single cleanup now if it was requested at construction.

        The last-clean timestamp is left untouched by this call.
        """
        if not self._run_at_initialize:
            return
        self._cleanup()

    def run_cleanup_maybe(self):
        """Run _cleanup() once the interval has elapsed since the last run,
        then record the time of this run.
        """
        # clean_interval is in minutes; time.time() works in seconds.
        next_due = self._last_clean_time + self.clean_interval * 60
        if not next_due < time.time():
            return
        self._cleanup()
        self._last_clean_time = time.time()

    def _cleanup(self):
        """Abstract cleanup method; subclasses must override it."""
        raise NotImplementedError
class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
    clean_interval in the SCHEDULER section.
    """

    def __init__(self, db, clean_interval_minutes):
        """
        @param db: database handle used for the raw SQL cleanup statements.
        @param clean_interval_minutes: minutes between periodic cleanups.
        """
        super(UserCleanup, self).__init__(db, clean_interval_minutes)

    def _cleanup(self):
        """Run every periodic cleanup step in order."""
        logging.info('Running periodic cleanup')
        self._abort_timed_out_jobs()
        self._abort_jobs_past_synch_start_timeout()
        self._abort_jobs_past_max_runtime()
        self._clear_inactive_blocks()
        self._check_for_db_inconsistencies()

    def _abort_timed_out_jobs(self):
        """
        Abort jobs that still have an incomplete queue entry and whose
        creation time plus their timeout (in hours) is already in the past.
        """
        msg = 'Aborting all jobs that have timed out and are not complete'
        logging.info(msg)
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
                where=['created_on + INTERVAL timeout HOUR < NOW()'])
        # The join on queue entries can return the same job several times.
        for job in query.distinct():
            logging.warning('Aborting job %d due to job timeout', job.id)
            job.abort(None)

    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are holding a machine that is in the 'Everyone' ACL
        group.  Entries that are already running are left alone.
        """
        msg = 'Aborting synchronous jobs that are past the start timeout'
        logging.info(msg)
        timeout_delta = datetime.timedelta(
                minutes=scheduler_config.config.synch_job_start_timeout_minutes)
        timeout_start = datetime.datetime.now() - timeout_delta
        query = models.Job.objects.filter(
                created_on__lt=timeout_start,
                hostqueueentry__status='Pending',
                hostqueueentry__host__aclgroup__name='Everyone')
        for job in query.distinct():
            logging.warning('Aborting job %d due to start timeout', job.id)
            entries_to_abort = job.hostqueueentry_set.exclude(
                    status=models.HostQueueEntry.Status.RUNNING)
            for queue_entry in entries_to_abort:
                queue_entry.abort(None)

    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
        rows = self._db.execute("""
                SELECT hqe.id
                FROM host_queue_entries AS hqe
                INNER JOIN jobs ON (hqe.job_id = jobs.id)
                WHERE NOT hqe.complete AND NOT hqe.aborted AND
                hqe.started_on + INTERVAL jobs.max_runtime_hrs HOUR < NOW()""")
        # Re-fetch the matching rows as model objects so abort() can be called.
        query = models.HostQueueEntry.objects.filter(
                id__in=[row[0] for row in rows])
        for queue_entry in query.distinct():
            logging.warning('Aborting entry %s due to max runtime', queue_entry)
            queue_entry.abort(None)

    def _check_for_db_inconsistencies(self):
        """
        Report queue entries that are marked both active and complete.

        When any exist, log an error and enqueue a notification email that
        lists at most 50 of them.
        """
        logging.info('Checking for db inconsistencies')
        max_listed = 50
        query = models.HostQueueEntry.objects.filter(
                active=True, complete=True)
        # Count once and reuse the number; len(query) would fetch every
        # matching row just to compare against the listing limit.
        num_inconsistent = query.count()
        if num_inconsistent == 0:
            return

        subject = ('%d queue entries found with active=complete=1'
                   % num_inconsistent)
        message = '\n'.join(str(entry.get_object_dict())
                            for entry in query[:max_listed])
        if num_inconsistent > max_listed:
            message += '\n(truncated)\n'
        logging.error(subject)
        email_manager.manager.enqueue_notify_email(subject, message)

    def _clear_inactive_blocks(self):
        """
        Delete ineligible_host_queues rows whose job has no incomplete
        queue entries left.
        """
        msg = 'Clear out blocks for all completed jobs.'
        logging.info(msg)
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        self._db.execute("""
                DELETE ihq FROM ineligible_host_queues ihq
                LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                           WHERE NOT complete) hqe
                USING (job_id) WHERE hqe.job_id IS NULL""")
class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs when monitor_db starts up and then once every
    twenty four hours after that.
    """

    def __init__(self, db, run_at_initialize=True):
        """
        @param db: database handle used to execute the cleanup SQL.
        @param run_at_initialize: when true, initialize() runs the upkeep
                immediately instead of waiting for the first interval.
        """
        # The base class takes its interval in minutes.
        minutes_per_day = 60 * 24
        super(TwentyFourHourUpkeep, self).__init__(
                db, minutes_per_day, run_at_initialize=run_at_initialize)

    def _cleanup(self):
        """Run all of the daily upkeep tasks."""
        logging.info('Running 24 hour clean up')
        self._django_session_cleanup()

    def _django_session_cleanup(self):
        """Clean up django_session, since Django doesn't do it for us.
        http://www.djangoproject.com/documentation/0.96/sessions/
        """
        logging.info('Deleting old sessions from django_session')
        self._db.execute(
                'DELETE FROM django_session WHERE expire_date < NOW()')