blob: 1ed71a1dd20ae9928d37adce49730730c5bd4c9b [file] [log] [blame]
mblighf3294cc2009-04-08 21:17:38 +00001"""
2Autotest AFE Cleanup used by the scheduler
3"""
4
5
6import datetime, time, logging
7import common
8from autotest_lib.database import database_connection
9from autotest_lib.frontend.afe import models
10from autotest_lib.scheduler import email_manager, scheduler_config
11
12
class PeriodicCleanup(object):
    """Base class for periodic cleanup tasks run by the scheduler.

    Tracks the last time a cleanup ran and re-runs the (subclass-provided)
    _cleanup() method once the configured interval has elapsed.
    """


    def __init__(self, db, clean_interval, run_at_initialize=False):
        """
        @param db: database connection used by subclasses' cleanup queries.
        @param clean_interval: minutes between cleanup runs.
        @param run_at_initialize: if True, initialize() performs an
                immediate cleanup.
        """
        self._db = db
        self.clean_interval = clean_interval
        # Start the interval clock now so the first periodic run happens
        # one full interval after construction.
        self._last_clean_time = time.time()
        self._run_at_initialize = run_at_initialize


    def initialize(self):
        """Perform an initial cleanup if configured to run at startup."""
        if self._run_at_initialize:
            self._cleanup()


    def run_cleanup_maybe(self):
        """Run _cleanup() iff clean_interval minutes have elapsed since the
        last run; otherwise do nothing."""
        should_cleanup = (self._last_clean_time + self.clean_interval * 60
                          < time.time())
        if should_cleanup:
            self._cleanup()
            self._last_clean_time = time.time()


    def _cleanup(self):
        """Abstract cleanup method."""
        raise NotImplementedError
39
40
class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
    clean_interval in the SCHEDULER section.
    """


    def __init__(self, db, clean_interval_minutes):
        super(UserCleanup, self).__init__(db, clean_interval_minutes)


    def _cleanup(self):
        """Run all periodic cleanup passes: abort stale jobs/entries, clear
        stale host blocks and repair simple DB inconsistencies."""
        logging.info('Running periodic cleanup')
        self._abort_timed_out_jobs()
        self._abort_jobs_past_synch_start_timeout()
        self._abort_jobs_past_max_runtime()
        self._clear_inactive_blocks()
        self._check_for_db_inconsistencies()


    def _abort_timed_out_jobs(self):
        """Abort every incomplete job whose per-job timeout (hours since
        created_on) has expired."""
        msg = 'Aborting all jobs that have timed out and are not complete'
        logging.info(msg)
        # The interval arithmetic lives in SQL (via .extra) because the
        # timeout is a per-row column, not a constant.
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
                where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            logging.warning('Aborting job %d due to job timeout', job.id)
            job.abort(None)


    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are holding a machine that's in everyone.
        """
        msg = 'Aborting synchronous jobs that are past the start timeout'
        logging.info(msg)
        timeout_delta = datetime.timedelta(
                minutes=scheduler_config.config.synch_job_start_timeout_minutes)
        timeout_start = datetime.datetime.now() - timeout_delta
        query = models.Job.objects.filter(
                created_on__lt=timeout_start,
                hostqueueentry__status='Pending',
                hostqueueentry__host__aclgroup__name='Everyone')
        for job in query.distinct():
            logging.warning('Aborting job %d due to start timeout', job.id)
            # Leave already-running entries alone; only abort the ones still
            # holding machines while pending.
            entries_to_abort = job.hostqueueentry_set.exclude(
                    status=models.HostQueueEntry.Status.RUNNING)
            for queue_entry in entries_to_abort:
                queue_entry.abort(None)


    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
        # Raw SQL: the max_runtime_hrs interval is a per-job column, so the
        # comparison is done server-side; the matching entry ids are then fed
        # back into an ORM query so abort() runs model-level logic.
        rows = self._db.execute("""
            SELECT hqe.id
            FROM host_queue_entries AS hqe
            INNER JOIN jobs ON (hqe.job_id = jobs.id)
            WHERE NOT hqe.complete AND NOT hqe.aborted AND
            hqe.started_on + INTERVAL jobs.max_runtime_hrs HOUR < NOW()""")
        query = models.HostQueueEntry.objects.filter(
                id__in=[row[0] for row in rows])
        for queue_entry in query.distinct():
            logging.warning('Aborting entry %s due to max runtime', queue_entry)
            queue_entry.abort(None)


    def _check_for_db_inconsistencies(self):
        """Detect and repair relationships that point at invalid rows."""
        logging.info('Cleaning db inconsistencies')
        self._check_all_invalid_related_objects()


    def _check_invalid_related_objects_one_way(self, first_model,
                                               relation_field, second_model):
        """Clear relations from invalid first_model rows to second_model rows.

        Returns a list of human-readable error strings describing every
        relation that was cleared (empty if the model has no 'invalid' field
        or nothing was related).
        """
        # Models without soft-delete ('invalid' column) can't be stale here.
        if 'invalid' not in first_model.get_field_dict():
            return []
        invalid_objects = list(first_model.objects.filter(invalid=True))
        first_model.objects.populate_relationships(invalid_objects,
                                                   second_model,
                                                   'related_objects')
        error_lines = []
        for invalid_object in invalid_objects:
            if invalid_object.related_objects:
                related_list = ', '.join(str(related_object) for related_object
                                         in invalid_object.related_objects)
                error_lines.append('Invalid %s %s is related to %ss: %s'
                                   % (first_model.__name__, invalid_object,
                                      second_model.__name__, related_list))
                # Drop the stale relations so the invalid row is unreferenced.
                related_manager = getattr(invalid_object, relation_field)
                related_manager.clear()
        return error_lines


    def _check_invalid_related_objects(self, first_model, first_field,
                                       second_model, second_field):
        """Run the one-way invalid-relation check in both directions and
        return the combined error strings."""
        errors = self._check_invalid_related_objects_one_way(
                first_model, first_field, second_model)
        errors.extend(self._check_invalid_related_objects_one_way(
                second_model, second_field, first_model))
        return errors


    def _check_all_invalid_related_objects(self):
        """Sweep every known many-to-many pairing for relations involving
        invalid rows; clean them and email a summary if any were found."""
        # (model A, A->B relation field, model B, B->A relation field)
        model_pairs = ((models.Host, 'labels', models.Label, 'host_set'),
                       (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
                       (models.AclGroup, 'users', models.User, 'aclgroup_set'),
                       (models.Test, 'dependency_labels', models.Label,
                        'test_set'))
        errors = []
        for first_model, first_field, second_model, second_field in model_pairs:
            errors.extend(self._check_invalid_related_objects(
                    first_model, first_field, second_model, second_field))

        if errors:
            subject = ('%s relationships to invalid models, cleaned all' %
                       len(errors))
            message = '\n'.join(errors)
            logging.warning(subject)
            logging.warning(message)
            email_manager.manager.enqueue_notify_email(subject, message)


    def _clear_inactive_blocks(self):
        """Delete ineligible_host_queues rows for jobs with no incomplete
        queue entries (i.e. fully finished jobs)."""
        msg = 'Clear out blocks for all completed jobs.'
        logging.info(msg)
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        self._db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")
176
177
class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at the startup of monitor_db and every subsequent
    twenty four hours.
    """


    def __init__(self, db, run_at_initialize=True):
        # Interval is expressed in minutes, matching PeriodicCleanup.
        clean_interval = 24 * 60 # 24 hours
        super(TwentyFourHourUpkeep, self).__init__(
            db, clean_interval, run_at_initialize=run_at_initialize)


    def _cleanup(self):
        """Daily upkeep: purge expired sessions and report (but don't fix)
        DB inconsistencies that have no automatic remedy."""
        logging.info('Running 24 hour clean up')
        self._django_session_cleanup()
        self._check_for_uncleanable_db_inconsistencies()


    def _django_session_cleanup(self):
        """Clean up django_session since django doesn't for us.
        http://www.djangoproject.com/documentation/0.96/sessions/
        """
        logging.info('Deleting old sessions from django_session')
        sql = 'DELETE FROM django_session WHERE expire_date < NOW()'
        self._db.execute(sql)


    def _check_for_uncleanable_db_inconsistencies(self):
        """Run all report-only consistency checks; each emails on failure."""
        logging.info('Checking for uncleanable DB inconsistencies')
        self._check_for_active_and_complete_queue_entries()
        self._check_for_multiple_platform_hosts()
        self._check_for_no_platform_hosts()


    def _check_for_active_and_complete_queue_entries(self):
        """Report queue entries that are marked both active and complete,
        which should be mutually exclusive states."""
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        if query.count() != 0:
            subject = ('%d queue entries found with active=complete=1'
                       % query.count())
            lines = [str(entry.get_object_dict()) for entry in query]
            self._send_inconsistency_message(subject, lines)


    def _check_for_multiple_platform_hosts(self):
        """Report hosts labeled with more than one platform label."""
        rows = self._db.execute("""
            SELECT hosts.id, hostname, COUNT(1) AS platform_count,
                   GROUP_CONCAT(labels.name)
            FROM hosts
            INNER JOIN hosts_labels ON hosts.id = hosts_labels.host_id
            INNER JOIN labels ON hosts_labels.label_id = labels.id
            WHERE labels.platform
            GROUP BY hosts.id
            HAVING platform_count > 1
            ORDER BY hostname""")
        if rows:
            # rowcount reflects the execute() just above.
            subject = '%s hosts with multiple platforms' % self._db.rowcount
            lines = [' '.join(str(item) for item in row)
                     for row in rows]
            self._send_inconsistency_message(subject, lines)


    def _check_for_no_platform_hosts(self):
        """Report valid hosts that carry no platform label at all."""
        rows = self._db.execute("""
            SELECT hostname
            FROM hosts
            LEFT JOIN hosts_labels
              ON hosts.id = hosts_labels.host_id
                 AND hosts_labels.label_id IN (SELECT id FROM labels
                                               WHERE platform)
            WHERE NOT hosts.invalid AND hosts_labels.host_id IS NULL""")
        if rows:
            # rowcount reflects the execute() just above.
            subject = '%s hosts with no platform' % self._db.rowcount
            self._send_inconsistency_message(
                subject, [', '.join(row[0] for row in rows)])


    def _send_inconsistency_message(self, subject, lines):
        """Log the inconsistency and email it, truncating huge bodies so the
        notification stays deliverable."""
        logging.error(subject)
        message = '\n'.join(lines)
        if len(message) > 5000:
            message = message[:5000] + '\n(truncated)\n'
        email_manager.manager.enqueue_notify_email(subject, message)