"""
Autotest AFE Cleanup used by the scheduler
"""


import datetime, time, logging
import common
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager, scheduler_config


class PeriodicCleanup(object):
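    """Base class for cleanup tasks that the scheduler runs periodically.

    Subclasses implement _cleanup(); run_cleanup_maybe() is meant to be called
    repeatedly and triggers the cleanup once every clean_interval minutes.
    """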


    def __init__(self, db, clean_interval, run_at_initialize=False):
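        """
        @param db: database connection used for cleanup queries.
        @param clean_interval: number of minutes between cleanup runs.
        @param run_at_initialize: if True, initialize() runs a cleanup
                immediately.
        """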
        self._db = db
        self.clean_interval = clean_interval
        self._last_clean_time = time.time()
        self._run_at_initialize = run_at_initialize


    def initialize(self):
        if self._run_at_initialize:
            self._cleanup()


    def run_cleanup_maybe(self):
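        """Run _cleanup() if at least clean_interval minutes have passed since
        the last run; otherwise do nothing.
        """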
        should_cleanup = (self._last_clean_time + self.clean_interval * 60
                          < time.time())
        if should_cleanup:
            self._cleanup()
            self._last_clean_time = time.time()


    def _cleanup(self):
        """Abstract cleanup method."""
        raise NotImplementedError


class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
    clean_interval in the SCHEDULER section.
    """


    def __init__(self, db, clean_interval_minutes):
        super(UserCleanup, self).__init__(db, clean_interval_minutes)


    def _cleanup(self):
        logging.info('Running periodic cleanup')
        self._abort_timed_out_jobs()
        self._abort_jobs_past_synch_start_timeout()
        self._abort_jobs_past_max_runtime()
        self._clear_inactive_blocks()
        self._check_for_db_inconsistencies()


    def _abort_timed_out_jobs(self):
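        """Abort jobs that are still incomplete but were created more than
        their timeout (in hours) ago.
        """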
        msg = 'Aborting all jobs that have timed out and are not complete'
        logging.info(msg)
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            logging.warning('Aborting job %d due to job timeout', job.id)
            job.abort(None)


    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are holding a machine that's in the Everyone ACL group.
        """
        msg = 'Aborting synchronous jobs that are past the start timeout'
        logging.info(msg)
        timeout_delta = datetime.timedelta(
            minutes=scheduler_config.config.synch_job_start_timeout_minutes)
        timeout_start = datetime.datetime.now() - timeout_delta
        query = models.Job.objects.filter(
            created_on__lt=timeout_start,
            hostqueueentry__status='Pending',
            hostqueueentry__host__aclgroup__name='Everyone')
        for job in query.distinct():
            logging.warning('Aborting job %d due to start timeout', job.id)
            entries_to_abort = job.hostqueueentry_set.exclude(
                status=models.HostQueueEntry.Status.RUNNING)
            for queue_entry in entries_to_abort:
                queue_entry.abort(None)


    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
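        # Select incomplete, unaborted queue entries that started more than
        # the owning job's max_runtime_hrs ago.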
        rows = self._db.execute("""
            SELECT hqe.id
            FROM host_queue_entries AS hqe
            INNER JOIN jobs ON (hqe.job_id = jobs.id)
            WHERE NOT hqe.complete AND NOT hqe.aborted AND
            hqe.started_on + INTERVAL jobs.max_runtime_hrs HOUR < NOW()""")
        query = models.HostQueueEntry.objects.filter(
            id__in=[row[0] for row in rows])
        for queue_entry in query.distinct():
            logging.warning('Aborting entry %s due to max runtime', queue_entry)
            queue_entry.abort(None)


    def _check_for_db_inconsistencies(self):
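        """Email a warning if any host queue entries are marked both active
        and complete, which should never happen.
        """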
        logging.info('Checking for db inconsistencies')
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        if query.count() != 0:
            subject = ('%d queue entries found with active=complete=1'
                       % query.count())
            message = '\n'.join(str(entry.get_object_dict())
                                for entry in query[:50])
            if len(query) > 50:
                message += '\n(truncated)\n'

            logging.error(subject)
            email_manager.manager.enqueue_notify_email(subject, message)


    def _clear_inactive_blocks(self):
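        """Delete ineligible_host_queues rows belonging to jobs whose host
        queue entries are all complete.
        """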
        msg = 'Clear out blocks for all completed jobs.'
        logging.info(msg)
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        self._db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at the startup of monitor_db and every twenty-four
    hours thereafter.
    """


    def __init__(self, db, run_at_initialize=True):
        clean_interval = 24 * 60 # 24 hours
        super(TwentyFourHourUpkeep, self).__init__(
            db, clean_interval, run_at_initialize=run_at_initialize)


    def _cleanup(self):
        logging.info('Running 24 hour clean up')
        self._django_session_cleanup()


    def _django_session_cleanup(self):
        """Clean up the django_session table, since Django doesn't do it
        for us.
        http://www.djangoproject.com/documentation/0.96/sessions/
        """
        logging.info('Deleting old sessions from django_session')
        sql = 'DELETE FROM django_session WHERE expire_date < NOW()'
        self._db.execute(sql)
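

# Illustrative sketch only, not used by the scheduler: one way a caller such as
# monitor_db might drive these cleanup objects from its main loop. The function
# name, the 5-minute interval and the 20-second tick below are assumptions for
# illustration, not part of this module's API.
def _example_cleanup_loop(db):
    user_cleanup = UserCleanup(db, clean_interval_minutes=5)
    upkeep = TwentyFourHourUpkeep(db)
    user_cleanup.initialize()
    upkeep.initialize()
    while True:
        user_cleanup.run_cleanup_maybe()
        upkeep.run_cleanup_maybe()
        time.sleep(20)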