#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections, utils, debug
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min
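# Pidfile protocol between this scheduler and autoserv, as parsed by
# PidfileRunMonitor below: autoserv writes AUTOSERV_PID_FILE into its results
# directory with its pid on the first line, its exit status on the second and
# the number of failed tests on the third (older two-line pidfiles omit the
# third line).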

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False
_global_config_section = 'SCHEDULER'
_base_url = None
# see os.getlogin() online docs
_email_from = pwd.getpwuid(os.getuid())[0]


def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # read in notify_email from global_config
    c = global_config.global_config
    global _notify_email
    val = c.get_config_value(_global_config_section, "notify_email")
    if val != "":
        _notify_email = val

    tick_pause = c.get_config_value(
        _global_config_section, 'tick_pause_sec', type=int)
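    # Illustrative global_config entries consumed here and by Dispatcher
    # below (the values shown are examples, not taken from this file):
    #   [SCHEDULER]
    #   notify_email: scheduler-admin@example.com
    #   tick_pause_sec: 5
    #   max_running_jobs: 100
    #   max_jobs_started_per_cycle: 10
    #   clean_interval_minutes: 60
    #   synch_job_start_timeout_minutes: 60
    #   [AUTOTEST_WEB]
    #   base_url: http://your_autotest_server/afe/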

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # read in base url
    global _base_url
    val = c.get_config_value(CONFIG_SECTION, "base_url")
    if val:
        _base_url = val
    else:
        _base_url = "http://your_autotest_server/afe/"

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(tick_pause)
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    email_manager.send_queued_emails()
    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    if _testing_mode:
        global_config.global_config.override_config_value(
            CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    debug.configure('scheduler', format_string='%(message)s')

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


def queue_entries_to_abort():
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)

    qe = [HostQueueEntry(row=i) for i in rows]
    return qe

def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)

def log_stacktrace(reason):
    # avoid shadowing the built-in names "type" and "str"
    (exc_type, exc_value, tb) = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += ''.join(traceback.format_exception(exc_type, exc_value, tb))

    sys.stderr.write("\n%s\n" % message)
    email_manager.enqueue_notify_email("monitor_db exception", message)


def get_proc_poll_fn(pid):
    proc_path = os.path.join('/proc', str(pid))
    def poll_fn():
        if os.path.exists(proc_path):
            return None
        return 0 # we can't get a real exit code
    return poll_fn


def send_email(from_addr, to_string, subject, body):
    """Mails out emails to the addresses listed in to_string.

    to_string is split into a list which can be delimited by any of:
            ';', ',', ':' or any whitespace
    """

    # Create list from string removing empty strings from the list.
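    # e.g. (illustrative) 'a@example.com;b@example.com c@example.com'
    # -> ['a@example.com', 'b@example.com', 'c@example.com']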
    to_list = [x for x in re.split('\s|,|;|:', to_string) if x]
    if not to_list:
        return

    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        from_addr, ', '.join(to_list), subject, body)
    try:
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(from_addr, to_list, msg)
        finally:
            mailer.quit()
    except Exception, e:
        print "Sending email failed. Reason: %s" % repr(e)


def kill_autoserv(pid, poll_fn=None):
    print 'killing', pid
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
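    # Only signal the process if it still appears to be alive.  SIGCONT is
    # sent first, presumably so that a stopped (SIGSTOP'd) autoserv can still
    # run its handler for the SIGTERM that follows.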
    if poll_fn() is None:
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)


def ensure_directory_exists(directory_path):
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)


class EmailNotificationManager(object):
    def __init__(self):
        self._emails = []

    def enqueue_notify_email(self, subject, message):
        if not _notify_email:
            return

        body = 'Subject: ' + subject + '\n'
        body += "%s / %s / %s\n%s" % (socket.gethostname(),
                                      os.getpid(),
                                      time.strftime("%X %x"), message)
        self._emails.append(body)


    def send_queued_emails(self):
        if not self._emails:
            return
        subject = 'Scheduler notifications from ' + socket.gethostname()
        separator = '\n' + '-' * 40 + '\n'
        body = separator.join(self._emails)

        send_email(_email_from, _notify_email, subject, body)
        self._emails = []

email_manager = EmailNotificationManager()


class HostScheduler(object):
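    """
    Caches job/host ACL, label and dependency information for one scheduling
    cycle (see refresh()) and hands out eligible, still-unused hosts for
    pending queue entries via find_eligible_host().
    """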
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = long(row[0]), long(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result
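    # Illustrative example of the mapping built above: rows
    # [(1, 10), (1, 11), (2, 10)] become {1: set([10, 11]), 2: set([10])},
    # or {10: set([1, 2]), 11: set([1])} when flip=True.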


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.acl_group_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, acl_group_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0

    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        acl = self._is_acl_accessible(host_id, queue_entry)
        deps = self._check_job_dependencies(job_dependencies, host_labels)
        only_if = self._check_only_if_needed_labels(job_dependencies,
                                                    host_labels, queue_entry)
        return acl and deps and only_if


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            return self._schedule_non_metahost(queue_entry)
        return self._schedule_metahost(queue_entry)


class Dispatcher:
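    """
    Main scheduler loop: do_initial_recovery() reattaches to or requeues work
    left over from a previous run, and tick() then repeatedly runs periodic
    cleanup, aborts entries marked Abort, assigns pending queue entries to
    hosts via HostScheduler and steps all active Agents, throttled by the
    max_running_jobs / max_jobs_started_per_cycle settings.
    """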
    autoserv_procs_cache = None
    max_running_processes = global_config.global_config.get_config_value(
        _global_config_section, 'max_running_jobs', type=int)
    max_processes_started_per_cycle = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_jobs_started_per_cycle', type=int))
    clean_interval = (
        global_config.global_config.get_config_value(
            _global_config_section, 'clean_interval_minutes', type=int))
    synch_job_start_timeout_minutes = (
        global_config.global_config.get_config_value(
            _global_config_section, 'synch_job_start_timeout_minutes',
            type=int))

    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        Dispatcher.autoserv_procs_cache = None
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        email_manager.send_queued_emails()


    def _run_cleanup_maybe(self):
        if self._last_clean_time + self.clean_interval * 60 < time.time():
            print 'Running cleanup'
            self._abort_timed_out_jobs()
            self._abort_jobs_past_synch_start_timeout()
            self._clear_inactive_blocks()
            self._check_for_db_inconsistencies()
            self._last_clean_time = time.time()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self

    # Find agent corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def num_running_processes(self):
        return sum(agent.num_processes for agent in self._agents
                   if agent.is_running())


    @classmethod
    def find_autoservs(cls, orphans_only=False):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes. If orphans_only=True, return only processes that
        have been orphaned (i.e. parent pid = 1).
        """
        if cls.autoserv_procs_cache is not None:
            return cls.autoserv_procs_cache

        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the five columns output by ps
        procs = [line.split(None, 4) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = {}
        for proc in procs:
            # check ppid == 1 for orphans (ps fields are strings)
            if orphans_only and proc[2] != '1':
                continue
            # only root autoserv processes have pgid == pid
            if (proc[3] == 'autoserv' and # comm
                proc[1] == proc[0]): # pgid == pid
                # map pid to args
                autoserv_procs[int(proc[0])] = proc[4]
        cls.autoserv_procs_cache = autoserv_procs
        return autoserv_procs


    def _recover_queue_entries(self, queue_entries, run_monitor):
        assert len(queue_entries) > 0
        queue_entry_ids = [entry.id for entry in queue_entries]
        queue_task = RecoveryQueueTask(job=queue_entries[0].job,
                                       queue_entries=queue_entries,
                                       run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=queue_entry_ids))


    def _recover_processes(self):
        orphans = self.find_autoservs(orphans_only=True)

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(queue_entry.results_dir())
            if not run_monitor.has_pid():
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            job_tag = queue_entry.job.get_job_tag([queue_entry])
            pid = run_monitor.get_pid()
            print 'Recovering %s (pid %d)' % (queue_entry.id, pid)
            group_entries = queue_entry.job.get_group_entries(queue_entry)
            recovered_entry_ids.update(entry.id for entry in group_entries)
            self._recover_queue_entries(group_entries, run_monitor)
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir(dont_delete_files=True)
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rebooting_host_ids = set()
        rows = _db.execute("""SELECT * FROM host_queue_entries
                           WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %d' % queue_entry.id
            agent = queue_entry.abort()
            self.add_agent(agent)
            if queue_entry.get_host():
                rebooting_host_ids.add(queue_entry.get_host().id)

        self._recover_parsing_entries()

        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""",
                                   exclude_ids=rebooting_host_ids)

        # finally, recover "Running" hosts with no active queue entries,
        # although this should never happen
        message = ('Recovering running host %s - this probably '
                   'indicates a scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              exclude_ids=set()):
        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
                           'invalid = 0 AND ' + where)
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.id in exclude_ids:
                continue
            if print_message is not None:
                print print_message % host.hostname
            verify_task = VerifyTask(host=host)
            self.add_agent(Agent(tasks=[verify_task]))


    def _recover_parsing_entries(self):
        # make sure there are no old parsers running
        os.system('killall parse')

        recovered_entry_ids = set()
        for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
            if entry.id in recovered_entry_ids:
                continue
            queue_entries = entry.job.get_group_entries(entry)
            recovered_entry_ids.update(group_entry.id
                                       for group_entry in queue_entries)

            reparse_task = FinalReparseTask(queue_entries)
            self.add_agent(Agent([reparse_task]))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _abort_timed_out_jobs(self):
        """
        Aborts all jobs that have timed out and not completed.
        """
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            print 'Aborting job %d due to job timeout' % job.id
            job.abort(None)


    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are holding a machine in the Everyone ACL group.
        """
        timeout_delta = datetime.timedelta(
            minutes=self.synch_job_start_timeout_minutes)
        timeout_start = datetime.datetime.now() - timeout_delta
        query = models.Job.objects.filter(
            created_on__lt=timeout_start,
            hostqueueentry__status='Pending',
            hostqueueentry__host__acl_group__name='Everyone')
        for job in query.distinct():
            print 'Aborting job %d due to start timeout' % job.id
            job.abort(None)


    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            where='NOT complete AND NOT active',
            order_by='priority DESC, meta_host, job_id'))


    def _schedule_new_jobs(self):
        print "finding work"

        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return

        self._host_scheduler.refresh(queue_entries)

        for queue_entry in queue_entries:
            assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
            if not assigned_host:
                continue
            self._run_queue_entry(queue_entry, assigned_host)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run(assigned_host=host)
        # in some cases (synchronous jobs with run_verify=False), agent may be
        # None
        if agent:
            self.add_agent(agent)


    def _find_aborting(self):
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            for agent in agents_to_abort:
                self.remove_agent(agent)

            agent = entry.abort(agents_to_abort)
            self.add_agent(agent)
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _can_start_agent(self, agent, num_running_processes,
                         num_started_this_cycle, have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if (num_running_processes + agent.num_processes >
            self.max_running_processes):
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            self.max_processes_started_per_cycle):
            return False
        return True
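    # Illustrative example of the throttling above: with max_running_jobs=100
    # and max_jobs_started_per_cycle=10 (example values), an agent needing 3
    # processes is refused once 98 are already running, and after any refusal
    # in a cycle every later nonzero-process agent is refused as well.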


    def _handle_agents(self):
        num_running_processes = self.num_running_processes()
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                print "agent finished"
                self._agents.remove(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_running_processes,
                                             num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_running_processes += agent.num_processes
                num_started_this_cycle += agent.num_processes
            agent.tick()
        print num_running_processes, 'running processes'


    def _check_for_db_inconsistencies(self):
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        if query.count() != 0:
            subject = ('%d queue entries found with active=complete=1'
                       % query.count())
            message = '\n'.join(str(entry.get_object_dict())
                                for entry in query[:50])
            if len(query) > 50:
                message += '\n(truncated)\n'

            print subject
            email_manager.enqueue_notify_email(subject, message)


class RunMonitor(object):
    def __init__(self, cmd, nice_level=None, log_file=None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.cmd = cmd
        self.proc = None

    def run(self):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(self.cmd)
            self.cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
            except OSError, exc:
                if exc.errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)
            try:
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*' * 80))
                out_file.write("%s> %s\n" %
                               (time.strftime("%X %x"),
                                self.cmd))
                out_file.write("%s\n" % ('*' * 80))
            except (OSError, IOError):
                log_stacktrace('Error opening log file %s' %
                               self.log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % self.cmd
        print "path = %s" % os.getcwd()

        self.proc = subprocess.Popen(self.cmd, stdout=out_file,
                                     stderr=subprocess.STDOUT,
                                     stdin=in_devnull)
        out_file.close()
        in_devnull.close()


    def has_pid(self):
        return self.proc is not None


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        if self.has_pid():
            kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        return self.proc.poll()


class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """


class PidfileRunMonitor(RunMonitor):
    class PidfileState(object):
        pid = None
        exit_status = None
        num_tests_failed = None

        def reset(self):
            self.pid = self.exit_status = self.num_tests_failed = None


    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        self.start_time = time.time()
        self._state = self.PidfileState()
        super(PidfileRunMonitor, self).__init__(cmd, nice_level, log_file)


    def has_pid(self):
        self._get_pidfile_info()
        return self._state.pid is not None


    def get_pid(self):
        self._get_pidfile_info()
        assert self._state.pid is not None
        return self._state.pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self):
        cmdline_path = os.path.join('/proc', str(self._state.pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def _read_pidfile(self):
        self._state.reset()
        if not os.path.exists(self.pid_file):
            return
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        if not lines:
            return
        if len(lines) > 3:
            raise PidfileException('Corrupt pid file (%d lines) at %s:\n%s' %
                                   (len(lines), self.pid_file, lines))
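        # A well-formed pidfile is at most three lines, e.g. (illustrative):
        #   1234      <- autoserv pid
        #   0         <- exit status, written when autoserv finishes
        #   0         <- number of failed tests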
        try:
            self._state.pid = int(lines[0])
            if len(lines) > 1:
                self._state.exit_status = int(lines[1])
                if len(lines) == 3:
                    self._state.num_tests_failed = int(lines[2])
                else:
                    # maintain backwards-compatibility with two-line pidfiles
                    self._state.num_tests_failed = 0
        except ValueError, exc:
            raise PidfileException('Corrupt pid file: ' + str(exc.args))


    def _find_autoserv_proc(self):
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nPid: %s\nPidfile: %s\n%s' % (self._state.pid,
                                                          self.pid_file,
                                                          message)
        print message
        email_manager.enqueue_notify_email(error, message)
        if self._state.pid is not None:
            pid = self._state.pid
        else:
            pid = 0
        self.on_lost_process(pid)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.pid is None:
            self._handle_no_pid()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs()
            if proc_running:
                return

            # pid but no process - maybe process *just* exited
            self._read_pidfile()
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
            pid=None, exit_status=None if autoserv has not yet run
            pid!=None, exit_status=None if autoserv is running
            pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            email_manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.pid = pid
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed


class Agent(object):
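    """
    Runs an ordered queue of AgentTasks one at a time, driven by tick().
    If the active task fails, the remaining queue is replaced by that task's
    failure_tasks (e.g. a failed VerifyTask queues a RepairTask).
    """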
    def __init__(self, tasks, queue_entry_ids=[], num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids
        self.num_processes = num_processes

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()


class AgentTask(object):
    def __init__(self, cmd, failure_tasks=[]):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code is None:
            return
        # print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)


    def cleanup(self):
        if (hasattr(self, 'temp_results_dir') and
            os.path.exists(self.temp_results_dir)):
            shutil.rmtree(self.temp_results_dir)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def run(self):
        if self.cmd:
            print "agent starting monitor"
            log_file = None
            if hasattr(self, 'log_file'):
                log_file = self.log_file
            elif hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level=AUTOSERV_NICE_LEVEL, log_file=log_file)
            self.monitor.run()


class RepairTask(AgentTask):
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to requeue while the host is repaired and,
        for non-metahost entries, to mark failed if the repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)
        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path, '-R', '-m', host.hostname,
               '-r', self.temp_results_dir, '--host-protection', protection]
        self.host = host
        self.queue_entry = queue_entry
        super(RepairTask, self).__init__(cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')
        if self.queue_entry:
            self.queue_entry.requeue()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry and not self.queue_entry.meta_host:
                self.queue_entry.handle_host_failure()


class PreJobTask(AgentTask):
    def prolog(self):
        super(PreJobTask, self).prolog()
        if self.queue_entry:
            # clear any possibly existing results, could be a previously failed
            # verify or a previous execution that crashed
            self.queue_entry.clear_results_dir()


    def cleanup(self):
        if not os.path.exists(self.temp_results_dir):
            return
        should_copy_results = (self.queue_entry and not self.success
                               and not self.queue_entry.meta_host)
        if should_copy_results:
            self.queue_entry.set_execution_subdir()
            self._move_results()
        super(PreJobTask, self).cleanup()


    def _move_results(self):
        assert self.queue_entry is not None
        target_dir = self.queue_entry.results_dir()
        ensure_directory_exists(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            if filename == AUTOSERV_PID_FILE:
                continue
            self._force_move(os.path.join(self.temp_results_dir, filename),
                             os.path.join(target_dir, filename))


    @staticmethod
    def _force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            warning = 'Warning: removing existing destination file ' + dest
            print warning
            email_manager.enqueue_notify_email(warning, warning)
            remove_file_or_dir(dest)
        shutil.move(source, dest)


class VerifyTask(PreJobTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')

        cmd = [_autoserv_path, '-v', '-m', self.host.hostname, '-r',
               self.temp_results_dir]

        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]

        super(VerifyTask, self).__init__(cmd, failure_tasks=failure_tasks)


    def prolog(self):
        super(VerifyTask, self).prolog()
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
            if self.queue_entry:
                agent = self.queue_entry.on_pending()
                if agent:
                    self.agent.dispatcher.add_agent(agent)


class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        super(QueueTask, self).__init__(cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(keyval_dir, field, value, keyval_filename='keyval'):
1356 key_path = os.path.join(keyval_dir, keyval_filename)
jadmanski0afbb632008-06-06 21:10:57 +00001357 keyval_file = open(key_path, 'a')
showardd8e548a2008-09-09 03:04:57 +00001358 print >> keyval_file, '%s=%s' % (field, str(value))
jadmanski0afbb632008-06-06 21:10:57 +00001359 keyval_file.close()
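    # Keyvals are appended one "key=value" pair per line; e.g. the job keyval
    # file ends up containing lines like (timestamp value is illustrative):
    #   job_queued=1227052800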
mbligh36768f02008-02-22 18:28:33 +00001360
1361
showardd8e548a2008-09-09 03:04:57 +00001362 def _host_keyval_dir(self):
1363 return os.path.join(self.results_dir(), 'host_keyvals')
1364
1365
1366 def _write_host_keyval(self, host):
1367 labels = ','.join(host.labels())
1368 self._write_keyval(self._host_keyval_dir(), 'labels', labels,
1369 keyval_filename=host.hostname)
1370
1371 def _create_host_keyval_dir(self):
1372 directory = self._host_keyval_dir()
showard2bab8f42008-11-12 18:15:22 +00001373 ensure_directory_exists(directory)
showardd8e548a2008-09-09 03:04:57 +00001374
1375
jadmanski0afbb632008-06-06 21:10:57 +00001376 def results_dir(self):
1377 return self.queue_entries[0].results_dir()
mblighbb421852008-03-11 22:36:16 +00001378
1379
jadmanski0afbb632008-06-06 21:10:57 +00001380 def run(self):
1381 """\
1382 Override AgentTask.run() so we can use a PidfileRunMonitor.
1383 """
1384 self.monitor = PidfileRunMonitor(self.results_dir(),
1385 cmd=self.cmd,
1386 nice_level=AUTOSERV_NICE_LEVEL)
1387 self.monitor.run()
mblighbb421852008-03-11 22:36:16 +00001388
1389
jadmanski0afbb632008-06-06 21:10:57 +00001390 def prolog(self):
1391 # write some job timestamps into the job keyval file
1392 queued = time.mktime(self.job.created_on.timetuple())
1393 started = time.time()
showardd8e548a2008-09-09 03:04:57 +00001394 self._write_keyval(self.results_dir(), "job_queued", int(queued))
1395 self._write_keyval(self.results_dir(), "job_started", int(started))
1396 self._create_host_keyval_dir()
jadmanski0afbb632008-06-06 21:10:57 +00001397 for queue_entry in self.queue_entries:
showardd8e548a2008-09-09 03:04:57 +00001398 self._write_host_keyval(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001399 queue_entry.set_status('Running')
1400 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001401 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001402 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001403 assert len(self.queue_entries) == 1
1404 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001405
1406
showard97aed502008-11-04 02:01:24 +00001407 def _finish_task(self, success):
jadmanski0afbb632008-06-06 21:10:57 +00001408 # write out the finished time into the results keyval
1409 finished = time.time()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001410 self._write_keyval(self.results_dir(), "job_finished", int(finished))
jadmanskic2ac77f2008-05-16 21:44:04 +00001411
jadmanski0afbb632008-06-06 21:10:57 +00001412 # parse the results of the job
showard97aed502008-11-04 02:01:24 +00001413 reparse_task = FinalReparseTask(self.queue_entries)
1414 self.agent.dispatcher.add_agent(Agent([reparse_task]))
jadmanskif7fa2cc2008-10-01 14:13:23 +00001415
1416
showardcbd74612008-11-19 21:42:02 +00001417 def _write_status_comment(self, comment):
1418 status_log = open(os.path.join(self.results_dir(), 'status.log'), 'a')
1419 status_log.write('INFO\t----\t----\t' + comment)
1420 status_log.close()
1421
1422
jadmanskif7fa2cc2008-10-01 14:13:23 +00001423 def _log_abort(self):
1424 # build up sets of all the aborted_by and aborted_on values
1425 aborted_by, aborted_on = set(), set()
1426 for queue_entry in self.queue_entries:
1427 if queue_entry.aborted_by:
1428 aborted_by.add(queue_entry.aborted_by)
1429 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1430 aborted_on.add(t)
1431
1432 # extract some actual, unique aborted by value and write it out
1433 assert len(aborted_by) <= 1
1434 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001435 aborted_by_value = aborted_by.pop()
1436 aborted_on_value = max(aborted_on)
1437 else:
1438 aborted_by_value = 'autotest_system'
1439 aborted_on_value = int(time.time())
1440 results_dir = self.results_dir()
1441 self._write_keyval(results_dir, "aborted_by", aborted_by_value)
1442 self._write_keyval(results_dir, "aborted_on", aborted_on_value)
1443 aborted_on_string = str(datetime.datetime.fromtimestamp(
1444 aborted_on_value))
1445 self._write_status_comment('Job aborted by %s on %s' %
1446 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001447
1448
jadmanski0afbb632008-06-06 21:10:57 +00001449 def abort(self):
1450 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001451 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001452 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001453
1454
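    # Post-job reboot policy: RebootAfter.ALWAYS schedules a cleanup (reboot)
    # unconditionally; RebootAfter.IF_ALL_TESTS_PASSED only when the job
    # succeeded with zero failed tests; otherwise the host is just marked Ready.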
showard21baa452008-10-21 00:08:39 +00001455 def _reboot_hosts(self):
1456 reboot_after = self.job.reboot_after
1457 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001458 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001459 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001460 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001461 num_tests_failed = self.monitor.num_tests_failed()
1462 do_reboot = (self.success and num_tests_failed == 0)
1463
showard8ebca792008-11-04 21:54:22 +00001464 for queue_entry in self.queue_entries:
1465 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00001466 # don't pass the queue entry to the CleanupTask. if the cleanup
showardfa8629c2008-11-04 16:51:23 +00001467 # fails, the job doesn't care -- it's over.
showard45ae8192008-11-05 19:32:53 +00001468 cleanup_task = CleanupTask(host=queue_entry.get_host())
1469 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
showard8ebca792008-11-04 21:54:22 +00001470 else:
1471 queue_entry.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001472
1473
jadmanski0afbb632008-06-06 21:10:57 +00001474 def epilog(self):
1475 super(QueueTask, self).epilog()
jadmanski0afbb632008-06-06 21:10:57 +00001476 for queue_entry in self.queue_entries:
showard97aed502008-11-04 02:01:24 +00001477 # set status to PARSING here so queue entry is marked complete
1478 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh36768f02008-02-22 18:28:33 +00001479
showard97aed502008-11-04 02:01:24 +00001480 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001481 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001482
showard97aed502008-11-04 02:01:24 +00001483 print "queue_task finished with success=%s" % self.success
mbligh36768f02008-02-22 18:28:33 +00001484
1485
mblighbb421852008-03-11 22:36:16 +00001486class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001487 def __init__(self, job, queue_entries, run_monitor):
1488 super(RecoveryQueueTask, self).__init__(job,
1489 queue_entries, cmd=None)
1490 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001491
1492
jadmanski0afbb632008-06-06 21:10:57 +00001493 def run(self):
1494 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001495
1496
jadmanski0afbb632008-06-06 21:10:57 +00001497 def prolog(self):
1498 # recovering an existing process - don't do prolog
1499 pass
mblighbb421852008-03-11 22:36:16 +00001500
1501
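# CleanupTask runs "autoserv --cleanup" on a host. It is used both for pre-job
# reboots of dirty hosts and for post-job reboots; if the cleanup fails, the
# RepairTask passed as a failure task takes over.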
showard8fe93b52008-11-18 17:53:22 +00001502class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001503 def __init__(self, host=None, queue_entry=None):
1504 assert bool(host) ^ bool(queue_entry)
1505 if queue_entry:
1506 host = queue_entry.get_host()
jadmanski0afbb632008-06-06 21:10:57 +00001507
showard45ae8192008-11-05 19:32:53 +00001508 self.create_temp_resultsdir('.cleanup')
1509 self.cmd = [_autoserv_path, '--cleanup', '-m', host.hostname,
1510 '-r', self.temp_results_dir]
showardfa8629c2008-11-04 16:51:23 +00001511 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001512 self.host = host
showarde788ea62008-11-17 21:02:47 +00001513 repair_task = RepairTask(host, queue_entry=queue_entry)
showard45ae8192008-11-05 19:32:53 +00001514 super(CleanupTask, self).__init__(self.cmd, failure_tasks=[repair_task])
mbligh16c722d2008-03-05 00:58:44 +00001515
mblighd5c95802008-03-05 00:33:46 +00001516
jadmanski0afbb632008-06-06 21:10:57 +00001517 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001518 super(CleanupTask, self).prolog()
showard45ae8192008-11-05 19:32:53 +00001519 print "starting cleanup task for host: %s" % self.host.hostname
1520 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001521
mblighd5c95802008-03-05 00:33:46 +00001522
showard21baa452008-10-21 00:08:39 +00001523 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001524 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001525 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001526 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001527 self.host.update_field('dirty', 0)
1528
1529
mblighd5c95802008-03-05 00:33:46 +00001530class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001531 def __init__(self, queue_entry, agents_to_abort):
1532 self.queue_entry = queue_entry
1533 self.agents_to_abort = agents_to_abort
jadmanski0afbb632008-06-06 21:10:57 +00001534 super(AbortTask, self).__init__('')
mbligh36768f02008-02-22 18:28:33 +00001535
1536
jadmanski0afbb632008-06-06 21:10:57 +00001537 def prolog(self):
1538 print "starting abort on host %s, job %s" % (
1539 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001540
mblighd64e5702008-04-04 21:39:28 +00001541
jadmanski0afbb632008-06-06 21:10:57 +00001542 def epilog(self):
1543 super(AbortTask, self).epilog()
1544 self.queue_entry.set_status('Aborted')
1545 self.success = True
1546
1547
1548 def run(self):
1549 for agent in self.agents_to_abort:
1550 if (agent.active_task):
1551 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001552
1553
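# FinalReparseTask throttles itself with a class-level counter so that at most
# MAX_PARSE_PROCESSES parse processes run at once; run() and poll() simply keep
# retrying until a slot is free, then fall back to normal AgentTask behaviour.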
showard97aed502008-11-04 02:01:24 +00001554class FinalReparseTask(AgentTask):
1555 MAX_PARSE_PROCESSES = (
1556 global_config.global_config.get_config_value(
1557 _global_config_section, 'max_parse_processes', type=int))
1558 _num_running_parses = 0
1559
1560 def __init__(self, queue_entries):
1561 self._queue_entries = queue_entries
1562 self._parse_started = False
1563
1564 assert len(queue_entries) > 0
1565 queue_entry = queue_entries[0]
showard97aed502008-11-04 02:01:24 +00001566
1567 if _testing_mode:
1568 self.cmd = 'true'
1569 return
1570
1571 self._results_dir = queue_entry.results_dir()
1572 self.log_file = os.path.abspath(os.path.join(self._results_dir,
1573 '.parse.log'))
1574 super(FinalReparseTask, self).__init__(
showard2bab8f42008-11-12 18:15:22 +00001575 cmd=self._generate_parse_command())
showard97aed502008-11-04 02:01:24 +00001576
1577
1578 @classmethod
1579 def _increment_running_parses(cls):
1580 cls._num_running_parses += 1
1581
1582
1583 @classmethod
1584 def _decrement_running_parses(cls):
1585 cls._num_running_parses -= 1
1586
1587
1588 @classmethod
1589 def _can_run_new_parse(cls):
1590 return cls._num_running_parses < cls.MAX_PARSE_PROCESSES
1591
1592
1593 def prolog(self):
1594 super(FinalReparseTask, self).prolog()
1595 for queue_entry in self._queue_entries:
1596 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1597
1598
1599 def epilog(self):
1600 super(FinalReparseTask, self).epilog()
1601 final_status = self._determine_final_status()
1602 for queue_entry in self._queue_entries:
1603 queue_entry.set_status(final_status)
1604
1605
1606 def _determine_final_status(self):
1607 # use a PidfileRunMonitor to read the autoserv exit status
1608 monitor = PidfileRunMonitor(self._results_dir)
1609 if monitor.exit_code() == 0:
1610 return models.HostQueueEntry.Status.COMPLETED
1611 return models.HostQueueEntry.Status.FAILED
1612
1613
showard2bab8f42008-11-12 18:15:22 +00001614 def _generate_parse_command(self):
showard97aed502008-11-04 02:01:24 +00001615 parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
showard2bab8f42008-11-12 18:15:22 +00001616 return [parse, '-l', '2', '-r', '-o', self._results_dir]
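    # Illustrative resulting command (results path is an example only):
    #   $AUTOTEST_TKO_DIR/parse -l 2 -r -o /results/123-owner/group0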
showard97aed502008-11-04 02:01:24 +00001617
1618
1619 def poll(self):
1620 # override poll to keep trying to start until the parse count goes down
1621 # and we can, at which point we revert to default behavior
1622 if self._parse_started:
1623 super(FinalReparseTask, self).poll()
1624 else:
1625 self._try_starting_parse()
1626
1627
1628 def run(self):
1629 # override run() to not actually run unless we can
1630 self._try_starting_parse()
1631
1632
1633 def _try_starting_parse(self):
1634 if not self._can_run_new_parse():
1635 return
1636 # actually run the parse command
1637 super(FinalReparseTask, self).run()
1638 self._increment_running_parses()
1639 self._parse_started = True
1640
1641
1642 def finished(self, success):
1643 super(FinalReparseTask, self).finished(success)
1644 self._decrement_running_parses()
1645
1646
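# DBObject is a minimal row wrapper: subclasses supply the table name and
# column list, each column becomes an attribute on load, and update_field(),
# save(), delete() and fetch() issue the corresponding SQL through _db.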
mbligh36768f02008-02-22 18:28:33 +00001647class DBObject(object):
jadmanski0afbb632008-06-06 21:10:57 +00001648 def __init__(self, id=None, row=None, new_record=False):
1649 assert (bool(id) != bool(row))
mbligh36768f02008-02-22 18:28:33 +00001650
jadmanski0afbb632008-06-06 21:10:57 +00001651 self.__table = self._get_table()
mbligh36768f02008-02-22 18:28:33 +00001652
jadmanski0afbb632008-06-06 21:10:57 +00001653 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001654
jadmanski0afbb632008-06-06 21:10:57 +00001655 if row is None:
1656 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1657 rows = _db.execute(sql, (id,))
1658 if len(rows) == 0:
1659 raise RuntimeError("row not found (table=%s, id=%s)" %
1660 (self.__table, id))
1661 row = rows[0]
mbligh36768f02008-02-22 18:28:33 +00001662
showard2bab8f42008-11-12 18:15:22 +00001663 self._update_fields_from_row(row)
1664
1665
1666 def _update_fields_from_row(self, row):
jadmanski0afbb632008-06-06 21:10:57 +00001667 assert len(row) == self.num_cols(), (
1668 "table = %s, row = %s/%d, fields = %s/%d" % (
showard2bab8f42008-11-12 18:15:22 +00001669 self.__table, row, len(row), self._fields(), self.num_cols()))
mbligh36768f02008-02-22 18:28:33 +00001670
showard2bab8f42008-11-12 18:15:22 +00001671 self._valid_fields = set()
1672 for field, value in zip(self._fields(), row):
1673 setattr(self, field, value)
1674 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001675
showard2bab8f42008-11-12 18:15:22 +00001676 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001677
mblighe2586682008-02-29 22:45:46 +00001678
jadmanski0afbb632008-06-06 21:10:57 +00001679 @classmethod
1680 def _get_table(cls):
1681 raise NotImplementedError('Subclasses must override this')
mblighe2586682008-02-29 22:45:46 +00001682
1683
jadmanski0afbb632008-06-06 21:10:57 +00001684 @classmethod
1685 def _fields(cls):
1686 raise NotImplementedError('Subclasses must override this')
showard04c82c52008-05-29 19:38:12 +00001687
1688
jadmanski0afbb632008-06-06 21:10:57 +00001689 @classmethod
1690 def num_cols(cls):
1691 return len(cls._fields())
showard04c82c52008-05-29 19:38:12 +00001692
1693
jadmanski0afbb632008-06-06 21:10:57 +00001694 def count(self, where, table = None):
1695 if not table:
1696 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001697
jadmanski0afbb632008-06-06 21:10:57 +00001698 rows = _db.execute("""
1699 SELECT count(*) FROM %s
1700 WHERE %s
1701 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001702
jadmanski0afbb632008-06-06 21:10:57 +00001703 assert len(rows) == 1
1704
1705 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001706
1707
mblighf8c624d2008-07-03 16:58:45 +00001708 def update_field(self, field, value, condition=''):
showard2bab8f42008-11-12 18:15:22 +00001709 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00001710
showard2bab8f42008-11-12 18:15:22 +00001711 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00001712 return
mbligh36768f02008-02-22 18:28:33 +00001713
mblighf8c624d2008-07-03 16:58:45 +00001714 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1715 if condition:
1716 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001717 _db.execute(query, (value, self.id))
1718
showard2bab8f42008-11-12 18:15:22 +00001719 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00001720
1721
jadmanski0afbb632008-06-06 21:10:57 +00001722 def save(self):
1723 if self.__new_record:
1724 keys = self._fields()[1:] # avoid id
1725 columns = ','.join([str(key) for key in keys])
1726 values = ['"%s"' % self.__dict__[key] for key in keys]
1727 values = ','.join(values)
1728 query = """INSERT INTO %s (%s) VALUES (%s)""" % \
1729 (self.__table, columns, values)
1730 _db.execute(query)
mbligh36768f02008-02-22 18:28:33 +00001731
1732
jadmanski0afbb632008-06-06 21:10:57 +00001733 def delete(self):
1734 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1735 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001736
1737
showard63a34772008-08-18 19:32:50 +00001738 @staticmethod
1739 def _prefix_with(string, prefix):
1740 if string:
1741 string = prefix + string
1742 return string
1743
1744
jadmanski0afbb632008-06-06 21:10:57 +00001745 @classmethod
showard989f25d2008-10-01 11:38:11 +00001746 def fetch(cls, where='', params=(), joins='', order_by=''):
showard63a34772008-08-18 19:32:50 +00001747 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1748 where = cls._prefix_with(where, 'WHERE ')
1749 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
1750 '%(where)s %(order_by)s' % {'table' : cls._get_table(),
1751 'joins' : joins,
1752 'where' : where,
1753 'order_by' : order_by})
1754 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001755 for row in rows:
1756 yield cls(row=row)
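    # Example use, mirroring callers further down in this file:
    #   HostQueueEntry.fetch(where='job_id = %s AND status = "Pending"',
    #                        params=(job.id,))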
mblighe2586682008-02-29 22:45:46 +00001757
mbligh36768f02008-02-22 18:28:33 +00001758
1759class IneligibleHostQueue(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001760 def __init__(self, id=None, row=None, new_record=None):
1761 super(IneligibleHostQueue, self).__init__(id=id, row=row,
1762 new_record=new_record)
mblighe2586682008-02-29 22:45:46 +00001763
1764
jadmanski0afbb632008-06-06 21:10:57 +00001765 @classmethod
1766 def _get_table(cls):
1767 return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001768
1769
jadmanski0afbb632008-06-06 21:10:57 +00001770 @classmethod
1771 def _fields(cls):
1772 return ['id', 'job_id', 'host_id']
showard04c82c52008-05-29 19:38:12 +00001773
1774
showard989f25d2008-10-01 11:38:11 +00001775class Label(DBObject):
1776 @classmethod
1777 def _get_table(cls):
1778 return 'labels'
1779
1780
1781 @classmethod
1782 def _fields(cls):
1783 return ['id', 'name', 'kernel_config', 'platform', 'invalid',
1784 'only_if_needed']
1785
1786
mbligh36768f02008-02-22 18:28:33 +00001787class Host(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001788 def __init__(self, id=None, row=None):
1789 super(Host, self).__init__(id=id, row=row)
mblighe2586682008-02-29 22:45:46 +00001790
1791
jadmanski0afbb632008-06-06 21:10:57 +00001792 @classmethod
1793 def _get_table(cls):
1794 return 'hosts'
mbligh36768f02008-02-22 18:28:33 +00001795
1796
jadmanski0afbb632008-06-06 21:10:57 +00001797 @classmethod
1798 def _fields(cls):
1799 return ['id', 'hostname', 'locked', 'synch_id','status',
showard21baa452008-10-21 00:08:39 +00001800 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty']
showard04c82c52008-05-29 19:38:12 +00001801
1802
jadmanski0afbb632008-06-06 21:10:57 +00001803 def current_task(self):
1804 rows = _db.execute("""
1805 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1806 """, (self.id,))
1807
1808 if len(rows) == 0:
1809 return None
1810 else:
1811 assert len(rows) == 1
1812 results = rows[0]
mblighf8c624d2008-07-03 16:58:45 +00001813# print "current = %s" % results
jadmanski0afbb632008-06-06 21:10:57 +00001814 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00001815
1816
jadmanski0afbb632008-06-06 21:10:57 +00001817 def yield_work(self):
1818 print "%s yielding work" % self.hostname
1819 if self.current_task():
1820 self.current_task().requeue()
1821
1822 def set_status(self, status):
1823 print '%s -> %s' % (self.hostname, status)
1824 self.update_field('status', status)
mbligh36768f02008-02-22 18:28:33 +00001825
1826
showardd8e548a2008-09-09 03:04:57 +00001827 def labels(self):
1828 """
1829 Fetch a list of names of all non-platform labels associated with this
1830 host.
1831 """
1832 rows = _db.execute("""
1833 SELECT labels.name
1834 FROM labels
1835 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
1836 WHERE NOT labels.platform AND hosts_labels.host_id = %s
1837 ORDER BY labels.name
1838 """, (self.id,))
1839 return [row[0] for row in rows]
1840
1841
mbligh36768f02008-02-22 18:28:33 +00001842class HostQueueEntry(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001843 def __init__(self, id=None, row=None):
1844 assert id or row
1845 super(HostQueueEntry, self).__init__(id=id, row=row)
1846 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00001847
jadmanski0afbb632008-06-06 21:10:57 +00001848 if self.host_id:
1849 self.host = Host(self.host_id)
1850 else:
1851 self.host = None
mbligh36768f02008-02-22 18:28:33 +00001852
jadmanski0afbb632008-06-06 21:10:57 +00001853 self.queue_log_path = os.path.join(self.job.results_dir(),
1854 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00001855
1856
jadmanski0afbb632008-06-06 21:10:57 +00001857 @classmethod
1858 def _get_table(cls):
1859 return 'host_queue_entries'
mblighe2586682008-02-29 22:45:46 +00001860
1861
jadmanski0afbb632008-06-06 21:10:57 +00001862 @classmethod
1863 def _fields(cls):
showard2bab8f42008-11-12 18:15:22 +00001864 return ['id', 'job_id', 'host_id', 'priority', 'status', 'meta_host',
1865 'active', 'complete', 'deleted', 'execution_subdir']
showard04c82c52008-05-29 19:38:12 +00001866
1867
jadmanski0afbb632008-06-06 21:10:57 +00001868 def set_host(self, host):
1869 if host:
1870 self.queue_log_record('Assigning host ' + host.hostname)
1871 self.update_field('host_id', host.id)
1872 self.update_field('active', True)
1873 self.block_host(host.id)
1874 else:
1875 self.queue_log_record('Releasing host')
1876 self.unblock_host(self.host.id)
1877 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00001878
jadmanski0afbb632008-06-06 21:10:57 +00001879 self.host = host
mbligh36768f02008-02-22 18:28:33 +00001880
1881
jadmanski0afbb632008-06-06 21:10:57 +00001882 def get_host(self):
1883 return self.host
mbligh36768f02008-02-22 18:28:33 +00001884
1885
jadmanski0afbb632008-06-06 21:10:57 +00001886 def queue_log_record(self, log_line):
1887 now = str(datetime.datetime.now())
1888 queue_log = open(self.queue_log_path, 'a', 0)
1889 queue_log.write(now + ' ' + log_line + '\n')
1890 queue_log.close()
mbligh36768f02008-02-22 18:28:33 +00001891
1892
jadmanski0afbb632008-06-06 21:10:57 +00001893 def block_host(self, host_id):
1894 print "creating block %s/%s" % (self.job.id, host_id)
1895 row = [0, self.job.id, host_id]
1896 block = IneligibleHostQueue(row=row, new_record=True)
1897 block.save()
mblighe2586682008-02-29 22:45:46 +00001898
1899
jadmanski0afbb632008-06-06 21:10:57 +00001900 def unblock_host(self, host_id):
1901 print "removing block %s/%s" % (self.job.id, host_id)
1902 blocks = IneligibleHostQueue.fetch(
1903 'job_id=%d and host_id=%d' % (self.job.id, host_id))
1904 for block in blocks:
1905 block.delete()
mblighe2586682008-02-29 22:45:46 +00001906
1907
jadmanski0afbb632008-06-06 21:10:57 +00001908 def results_dir(self):
showard2bab8f42008-11-12 18:15:22 +00001909 return os.path.join(self.job.job_dir, self.execution_subdir)
mbligh36768f02008-02-22 18:28:33 +00001910
mblighe2586682008-02-29 22:45:46 +00001911
showard2bab8f42008-11-12 18:15:22 +00001912 def set_execution_subdir(self, subdir=None):
1913 if subdir is None:
1914 assert self.get_host()
1915 subdir = self.get_host().hostname
1916 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00001917
1918
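    # When moving to a non-abort status, the UPDATE below is guarded with
    # 'status <> "Abort*"' conditions so a concurrent abort is never overwritten.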
jadmanski0afbb632008-06-06 21:10:57 +00001919 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00001920 abort_statuses = ['Abort', 'Aborting', 'Aborted']
1921 if status not in abort_statuses:
1922 condition = ' AND '.join(['status <> "%s"' % x
1923 for x in abort_statuses])
1924 else:
1925 condition = ''
1926 self.update_field('status', status, condition=condition)
1927
jadmanski0afbb632008-06-06 21:10:57 +00001928 if self.host:
1929 hostname = self.host.hostname
1930 else:
showard2bab8f42008-11-12 18:15:22 +00001931 hostname = 'None'
1932 print "%s/%d (%d) -> %s" % (hostname, self.job.id, self.id, self.status)
mblighf8c624d2008-07-03 16:58:45 +00001933
jadmanski0afbb632008-06-06 21:10:57 +00001934 if status in ['Queued']:
1935 self.update_field('complete', False)
1936 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00001937
jadmanski0afbb632008-06-06 21:10:57 +00001938 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showarde58e3f82008-11-20 19:04:59 +00001939 'Aborting']:
jadmanski0afbb632008-06-06 21:10:57 +00001940 self.update_field('complete', False)
1941 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00001942
showard97aed502008-11-04 02:01:24 +00001943 if status in ['Failed', 'Completed', 'Stopped', 'Aborted', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00001944 self.update_field('complete', True)
1945 self.update_field('active', False)
showard542e8402008-09-19 20:16:18 +00001946 self._email_on_job_complete()
1947
1948
1949 def _email_on_job_complete(self):
1950 url = "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
1951
1952 if self.job.is_finished():
1953 subject = "Autotest: Job ID: %s \"%s\" Completed" % (
1954 self.job.id, self.job.name)
1955 body = "Job ID: %s\nJob Name: %s\n%s\n" % (
1956 self.job.id, self.job.name, url)
1957 send_email(_email_from, self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00001958
1959
jadmanski0afbb632008-06-06 21:10:57 +00001960 def run(self, assigned_host=None):
1961 if self.meta_host:
1962 assert assigned_host
1963 # ensure results dir exists for the queue log
1964 self.job.create_results_dir()
1965 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00001966
jadmanski0afbb632008-06-06 21:10:57 +00001967 print "%s/%s scheduled on %s, status=%s" % (self.job.name,
1968 self.meta_host, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00001969
jadmanski0afbb632008-06-06 21:10:57 +00001970 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00001971
jadmanski0afbb632008-06-06 21:10:57 +00001972 def requeue(self):
1973 self.set_status('Queued')
mblighe2586682008-02-29 22:45:46 +00001974
jadmanski0afbb632008-06-06 21:10:57 +00001975 if self.meta_host:
1976 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00001977
1978
jadmanski0afbb632008-06-06 21:10:57 +00001979 def handle_host_failure(self):
1980 """\
1981 Called when this queue entry's host has failed verification and
1982 repair.
1983 """
1984 assert not self.meta_host
1985 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00001986 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00001987
1988
showard2bab8f42008-11-12 18:15:22 +00001989 def clear_results_dir(self, dont_delete_files=False):
1990 if not self.execution_subdir:
1991 return
1992 results_dir = self.results_dir()
jadmanski0afbb632008-06-06 21:10:57 +00001993 if not os.path.exists(results_dir):
1994 return
1995 if dont_delete_files:
1996 temp_dir = tempfile.mkdtemp(suffix='.clear_results')
showard2bab8f42008-11-12 18:15:22 +00001997 print 'Moving results from %s to %s' % (results_dir, temp_dir)
jadmanski0afbb632008-06-06 21:10:57 +00001998 for filename in os.listdir(results_dir):
1999 path = os.path.join(results_dir, filename)
2000 if dont_delete_files:
showard2bab8f42008-11-12 18:15:22 +00002001 shutil.move(path, os.path.join(temp_dir, filename))
jadmanski0afbb632008-06-06 21:10:57 +00002002 else:
2003 remove_file_or_dir(path)
showard2bab8f42008-11-12 18:15:22 +00002004 remove_file_or_dir(results_dir)
mbligh36768f02008-02-22 18:28:33 +00002005
2006
jadmanskif7fa2cc2008-10-01 14:13:23 +00002007 @property
2008 def aborted_by(self):
2009 self._load_abort_info()
2010 return self._aborted_by
2011
2012
2013 @property
2014 def aborted_on(self):
2015 self._load_abort_info()
2016 return self._aborted_on
2017
2018
2019 def _load_abort_info(self):
2020 """ Fetch info about who aborted the job. """
2021 if hasattr(self, "_aborted_by"):
2022 return
2023 rows = _db.execute("""
2024 SELECT users.login, aborted_host_queue_entries.aborted_on
2025 FROM aborted_host_queue_entries
2026 INNER JOIN users
2027 ON users.id = aborted_host_queue_entries.aborted_by_id
2028 WHERE aborted_host_queue_entries.queue_entry_id = %s
2029 """, (self.id,))
2030 if rows:
2031 self._aborted_by, self._aborted_on = rows[0]
2032 else:
2033 self._aborted_by = self._aborted_on = None
2034
2035
showardb2e2c322008-10-14 17:33:55 +00002036 def on_pending(self):
2037 """
2038 Called when an entry in a synchronous job has passed verify. If the
2039 job is ready to run, returns an agent to run the job. Returns None
2040 otherwise.
2041 """
2042 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002043 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002044 if self.job.is_ready():
2045 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002046 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002047 return None
2048
2049
showard1be97432008-10-17 15:30:45 +00002050 def abort(self, agents_to_abort=()):
2051 abort_task = AbortTask(self, agents_to_abort)
2052 tasks = [abort_task]
2053
2054 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00002055 if self.active and host:
showard45ae8192008-11-05 19:32:53 +00002056 cleanup_task = CleanupTask(host=host)
showard1be97432008-10-17 15:30:45 +00002057 verify_task = VerifyTask(host=host)
2058 # just to make sure this host does not get taken away
showard45ae8192008-11-05 19:32:53 +00002059 host.set_status('Cleaning')
2060 tasks += [cleanup_task, verify_task]
showard1be97432008-10-17 15:30:45 +00002061
2062 self.set_status('Aborting')
2063 return Agent(tasks=tasks, queue_entry_ids=[self.id])
2064
2065
mbligh36768f02008-02-22 18:28:33 +00002066class Job(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00002067 def __init__(self, id=None, row=None):
2068 assert id or row
2069 super(Job, self).__init__(id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00002070
jadmanski0afbb632008-06-06 21:10:57 +00002071 self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
2072 self.owner))
mblighe2586682008-02-29 22:45:46 +00002073
2074
jadmanski0afbb632008-06-06 21:10:57 +00002075 @classmethod
2076 def _get_table(cls):
2077 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00002078
2079
jadmanski0afbb632008-06-06 21:10:57 +00002080 @classmethod
2081 def _fields(cls):
2082 return ['id', 'owner', 'name', 'priority', 'control_file',
showard2bab8f42008-11-12 18:15:22 +00002083 'control_type', 'created_on', 'synch_count', 'timeout',
showard21baa452008-10-21 00:08:39 +00002084 'run_verify', 'email_list', 'reboot_before', 'reboot_after']
showard04c82c52008-05-29 19:38:12 +00002085
2086
jadmanski0afbb632008-06-06 21:10:57 +00002087 def is_server_job(self):
2088 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002089
2090
jadmanski0afbb632008-06-06 21:10:57 +00002091 def get_host_queue_entries(self):
2092 rows = _db.execute("""
2093 SELECT * FROM host_queue_entries
2094 WHERE job_id= %s
2095 """, (self.id,))
2096 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002097
jadmanski0afbb632008-06-06 21:10:57 +00002098 assert len(entries) > 0
mbligh36768f02008-02-22 18:28:33 +00002099
jadmanski0afbb632008-06-06 21:10:57 +00002100 return entries
mbligh36768f02008-02-22 18:28:33 +00002101
2102
jadmanski0afbb632008-06-06 21:10:57 +00002103 def set_status(self, status, update_queues=False):
2104 self.update_field('status',status)
2105
2106 if update_queues:
2107 for queue_entry in self.get_host_queue_entries():
2108 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002109
2110
jadmanski0afbb632008-06-06 21:10:57 +00002111 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002112 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2113 status='Pending')
2114 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002115
2116
jadmanski0afbb632008-06-06 21:10:57 +00002117 def results_dir(self):
2118 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002119
jadmanski0afbb632008-06-06 21:10:57 +00002120 def num_machines(self, clause = None):
2121 sql = "job_id=%s" % self.id
2122 if clause:
2123 sql += " AND (%s)" % clause
2124 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002125
2126
jadmanski0afbb632008-06-06 21:10:57 +00002127 def num_queued(self):
2128 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002129
2130
jadmanski0afbb632008-06-06 21:10:57 +00002131 def num_active(self):
2132 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002133
2134
jadmanski0afbb632008-06-06 21:10:57 +00002135 def num_complete(self):
2136 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002137
2138
jadmanski0afbb632008-06-06 21:10:57 +00002139 def is_finished(self):
2140 left = self.num_queued()
2141 print "%s: %s machines left" % (self.name, left)
2142 return left==0
mbligh36768f02008-02-22 18:28:33 +00002143
mbligh36768f02008-02-22 18:28:33 +00002144
showard2bab8f42008-11-12 18:15:22 +00002145 def _stop_all_entries(self, entries_to_abort):
2146 """
2147 entries_to_abort: sequence of models.HostQueueEntry objects
2148 """
2149 for child_entry in entries_to_abort:
2150 assert not child_entry.complete
2151 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2152 child_entry.host.status = models.Host.Status.READY
2153 child_entry.host.save()
2154 child_entry.status = models.HostQueueEntry.Status.STOPPED
2155 child_entry.save()
2156
2157
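    # If fewer unfinished entries remain than synch_count requires, this job
    # can never assemble a full group, so the leftover entries are stopped.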
2158 def stop_if_necessary(self):
2159 not_yet_run = models.HostQueueEntry.objects.filter(
2160 job=self.id, status__in=(models.HostQueueEntry.Status.QUEUED,
2161 models.HostQueueEntry.Status.VERIFYING,
2162 models.HostQueueEntry.Status.PENDING))
2163 if not_yet_run.count() < self.synch_count:
2164 self._stop_all_entries(not_yet_run)
mblighe2586682008-02-29 22:45:46 +00002165
2166
jadmanski0afbb632008-06-06 21:10:57 +00002167 def write_to_machines_file(self, queue_entry):
2168 hostname = queue_entry.get_host().hostname
2169 print "writing %s to job %s machines file" % (hostname, self.id)
2170 file_path = os.path.join(self.job_dir, '.machines')
2171 mf = open(file_path, 'a')
showard2bab8f42008-11-12 18:15:22 +00002172 mf.write(hostname + '\n')
jadmanski0afbb632008-06-06 21:10:57 +00002173 mf.close()
mbligh36768f02008-02-22 18:28:33 +00002174
2175
jadmanski0afbb632008-06-06 21:10:57 +00002176 def create_results_dir(self, queue_entry=None):
showard2bab8f42008-11-12 18:15:22 +00002177 ensure_directory_exists(self.job_dir)
mbligh36768f02008-02-22 18:28:33 +00002178
jadmanski0afbb632008-06-06 21:10:57 +00002179 if queue_entry:
showarde05654d2008-10-28 20:38:40 +00002180 results_dir = queue_entry.results_dir()
showarde788ea62008-11-17 21:02:47 +00002181 if os.path.exists(results_dir):
2182 warning = 'QE results dir ' + results_dir + ' already exists'
2183 print warning
2184 email_manager.enqueue_notify_email(warning, warning)
showard2bab8f42008-11-12 18:15:22 +00002185 ensure_directory_exists(results_dir)
showarde05654d2008-10-28 20:38:40 +00002186 return results_dir
jadmanski0afbb632008-06-06 21:10:57 +00002187 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002188
2189
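    # Synchronous groups run under subdirs named "group0", "group1", ...; the
    # next name is one past the highest existing index (e.g. group1 -> group2).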
showard2bab8f42008-11-12 18:15:22 +00002190 def _next_group_name(self):
2191 query = models.HostQueueEntry.objects.filter(
2192 job=self.id).values('execution_subdir').distinct()
2193 subdirs = (entry['execution_subdir'] for entry in query)
2194 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2195 ids = [int(match.group(1)) for match in groups if match]
2196 if ids:
2197 next_id = max(ids) + 1
2198 else:
2199 next_id = 0
2200 return "group%d" % next_id
2201
2202
showardb2e2c322008-10-14 17:33:55 +00002203 def _write_control_file(self):
2204 'Writes control file out to disk, returns a filename'
2205 control_fd, control_filename = tempfile.mkstemp(suffix='.control_file')
2206 control_file = os.fdopen(control_fd, 'w')
jadmanski0afbb632008-06-06 21:10:57 +00002207 if self.control_file:
showardb2e2c322008-10-14 17:33:55 +00002208 control_file.write(self.control_file)
2209 control_file.close()
2210 return control_filename
mbligh36768f02008-02-22 18:28:33 +00002211
showardb2e2c322008-10-14 17:33:55 +00002212
showard2bab8f42008-11-12 18:15:22 +00002213 def get_group_entries(self, queue_entry_from_group):
2214 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002215 return list(HostQueueEntry.fetch(
2216 where='job_id=%s AND execution_subdir=%s',
2217 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002218
2219
2220 def get_job_tag(self, queue_entries):
2221 assert len(queue_entries) > 0
2222 execution_subdir = queue_entries[0].execution_subdir
2223 assert execution_subdir
2224 return "%s-%s/%s" % (self.id, self.owner, execution_subdir)
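    # Illustrative tag for job 123 owned by "someuser" running group 0:
    #   "123-someuser/group0" -- this is what autoserv receives via -P below.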
showardb2e2c322008-10-14 17:33:55 +00002225
2226
2227 def _get_autoserv_params(self, queue_entries):
2228 results_dir = self.create_results_dir(queue_entries[0])
2229 control_filename = self._write_control_file()
jadmanski0afbb632008-06-06 21:10:57 +00002230 hostnames = ','.join([entry.get_host().hostname
2231 for entry in queue_entries])
showard2bab8f42008-11-12 18:15:22 +00002232 job_tag = self.get_job_tag(queue_entries)
mbligh36768f02008-02-22 18:28:33 +00002233
showardb2e2c322008-10-14 17:33:55 +00002234 params = [_autoserv_path, '-P', job_tag, '-p', '-n',
showard21baa452008-10-21 00:08:39 +00002235 '-r', os.path.abspath(results_dir), '-u', self.owner,
2236 '-l', self.name, '-m', hostnames, control_filename]
mbligh36768f02008-02-22 18:28:33 +00002237
jadmanski0afbb632008-06-06 21:10:57 +00002238 if not self.is_server_job():
2239 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002240
showardb2e2c322008-10-14 17:33:55 +00002241 return params
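    # Illustrative invocation built above (paths, owner and hostnames are
    # examples only; -c is appended for client-side jobs):
    #   autoserv -P 123-someuser/group0 -p -n -r /results/123-someuser/group0
    #            -u someuser -l "my job" -m host1,host2 /tmp/tmpXXXX.control_file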
mblighe2586682008-02-29 22:45:46 +00002242
mbligh36768f02008-02-22 18:28:33 +00002243
showard2bab8f42008-11-12 18:15:22 +00002244 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002245 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00002246 if self.reboot_before == models.RebootBefore.ALWAYS:
showard21baa452008-10-21 00:08:39 +00002247 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00002248 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showard21baa452008-10-21 00:08:39 +00002249 do_reboot = queue_entry.get_host().dirty
2250
2251 tasks = []
2252 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00002253 tasks.append(CleanupTask(queue_entry=queue_entry))
showard2bab8f42008-11-12 18:15:22 +00002254 tasks.append(VerifyTask(queue_entry=queue_entry))
showard21baa452008-10-21 00:08:39 +00002255 return tasks
2256
2257
showard2bab8f42008-11-12 18:15:22 +00002258 def _assign_new_group(self, queue_entries):
2259 if len(queue_entries) == 1:
2260 group_name = queue_entries[0].get_host().hostname
2261 else:
2262 group_name = self._next_group_name()
2263 print 'Running synchronous job %d hosts %s as %s' % (
2264 self.id, [entry.host.hostname for entry in queue_entries],
2265 group_name)
2266
2267 for queue_entry in queue_entries:
2268 queue_entry.set_execution_subdir(group_name)
2269
2270
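    # A group is the triggering entry plus up to (synch_count - 1) other
    # Pending entries of this job; every member shares one execution_subdir.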
2271 def _choose_group_to_run(self, include_queue_entry):
2272 chosen_entries = [include_queue_entry]
2273
2274 num_entries_needed = self.synch_count - 1
2275 if num_entries_needed > 0:
2276 pending_entries = HostQueueEntry.fetch(
2277 where='job_id = %s AND status = "Pending" AND id != %s',
2278 params=(self.id, include_queue_entry.id))
2279 chosen_entries += list(pending_entries)[:num_entries_needed]
2280
2281 self._assign_new_group(chosen_entries)
2282 return chosen_entries
2283
2284
2285 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002286 if not self.is_ready():
showard9976ce92008-10-15 20:28:13 +00002287 if self.run_verify:
showarde58e3f82008-11-20 19:04:59 +00002288 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
showard2bab8f42008-11-12 18:15:22 +00002289 return Agent(self._get_pre_job_tasks(queue_entry),
showard21baa452008-10-21 00:08:39 +00002290 [queue_entry.id])
showard9976ce92008-10-15 20:28:13 +00002291 else:
2292 return queue_entry.on_pending()
mbligh36768f02008-02-22 18:28:33 +00002293
showard2bab8f42008-11-12 18:15:22 +00002294 queue_entries = self._choose_group_to_run(queue_entry)
2295 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002296
2297
2298 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002299 for queue_entry in queue_entries:
2300 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002301 params = self._get_autoserv_params(queue_entries)
2302 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2303 cmd=params)
2304 tasks = initial_tasks + [queue_task]
2305 entry_ids = [entry.id for entry in queue_entries]
2306
2307 return Agent(tasks, entry_ids, num_processes=len(queue_entries))
2308
2309
mbligh36768f02008-02-22 18:28:33 +00002310if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002311 main()