#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections, utils, debug
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False
_global_config_section = 'SCHEDULER'
_base_url = None
# see os.getlogin() online docs
_email_from = pwd.getpwuid(os.getuid())[0]
_notify_email_statuses = []


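# Scheduler entry point: parse command-line options, pull scheduler settings
# from the global config, then run the Dispatcher tick loop until SIGINT
# sets _shutdown.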
def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # read in notify_email from global_config
    c = global_config.global_config
    global _notify_email
    val = c.get_config_value(_global_config_section, "notify_email")
    if val != "":
        _notify_email = val

    global _email_from
    val = c.get_config_value(_global_config_section, "notify_email_from")
    if val != "":
        _email_from = val

    global _notify_email_statuses
    val = c.get_config_value(_global_config_section, "notify_email_statuses")
    if val != "":
        _notify_email_statuses = [status for status in
                                  re.split(r'[\s,;:]', val.lower()) if status]

    tick_pause = c.get_config_value(
        _global_config_section, 'tick_pause_sec', type=int)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(CONFIG_SECTION, 'base_url')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            print 'Error: [SERVER] hostname missing from the config file.'
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(tick_pause)
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    email_manager.send_queued_emails()
    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    if _testing_mode:
        global_config.global_config.override_config_value(
            CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    debug.configure('scheduler', format_string='%(message)s')

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


def queue_entries_to_abort():
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)

    qe = [HostQueueEntry(row=i) for i in rows]
    return qe

def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)


def log_stacktrace(reason):
    (type, value, tb) = sys.exc_info()
    str = "EXCEPTION: %s\n" % reason
    str += ''.join(traceback.format_exception(type, value, tb))

    sys.stderr.write("\n%s\n" % str)
    email_manager.enqueue_notify_email("monitor_db exception", str)


def get_proc_poll_fn(pid):
    proc_path = os.path.join('/proc', str(pid))
    def poll_fn():
        if os.path.exists(proc_path):
            return None
        return 0 # we can't get a real exit code
    return poll_fn


def send_email(to_string, subject, body):
    """Mails out emails to the addresses listed in to_string.

    to_string is split into a list which can be delimited by any of:
            ';', ',', ':' or any whitespace
    """

    # Create list from string removing empty strings from the list.
    to_list = [x for x in re.split('\s|,|;|:', to_string) if x]
    if not to_list:
        return

    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        _email_from, ', '.join(to_list), subject, body)
    try:
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(_email_from, to_list, msg)
        finally:
            mailer.quit()
    except Exception, e:
        print "Sending email failed. Reason: %s" % repr(e)


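# If poll_fn reports the process as still alive, wake it with SIGCONT and
# then ask it to terminate with SIGTERM.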
def kill_autoserv(pid, poll_fn=None):
    print 'killing', pid
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() is None:
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)


def ensure_directory_exists(directory_path):
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)


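# Queues up notification messages during a scheduler cycle and mails them out
# as a single batch (see the send_queued_emails() calls in main() and
# Dispatcher.tick()).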
class EmailNotificationManager(object):
    def __init__(self):
        self._emails = []

    def enqueue_notify_email(self, subject, message):
        if not _notify_email:
            return

        body = 'Subject: ' + subject + '\n'
        body += "%s / %s / %s\n%s" % (socket.gethostname(),
                                      os.getpid(),
                                      time.strftime("%X %x"), message)
        self._emails.append(body)


    def send_queued_emails(self):
        if not self._emails:
            return
        subject = 'Scheduler notifications from ' + socket.gethostname()
        separator = '\n' + '-' * 40 + '\n'
        body = separator.join(self._emails)

        send_email(_notify_email, subject, body)
        self._emails = []

email_manager = EmailNotificationManager()


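# Picks an eligible ready host for each pending queue entry, honouring ACLs,
# job dependency labels and only_if_needed labels.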
class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = long(row[0]), long(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.acl_group_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, acl_group_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


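    # Called once per scheduling pass: bulk-load the ready hosts and the ACL,
    # label and dependency mappings for the pending jobs, so the per-entry
    # eligibility checks below are dictionary lookups rather than database
    # queries.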
    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        acl = self._is_acl_accessible(host_id, queue_entry)
        deps = self._check_job_dependencies(job_dependencies, host_labels)
        only_if = self._check_only_if_needed_labels(job_dependencies,
                                                    host_labels, queue_entry)
        return acl and deps and only_if


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            return self._schedule_non_metahost(queue_entry)
        return self._schedule_metahost(queue_entry)


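# The top-level scheduler loop object. On startup it recovers running and
# aborting entries; on every tick it runs periodic cleanup, processes aborts,
# assigns pending queue entries to hosts, and advances the active Agents while
# throttling the total number of autoserv processes.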
class Dispatcher:
    autoserv_procs_cache = None
    max_running_processes = global_config.global_config.get_config_value(
        _global_config_section, 'max_running_jobs', type=int)
    max_processes_started_per_cycle = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_jobs_started_per_cycle', type=int))
    clean_interval = (
        global_config.global_config.get_config_value(
            _global_config_section, 'clean_interval_minutes', type=int))
    synch_job_start_timeout_minutes = (
        global_config.global_config.get_config_value(
            _global_config_section, 'synch_job_start_timeout_minutes',
            type=int))

    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        Dispatcher.autoserv_procs_cache = None
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        email_manager.send_queued_emails()


    def _run_cleanup_maybe(self):
        if self._last_clean_time + self.clean_interval * 60 < time.time():
            print 'Running cleanup'
            self._abort_timed_out_jobs()
            self._abort_jobs_past_synch_start_timeout()
            self._clear_inactive_blocks()
            self._check_for_db_inconsistencies()
            self._last_clean_time = time.time()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self

    # Find agent corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def num_running_processes(self):
        return sum(agent.num_processes for agent in self._agents
                   if agent.is_running())


    @classmethod
    def find_autoservs(cls, orphans_only=False):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes. If orphans_only=True, return only processes that
        have been orphaned (i.e. parent pid = 1).
        """
        if cls.autoserv_procs_cache is not None:
            return cls.autoserv_procs_cache

        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the five columns output by ps
        procs = [line.split(None, 4) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = {}
        for proc in procs:
            # check ppid == 1 for orphans (the ps fields are strings)
            if orphans_only and proc[2] != '1':
                continue
            # only root autoserv processes have pgid == pid
            if (proc[3] == 'autoserv' and # comm
                proc[1] == proc[0]): # pgid == pid
                # map pid to args
                autoserv_procs[int(proc[0])] = proc[4]
        cls.autoserv_procs_cache = autoserv_procs
        return autoserv_procs


570 assert len(queue_entries) > 0
571 queue_entry_ids = [entry.id for entry in queue_entries]
572 queue_task = RecoveryQueueTask(job=queue_entries[0].job,
573 queue_entries=queue_entries,
574 run_monitor=run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000575 self.add_agent(Agent(tasks=[queue_task],
showard2bab8f42008-11-12 18:15:22 +0000576 queue_entry_ids=queue_entry_ids))
mblighbb421852008-03-11 22:36:16 +0000577
578
jadmanski0afbb632008-06-06 21:10:57 +0000579 def _recover_processes(self):
580 orphans = self.find_autoservs(orphans_only=True)
mblighbb421852008-03-11 22:36:16 +0000581
jadmanski0afbb632008-06-06 21:10:57 +0000582 # first, recover running queue entries
583 rows = _db.execute("""SELECT * FROM host_queue_entries
584 WHERE status = 'Running'""")
585 queue_entries = [HostQueueEntry(row=i) for i in rows]
586 requeue_entries = []
587 recovered_entry_ids = set()
588 for queue_entry in queue_entries:
showard2bab8f42008-11-12 18:15:22 +0000589 run_monitor = PidfileRunMonitor(queue_entry.results_dir())
showard21baa452008-10-21 00:08:39 +0000590 if not run_monitor.has_pid():
jadmanski0afbb632008-06-06 21:10:57 +0000591 # autoserv apparently never got run, so requeue
592 requeue_entries.append(queue_entry)
593 continue
594 if queue_entry.id in recovered_entry_ids:
595 # synchronous job we've already recovered
596 continue
showard2bab8f42008-11-12 18:15:22 +0000597 job_tag = queue_entry.job.get_job_tag([queue_entry])
showard21baa452008-10-21 00:08:39 +0000598 pid = run_monitor.get_pid()
showard2bab8f42008-11-12 18:15:22 +0000599 print 'Recovering %s (pid %d)' % (queue_entry.id, pid)
showarde788ea62008-11-17 21:02:47 +0000600 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard2bab8f42008-11-12 18:15:22 +0000601 recovered_entry_ids.union(entry.id for entry in queue_entries)
602 self._recover_queue_entries(queue_entries, run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000603 orphans.pop(pid, None)
mblighd5c95802008-03-05 00:33:46 +0000604
jadmanski0afbb632008-06-06 21:10:57 +0000605 # and requeue other active queue entries
606 rows = _db.execute("""SELECT * FROM host_queue_entries
607 WHERE active AND NOT complete
608 AND status != 'Running'
609 AND status != 'Pending'
610 AND status != 'Abort'
611 AND status != 'Aborting'""")
612 queue_entries = [HostQueueEntry(row=i) for i in rows]
613 for queue_entry in queue_entries + requeue_entries:
614 print 'Requeuing running QE %d' % queue_entry.id
615 queue_entry.clear_results_dir(dont_delete_files=True)
616 queue_entry.requeue()
mbligh90a549d2008-03-25 23:52:34 +0000617
618
jadmanski0afbb632008-06-06 21:10:57 +0000619 # now kill any remaining autoserv processes
620 for pid in orphans.keys():
621 print 'Killing orphan %d (%s)' % (pid, orphans[pid])
622 kill_autoserv(pid)
623
624 # recover aborting tasks
625 rebooting_host_ids = set()
626 rows = _db.execute("""SELECT * FROM host_queue_entries
627 WHERE status='Abort' or status='Aborting'""")
628 queue_entries = [HostQueueEntry(row=i) for i in rows]
629 for queue_entry in queue_entries:
630 print 'Recovering aborting QE %d' % queue_entry.id
showard1be97432008-10-17 15:30:45 +0000631 agent = queue_entry.abort()
632 self.add_agent(agent)
633 if queue_entry.get_host():
634 rebooting_host_ids.add(queue_entry.get_host().id)
jadmanski0afbb632008-06-06 21:10:57 +0000635
showard97aed502008-11-04 02:01:24 +0000636 self._recover_parsing_entries()
637
showard45ae8192008-11-05 19:32:53 +0000638 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000639 self._reverify_hosts_where("""(status = 'Repairing' OR
640 status = 'Verifying' OR
showard45ae8192008-11-05 19:32:53 +0000641 status = 'Cleaning')""",
jadmanski0afbb632008-06-06 21:10:57 +0000642 exclude_ids=rebooting_host_ids)
643
644 # finally, recover "Running" hosts with no active queue entries,
645 # although this should never happen
646 message = ('Recovering running host %s - this probably '
647 'indicates a scheduler bug')
648 self._reverify_hosts_where("""status = 'Running' AND
649 id NOT IN (SELECT host_id
650 FROM host_queue_entries
651 WHERE active)""",
652 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000653
654
    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              exclude_ids=set()):
        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
                           'invalid = 0 AND ' + where)
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.id in exclude_ids:
                continue
            if print_message is not None:
                print print_message % host.hostname
            verify_task = VerifyTask(host = host)
            self.add_agent(Agent(tasks = [verify_task]))


    def _recover_parsing_entries(self):
        # make sure there are no old parsers running
        os.system('killall parse')

        recovered_entry_ids = set()
        for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
            if entry.id in recovered_entry_ids:
                continue
            queue_entries = entry.job.get_group_entries(entry)
            recovered_entry_ids.update(entry.id for entry in queue_entries)

            reparse_task = FinalReparseTask(queue_entries)
            self.add_agent(Agent([reparse_task]))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _abort_timed_out_jobs(self):
        """
        Aborts all jobs that have timed out and not completed
        """
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            print 'Aborting job %d due to job timeout' % job.id
            job.abort(None)


    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are holding a machine that's in the Everyone ACL.
        """
        timeout_delta = datetime.timedelta(
            minutes=self.synch_job_start_timeout_minutes)
        timeout_start = datetime.datetime.now() - timeout_delta
        query = models.Job.objects.filter(
            created_on__lt=timeout_start,
            hostqueueentry__status='Pending',
            hostqueueentry__host__acl_group__name='Everyone')
        for job in query.distinct():
            print 'Aborting job %d due to start timeout' % job.id
            entries_to_abort = job.hostqueueentry_set.exclude(
                status=models.HostQueueEntry.Status.RUNNING)
            for queue_entry in entries_to_abort:
                queue_entry.abort(None)


    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='priority DESC, meta_host, job_id'))


    def _schedule_new_jobs(self):
        print "finding work"

        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return

        self._host_scheduler.refresh(queue_entries)

        for queue_entry in queue_entries:
            assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
            if not assigned_host:
                continue
            self._run_queue_entry(queue_entry, assigned_host)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run(assigned_host=host)
        # in some cases (synchronous jobs with run_verify=False), agent may be None
        if agent:
            self.add_agent(agent)


    def _find_aborting(self):
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            for agent in agents_to_abort:
                self.remove_agent(agent)

            agent = entry.abort(agents_to_abort)
            self.add_agent(agent)
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _can_start_agent(self, agent, num_running_processes,
                         num_started_this_cycle, have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if (num_running_processes + agent.num_processes >
            self.max_running_processes):
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            self.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
        num_running_processes = self.num_running_processes()
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                print "agent finished"
                self._agents.remove(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_running_processes,
                                             num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_running_processes += agent.num_processes
                num_started_this_cycle += agent.num_processes
            agent.tick()
        print num_running_processes, 'running processes'


    def _check_for_db_inconsistencies(self):
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        if query.count() != 0:
            subject = ('%d queue entries found with active=complete=1'
                       % query.count())
            message = '\n'.join(str(entry.get_object_dict())
                                for entry in query[:50])
            if len(query) > 50:
                message += '\n(truncated)\n'

            print subject
            email_manager.enqueue_notify_email(subject, message)


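# Thin wrapper around a child process: runs self.cmd (optionally niced), with
# output appended to a log file, and exposes pid/exit-code/kill helpers.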
class RunMonitor(object):
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.cmd = cmd
        self.proc = None

    def run(self):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(self.cmd)
            self.cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
            except OSError, exc:
                if exc.errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)
            try:
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" %
                               (time.strftime("%X %x"),
                                self.cmd))
                out_file.write("%s\n" % ('*'*80))
            except (OSError, IOError):
                log_stacktrace('Error opening log file %s' %
                               self.log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % self.cmd
        print "path = %s" % os.getcwd()

        self.proc = subprocess.Popen(self.cmd, stdout=out_file,
                                     stderr=subprocess.STDOUT,
                                     stdin=in_devnull)
        out_file.close()
        in_devnull.close()


    def has_pid(self):
        return self.proc is not None


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        if self.has_pid():
            kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        return self.proc.poll()


class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """


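# A RunMonitor that tracks autoserv through the .autoserv_execute pidfile it
# writes into the results directory. This lets the scheduler reattach to runs
# after a restart and detect processes that died without reporting an exit
# status.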
class PidfileRunMonitor(RunMonitor):
    class PidfileState(object):
        pid = None
        exit_status = None
        num_tests_failed = None

        def reset(self):
            self.pid = self.exit_status = self.num_tests_failed = None


    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        self.start_time = time.time()
        self._state = self.PidfileState()
        super(PidfileRunMonitor, self).__init__(cmd, nice_level, log_file)


    def has_pid(self):
        self._get_pidfile_info()
        return self._state.pid is not None


    def get_pid(self):
        self._get_pidfile_info()
        assert self._state.pid is not None
        return self._state.pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self):
        cmdline_path = os.path.join('/proc', str(self._state.pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def _read_pidfile(self):
        self._state.reset()
        if not os.path.exists(self.pid_file):
            return
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        if not lines:
            return
        if len(lines) > 3:
            raise PidfileException('Corrupt pid file (%d lines) at %s:\n%s' %
                                   (len(lines), self.pid_file, lines))
        try:
            self._state.pid = int(lines[0])
            if len(lines) > 1:
                self._state.exit_status = int(lines[1])
                if len(lines) == 3:
                    self._state.num_tests_failed = int(lines[2])
                else:
                    # maintain backwards-compatibility with two-line pidfiles
                    self._state.num_tests_failed = 0
        except ValueError, exc:
            raise PidfileException('Corrupt pid file: ' + str(exc.args))


    def _find_autoserv_proc(self):
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nPid: %s\nPidfile: %s\n%s' % (self._state.pid,
                                                          self.pid_file,
                                                          message)
        print message
        email_manager.enqueue_notify_email(error, message)
        if self._state.pid is not None:
            pid = self._state.pid
        else:
            pid = 0
        self.on_lost_process(pid)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.pid is None:
            self._handle_no_pid()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs()
            if proc_running:
                return

            # pid but no process - maybe process *just* exited
            self._read_pidfile()
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            email_manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.pid = pid
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed


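# An Agent owns an ordered queue of AgentTasks making up one logical unit of
# work (e.g. verify, then run, then parse). When a task fails, the remaining
# queue is replaced by that task's failure_tasks. The Dispatcher ticks every
# agent once per cycle.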
class Agent(object):
    def __init__(self, tasks, queue_entry_ids=[], num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids
        self.num_processes = num_processes

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()


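# Base class for a single step an Agent executes (verify, repair, run,
# parse, ...). Subclasses provide the command plus prolog()/epilog() hooks,
# and poll()/tick() translate the monitored process's exit code into success
# or failure.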
class AgentTask(object):
    def __init__(self, cmd, failure_tasks = []):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code is None:
            return
#        print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)


    def cleanup(self):
        if (hasattr(self, 'temp_results_dir') and
            os.path.exists(self.temp_results_dir)):
            shutil.rmtree(self.temp_results_dir)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def run(self):
        if self.cmd:
            print "agent starting monitor"
            log_file = None
            if hasattr(self, 'log_file'):
                log_file = self.log_file
            elif hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level=AUTOSERV_NICE_LEVEL, log_file=log_file)
            self.monitor.run()


class RepairTask(AgentTask):
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to requeue and mark failed if this repair
        fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)
        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path, '-R', '-m', host.hostname,
               '-r', self.temp_results_dir, '--host-protection', protection]
        self.host = host
        self.queue_entry = queue_entry
        super(RepairTask, self).__init__(cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')
        if self.queue_entry:
            self.queue_entry.requeue()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry and not self.queue_entry.meta_host:
                self.queue_entry.handle_host_failure()


1288 def prolog(self):
1289 super(PreJobTask, self).prolog()
1290 if self.queue_entry:
1291 # clear any possibly existing results, could be a previously failed
1292 # verify or a previous execution that crashed
1293 self.queue_entry.clear_results_dir()
1294
1295
1296 def cleanup(self):
1297 if not os.path.exists(self.temp_results_dir):
1298 return
1299 should_copy_results = (self.queue_entry and not self.success
1300 and not self.queue_entry.meta_host)
1301 if should_copy_results:
1302 self.queue_entry.set_execution_subdir()
1303 self._move_results()
1304 super(PreJobTask, self).cleanup()
1305
1306
1307 def _move_results(self):
1308 assert self.queue_entry is not None
1309 target_dir = self.queue_entry.results_dir()
1310 ensure_directory_exists(target_dir)
1311 files = os.listdir(self.temp_results_dir)
1312 for filename in files:
1313 if filename == AUTOSERV_PID_FILE:
1314 continue
1315 self._force_move(os.path.join(self.temp_results_dir, filename),
1316 os.path.join(target_dir, filename))
1317
1318
1319 @staticmethod
1320 def _force_move(source, dest):
1321 """\
1322 Replacement for shutil.move() that will delete the destination
1323 if it exists, even if it's a directory.
1324 """
1325 if os.path.exists(dest):
1326 warning = 'Warning: removing existing destination file ' + dest
1327 print warning
1328 email_manager.enqueue_notify_email(warning, warning)
1329 remove_file_or_dir(dest)
1330 shutil.move(source, dest)
1331
1332
1333class VerifyTask(PreJobTask):
showard9976ce92008-10-15 20:28:13 +00001334 def __init__(self, queue_entry=None, host=None):
jadmanski0afbb632008-06-06 21:10:57 +00001335 assert bool(queue_entry) != bool(host)
mbligh36768f02008-02-22 18:28:33 +00001336
jadmanski0afbb632008-06-06 21:10:57 +00001337 self.host = host or queue_entry.host
1338 self.queue_entry = queue_entry
mbligh36768f02008-02-22 18:28:33 +00001339
jadmanski0afbb632008-06-06 21:10:57 +00001340 self.create_temp_resultsdir('.verify')
showard3d9899a2008-07-31 02:11:58 +00001341
showard2bab8f42008-11-12 18:15:22 +00001342 cmd = [_autoserv_path, '-v', '-m', self.host.hostname, '-r',
1343 self.temp_results_dir]
mbligh36768f02008-02-22 18:28:33 +00001344
showarde788ea62008-11-17 21:02:47 +00001345 failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
mblighe2586682008-02-29 22:45:46 +00001346
showard2bab8f42008-11-12 18:15:22 +00001347 super(VerifyTask, self).__init__(cmd, failure_tasks=failure_tasks)
mblighe2586682008-02-29 22:45:46 +00001348
1349
jadmanski0afbb632008-06-06 21:10:57 +00001350 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001351 super(VerifyTask, self).prolog()
jadmanski0afbb632008-06-06 21:10:57 +00001352 print "starting verify on %s" % (self.host.hostname)
1353 if self.queue_entry:
1354 self.queue_entry.set_status('Verifying')
jadmanski0afbb632008-06-06 21:10:57 +00001355 self.host.set_status('Verifying')
mbligh36768f02008-02-22 18:28:33 +00001356
1357
jadmanski0afbb632008-06-06 21:10:57 +00001358 def epilog(self):
1359 super(VerifyTask, self).epilog()
mbligh36768f02008-02-22 18:28:33 +00001360
jadmanski0afbb632008-06-06 21:10:57 +00001361 if self.success:
1362 self.host.set_status('Ready')
showard2bab8f42008-11-12 18:15:22 +00001363 if self.queue_entry:
1364 agent = self.queue_entry.on_pending()
1365 if agent:
1366 self.agent.dispatcher.add_agent(agent)
mbligh36768f02008-02-22 18:28:33 +00001367
1368
mbligh36768f02008-02-22 18:28:33 +00001369class QueueTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001370 def __init__(self, job, queue_entries, cmd):
1371 super(QueueTask, self).__init__(cmd)
1372 self.job = job
1373 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001374
1375
jadmanski0afbb632008-06-06 21:10:57 +00001376 @staticmethod
showardd8e548a2008-09-09 03:04:57 +00001377 def _write_keyval(keyval_dir, field, value, keyval_filename='keyval'):
1378 key_path = os.path.join(keyval_dir, keyval_filename)
jadmanski0afbb632008-06-06 21:10:57 +00001379 keyval_file = open(key_path, 'a')
showardd8e548a2008-09-09 03:04:57 +00001380 print >> keyval_file, '%s=%s' % (field, str(value))
jadmanski0afbb632008-06-06 21:10:57 +00001381 keyval_file.close()
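    # Illustrative note (values are made up): each call appends a single
    # "field=value" line, so
    #   self._write_keyval(results_dir, 'job_queued', 1228000000)
    # adds the line "job_queued=1228000000" to <results_dir>/keyval, the
    # default keyval_filename.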
mbligh36768f02008-02-22 18:28:33 +00001382
1383
showardd8e548a2008-09-09 03:04:57 +00001384 def _host_keyval_dir(self):
1385 return os.path.join(self.results_dir(), 'host_keyvals')
1386
1387
1388 def _write_host_keyval(self, host):
1389 labels = ','.join(host.labels())
1390 self._write_keyval(self._host_keyval_dir(), 'labels', labels,
1391 keyval_filename=host.hostname)
1392
1393 def _create_host_keyval_dir(self):
1394 directory = self._host_keyval_dir()
showard2bab8f42008-11-12 18:15:22 +00001395 ensure_directory_exists(directory)
showardd8e548a2008-09-09 03:04:57 +00001396
1397
jadmanski0afbb632008-06-06 21:10:57 +00001398 def results_dir(self):
1399 return self.queue_entries[0].results_dir()
mblighbb421852008-03-11 22:36:16 +00001400
1401
jadmanski0afbb632008-06-06 21:10:57 +00001402 def run(self):
1403 """\
1404 Override AgentTask.run() so we can use a PidfileRunMonitor.
1405 """
1406 self.monitor = PidfileRunMonitor(self.results_dir(),
1407 cmd=self.cmd,
1408 nice_level=AUTOSERV_NICE_LEVEL)
1409 self.monitor.run()
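    # The PidfileRunMonitor tracks autoserv through the pidfile it writes
    # under results_dir() (AUTOSERV_PID_FILE, '.autoserv_execute');
    # RecoveryQueueTask below reuses such a monitor to re-attach to an
    # autoserv process that is already running, e.g. across a scheduler
    # restart.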
mblighbb421852008-03-11 22:36:16 +00001410
1411
jadmanski0afbb632008-06-06 21:10:57 +00001412 def prolog(self):
1413 # write some job timestamps into the job keyval file
1414 queued = time.mktime(self.job.created_on.timetuple())
1415 started = time.time()
showardd8e548a2008-09-09 03:04:57 +00001416 self._write_keyval(self.results_dir(), "job_queued", int(queued))
1417 self._write_keyval(self.results_dir(), "job_started", int(started))
1418 self._create_host_keyval_dir()
jadmanski0afbb632008-06-06 21:10:57 +00001419 for queue_entry in self.queue_entries:
showardd8e548a2008-09-09 03:04:57 +00001420 self._write_host_keyval(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001421 queue_entry.set_status('Running')
1422 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001423 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001424 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001425 assert len(self.queue_entries) == 1
1426 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001427
1428
showard97aed502008-11-04 02:01:24 +00001429 def _finish_task(self, success):
jadmanski0afbb632008-06-06 21:10:57 +00001430 # write out the finished time into the results keyval
1431 finished = time.time()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001432 self._write_keyval(self.results_dir(), "job_finished", int(finished))
jadmanskic2ac77f2008-05-16 21:44:04 +00001433
jadmanski0afbb632008-06-06 21:10:57 +00001434 # parse the results of the job
showard97aed502008-11-04 02:01:24 +00001435 reparse_task = FinalReparseTask(self.queue_entries)
1436 self.agent.dispatcher.add_agent(Agent([reparse_task]))
jadmanskif7fa2cc2008-10-01 14:13:23 +00001437
1438
showardcbd74612008-11-19 21:42:02 +00001439 def _write_status_comment(self, comment):
1440 status_log = open(os.path.join(self.results_dir(), 'status.log'), 'a')
1441 status_log.write('INFO\t----\t----\t' + comment + '\n')
1442 status_log.close()
1443
1444
jadmanskif7fa2cc2008-10-01 14:13:23 +00001445 def _log_abort(self):
1446 # build up sets of all the aborted_by and aborted_on values
1447 aborted_by, aborted_on = set(), set()
1448 for queue_entry in self.queue_entries:
1449 if queue_entry.aborted_by:
1450 aborted_by.add(queue_entry.aborted_by)
1451 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1452 aborted_on.add(t)
1453
1454 # extract some actual, unique aborted by value and write it out
1455 assert len(aborted_by) <= 1
1456 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001457 aborted_by_value = aborted_by.pop()
1458 aborted_on_value = max(aborted_on)
1459 else:
1460 aborted_by_value = 'autotest_system'
1461 aborted_on_value = int(time.time())
1462 results_dir = self.results_dir()
1463 self._write_keyval(results_dir, "aborted_by", aborted_by_value)
1464 self._write_keyval(results_dir, "aborted_on", aborted_on_value)
1465 aborted_on_string = str(datetime.datetime.fromtimestamp(
1466 aborted_on_value))
1467 self._write_status_comment('Job aborted by %s on %s' %
1468 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001469
1470
jadmanski0afbb632008-06-06 21:10:57 +00001471 def abort(self):
1472 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001473 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001474 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001475
1476
showard21baa452008-10-21 00:08:39 +00001477 def _reboot_hosts(self):
1478 reboot_after = self.job.reboot_after
1479 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001480 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001481 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001482 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001483 num_tests_failed = self.monitor.num_tests_failed()
1484 do_reboot = (self.success and num_tests_failed == 0)
1485
showard8ebca792008-11-04 21:54:22 +00001486 for queue_entry in self.queue_entries:
1487 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00001488 # don't pass the queue entry to the CleanupTask. if the cleanup
showardfa8629c2008-11-04 16:51:23 +00001489 # fails, the job doesn't care -- it's over.
showard45ae8192008-11-05 19:32:53 +00001490 cleanup_task = CleanupTask(host=queue_entry.get_host())
1491 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
showard8ebca792008-11-04 21:54:22 +00001492 else:
1493 queue_entry.host.set_status('Ready')
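    # In short: RebootAfter.ALWAYS queues a CleanupTask for every host;
    # IF_ALL_TESTS_PASSED does so only when the queue task succeeded and the
    # monitor reports zero failed tests; otherwise each host goes straight
    # back to 'Ready'.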
showard21baa452008-10-21 00:08:39 +00001494
1495
jadmanski0afbb632008-06-06 21:10:57 +00001496 def epilog(self):
1497 super(QueueTask, self).epilog()
jadmanski0afbb632008-06-06 21:10:57 +00001498 for queue_entry in self.queue_entries:
showard97aed502008-11-04 02:01:24 +00001499 # set status to PARSING here so queue entry is marked complete
1500 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh36768f02008-02-22 18:28:33 +00001501
showard97aed502008-11-04 02:01:24 +00001502 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001503 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001504
showard97aed502008-11-04 02:01:24 +00001505 print "queue_task finished with success=%s" % self.success
mbligh36768f02008-02-22 18:28:33 +00001506
1507
mblighbb421852008-03-11 22:36:16 +00001508class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001509 def __init__(self, job, queue_entries, run_monitor):
1510 super(RecoveryQueueTask, self).__init__(job,
1511 queue_entries, cmd=None)
1512 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001513
1514
jadmanski0afbb632008-06-06 21:10:57 +00001515 def run(self):
1516 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001517
1518
jadmanski0afbb632008-06-06 21:10:57 +00001519 def prolog(self):
1520 # recovering an existing process - don't do prolog
1521 pass
mblighbb421852008-03-11 22:36:16 +00001522
1523
showard8fe93b52008-11-18 17:53:22 +00001524class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001525 def __init__(self, host=None, queue_entry=None):
1526 assert bool(host) ^ bool(queue_entry)
1527 if queue_entry:
1528 host = queue_entry.get_host()
jadmanski0afbb632008-06-06 21:10:57 +00001529
showard45ae8192008-11-05 19:32:53 +00001530 self.create_temp_resultsdir('.cleanup')
1531 self.cmd = [_autoserv_path, '--cleanup', '-m', host.hostname,
1532 '-r', self.temp_results_dir]
showardfa8629c2008-11-04 16:51:23 +00001533 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001534 self.host = host
showarde788ea62008-11-17 21:02:47 +00001535 repair_task = RepairTask(host, queue_entry=queue_entry)
showard45ae8192008-11-05 19:32:53 +00001536 super(CleanupTask, self).__init__(self.cmd, failure_tasks=[repair_task])
mbligh16c722d2008-03-05 00:58:44 +00001537
mblighd5c95802008-03-05 00:33:46 +00001538
jadmanski0afbb632008-06-06 21:10:57 +00001539 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001540 super(CleanupTask, self).prolog()
showard45ae8192008-11-05 19:32:53 +00001541 print "starting cleanup task for host: %s" % self.host.hostname
1542 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001543
mblighd5c95802008-03-05 00:33:46 +00001544
showard21baa452008-10-21 00:08:39 +00001545 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001546 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001547 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001548 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001549 self.host.update_field('dirty', 0)
1550
1551
mblighd5c95802008-03-05 00:33:46 +00001552class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001553 def __init__(self, queue_entry, agents_to_abort):
1554 self.queue_entry = queue_entry
1555 self.agents_to_abort = agents_to_abort
jadmanski0afbb632008-06-06 21:10:57 +00001556 super(AbortTask, self).__init__('')
mbligh36768f02008-02-22 18:28:33 +00001557
1558
jadmanski0afbb632008-06-06 21:10:57 +00001559 def prolog(self):
1560 print "starting abort on host %s, job %s" % (
1561 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001562
mblighd64e5702008-04-04 21:39:28 +00001563
jadmanski0afbb632008-06-06 21:10:57 +00001564 def epilog(self):
1565 super(AbortTask, self).epilog()
1566 self.queue_entry.set_status('Aborted')
1567 self.success = True
1568
1569
1570 def run(self):
1571 for agent in self.agents_to_abort:
1572 if agent.active_task:
1573 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001574
1575
showard97aed502008-11-04 02:01:24 +00001576class FinalReparseTask(AgentTask):
1577 MAX_PARSE_PROCESSES = (
1578 global_config.global_config.get_config_value(
1579 _global_config_section, 'max_parse_processes', type=int))
1580 _num_running_parses = 0
1581
1582 def __init__(self, queue_entries):
1583 self._queue_entries = queue_entries
1584 self._parse_started = False
1585
1586 assert len(queue_entries) > 0
1587 queue_entry = queue_entries[0]
showard97aed502008-11-04 02:01:24 +00001588
1589 if _testing_mode:
1590 self.cmd = 'true'
1591 return
1592
1593 self._results_dir = queue_entry.results_dir()
1594 self.log_file = os.path.abspath(os.path.join(self._results_dir,
1595 '.parse.log'))
1596 super(FinalReparseTask, self).__init__(
showard2bab8f42008-11-12 18:15:22 +00001597 cmd=self._generate_parse_command())
showard97aed502008-11-04 02:01:24 +00001598
1599
1600 @classmethod
1601 def _increment_running_parses(cls):
1602 cls._num_running_parses += 1
1603
1604
1605 @classmethod
1606 def _decrement_running_parses(cls):
1607 cls._num_running_parses -= 1
1608
1609
1610 @classmethod
1611 def _can_run_new_parse(cls):
1612 return cls._num_running_parses < cls.MAX_PARSE_PROCESSES
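    # _num_running_parses is class-level state shared by every
    # FinalReparseTask, so the scheduler never runs more than
    # MAX_PARSE_PROCESSES parse subprocesses at once; run() and poll() below
    # simply keep retrying until _can_run_new_parse() returns True.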
1613
1614
1615 def prolog(self):
1616 super(FinalReparseTask, self).prolog()
1617 for queue_entry in self._queue_entries:
1618 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1619
1620
1621 def epilog(self):
1622 super(FinalReparseTask, self).epilog()
1623 final_status = self._determine_final_status()
1624 for queue_entry in self._queue_entries:
1625 queue_entry.set_status(final_status)
1626
1627
1628 def _determine_final_status(self):
1629 # use a PidfileRunMonitor to read the autoserv exit status
1630 monitor = PidfileRunMonitor(self._results_dir)
1631 if monitor.exit_code() == 0:
1632 return models.HostQueueEntry.Status.COMPLETED
1633 return models.HostQueueEntry.Status.FAILED
1634
1635
showard2bab8f42008-11-12 18:15:22 +00001636 def _generate_parse_command(self):
showard97aed502008-11-04 02:01:24 +00001637 parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
showard2bab8f42008-11-12 18:15:22 +00001638 return [parse, '-l', '2', '-r', '-o', self._results_dir]
showard97aed502008-11-04 02:01:24 +00001639
1640
1641 def poll(self):
1642 # override poll to keep trying to start until the parse count goes down
1643 # and we can, at which point we revert to default behavior
1644 if self._parse_started:
1645 super(FinalReparseTask, self).poll()
1646 else:
1647 self._try_starting_parse()
1648
1649
1650 def run(self):
1651 # override run() to not actually run unless we can
1652 self._try_starting_parse()
1653
1654
1655 def _try_starting_parse(self):
1656 if not self._can_run_new_parse():
1657 return
1658 # actually run the parse command
1659 super(FinalReparseTask, self).run()
1660 self._increment_running_parses()
1661 self._parse_started = True
1662
1663
1664 def finished(self, success):
1665 super(FinalReparseTask, self).finished(success)
1666 self._decrement_running_parses()
1667
1668
mbligh36768f02008-02-22 18:28:33 +00001669class DBObject(object):
jadmanski0afbb632008-06-06 21:10:57 +00001670 def __init__(self, id=None, row=None, new_record=False):
1671 assert (bool(id) != bool(row))
mbligh36768f02008-02-22 18:28:33 +00001672
jadmanski0afbb632008-06-06 21:10:57 +00001673 self.__table = self._get_table()
mbligh36768f02008-02-22 18:28:33 +00001674
jadmanski0afbb632008-06-06 21:10:57 +00001675 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001676
jadmanski0afbb632008-06-06 21:10:57 +00001677 if row is None:
1678 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1679 rows = _db.execute(sql, (id,))
1680 if len(rows) == 0:
1681 raise "row not found (table=%s, id=%s)" % \
1682 (self.__table, id)
1683 row = rows[0]
mbligh36768f02008-02-22 18:28:33 +00001684
showard2bab8f42008-11-12 18:15:22 +00001685 self._update_fields_from_row(row)
1686
1687
1688 def _update_fields_from_row(self, row):
jadmanski0afbb632008-06-06 21:10:57 +00001689 assert len(row) == self.num_cols(), (
1690 "table = %s, row = %s/%d, fields = %s/%d" % (
showard2bab8f42008-11-12 18:15:22 +00001691 self.__table, row, len(row), self._fields(), self.num_cols()))
mbligh36768f02008-02-22 18:28:33 +00001692
showard2bab8f42008-11-12 18:15:22 +00001693 self._valid_fields = set()
1694 for field, value in zip(self._fields(), row):
1695 setattr(self, field, value)
1696 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001697
showard2bab8f42008-11-12 18:15:22 +00001698 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001699
mblighe2586682008-02-29 22:45:46 +00001700
jadmanski0afbb632008-06-06 21:10:57 +00001701 @classmethod
1702 def _get_table(cls):
1703 raise NotImplementedError('Subclasses must override this')
mblighe2586682008-02-29 22:45:46 +00001704
1705
jadmanski0afbb632008-06-06 21:10:57 +00001706 @classmethod
1707 def _fields(cls):
1708 raise NotImplementedError('Subclasses must override this')
showard04c82c52008-05-29 19:38:12 +00001709
1710
jadmanski0afbb632008-06-06 21:10:57 +00001711 @classmethod
1712 def num_cols(cls):
1713 return len(cls._fields())
showard04c82c52008-05-29 19:38:12 +00001714
1715
jadmanski0afbb632008-06-06 21:10:57 +00001716 def count(self, where, table = None):
1717 if not table:
1718 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001719
jadmanski0afbb632008-06-06 21:10:57 +00001720 rows = _db.execute("""
1721 SELECT count(*) FROM %s
1722 WHERE %s
1723 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001724
jadmanski0afbb632008-06-06 21:10:57 +00001725 assert len(rows) == 1
1726
1727 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001728
1729
mblighf8c624d2008-07-03 16:58:45 +00001730 def update_field(self, field, value, condition=''):
showard2bab8f42008-11-12 18:15:22 +00001731 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00001732
showard2bab8f42008-11-12 18:15:22 +00001733 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00001734 return
mbligh36768f02008-02-22 18:28:33 +00001735
mblighf8c624d2008-07-03 16:58:45 +00001736 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1737 if condition:
1738 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001739 _db.execute(query, (value, self.id))
1740
showard2bab8f42008-11-12 18:15:22 +00001741 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00001742
1743
jadmanski0afbb632008-06-06 21:10:57 +00001744 def save(self):
1745 if self.__new_record:
1746 keys = self._fields()[1:] # avoid id
1747 columns = ','.join([str(key) for key in keys])
1748 values = ['"%s"' % self.__dict__[key] for key in keys]
1749 values = ','.join(values)
1750 query = """INSERT INTO %s (%s) VALUES (%s)""" % \
1751 (self.__table, columns, values)
1752 _db.execute(query)
mbligh36768f02008-02-22 18:28:33 +00001753
1754
jadmanski0afbb632008-06-06 21:10:57 +00001755 def delete(self):
1756 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1757 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001758
1759
showard63a34772008-08-18 19:32:50 +00001760 @staticmethod
1761 def _prefix_with(string, prefix):
1762 if string:
1763 string = prefix + string
1764 return string
1765
1766
jadmanski0afbb632008-06-06 21:10:57 +00001767 @classmethod
showard989f25d2008-10-01 11:38:11 +00001768 def fetch(cls, where='', params=(), joins='', order_by=''):
showard63a34772008-08-18 19:32:50 +00001769 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1770 where = cls._prefix_with(where, 'WHERE ')
1771 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
1772 '%(where)s %(order_by)s' % {'table' : cls._get_table(),
1773 'joins' : joins,
1774 'where' : where,
1775 'order_by' : order_by})
1776 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001777 for row in rows:
1778 yield cls(row=row)
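    # Usage sketch (job_id is a stand-in value): fetch() yields one object per
    # matching row, e.g.
    #   pending = HostQueueEntry.fetch(
    #       where='job_id = %s AND status = "Pending"', params=(job_id,))
    # which mirrors the call in Job._choose_group_to_run() further down.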
mblighe2586682008-02-29 22:45:46 +00001779
mbligh36768f02008-02-22 18:28:33 +00001780
1781class IneligibleHostQueue(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001782 def __init__(self, id=None, row=None, new_record=None):
1783 super(IneligibleHostQueue, self).__init__(id=id, row=row,
1784 new_record=new_record)
mblighe2586682008-02-29 22:45:46 +00001785
1786
jadmanski0afbb632008-06-06 21:10:57 +00001787 @classmethod
1788 def _get_table(cls):
1789 return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001790
1791
jadmanski0afbb632008-06-06 21:10:57 +00001792 @classmethod
1793 def _fields(cls):
1794 return ['id', 'job_id', 'host_id']
showard04c82c52008-05-29 19:38:12 +00001795
1796
showard989f25d2008-10-01 11:38:11 +00001797class Label(DBObject):
1798 @classmethod
1799 def _get_table(cls):
1800 return 'labels'
1801
1802
1803 @classmethod
1804 def _fields(cls):
1805 return ['id', 'name', 'kernel_config', 'platform', 'invalid',
1806 'only_if_needed']
1807
1808
mbligh36768f02008-02-22 18:28:33 +00001809class Host(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001810 def __init__(self, id=None, row=None):
1811 super(Host, self).__init__(id=id, row=row)
mblighe2586682008-02-29 22:45:46 +00001812
1813
jadmanski0afbb632008-06-06 21:10:57 +00001814 @classmethod
1815 def _get_table(cls):
1816 return 'hosts'
mbligh36768f02008-02-22 18:28:33 +00001817
1818
jadmanski0afbb632008-06-06 21:10:57 +00001819 @classmethod
1820 def _fields(cls):
1821 return ['id', 'hostname', 'locked', 'synch_id', 'status',
showard21baa452008-10-21 00:08:39 +00001822 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty']
showard04c82c52008-05-29 19:38:12 +00001823
1824
jadmanski0afbb632008-06-06 21:10:57 +00001825 def current_task(self):
1826 rows = _db.execute("""
1827 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1828 """, (self.id,))
1829
1830 if len(rows) == 0:
1831 return None
1832 else:
1833 assert len(rows) == 1
1834 results = rows[0]
mblighf8c624d2008-07-03 16:58:45 +00001835# print "current = %s" % results
jadmanski0afbb632008-06-06 21:10:57 +00001836 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00001837
1838
jadmanski0afbb632008-06-06 21:10:57 +00001839 def yield_work(self):
1840 print "%s yielding work" % self.hostname
1841 if self.current_task():
1842 self.current_task().requeue()
1843
1844 def set_status(self, status):
1845 print '%s -> %s' % (self.hostname, status)
1846 self.update_field('status', status)
mbligh36768f02008-02-22 18:28:33 +00001847
1848
showardd8e548a2008-09-09 03:04:57 +00001849 def labels(self):
1850 """
1851 Fetch a list of names of all non-platform labels associated with this
1852 host.
1853 """
1854 rows = _db.execute("""
1855 SELECT labels.name
1856 FROM labels
1857 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
1858 WHERE NOT labels.platform AND hosts_labels.host_id = %s
1859 ORDER BY labels.name
1860 """, (self.id,))
1861 return [row[0] for row in rows]
1862
1863
mbligh36768f02008-02-22 18:28:33 +00001864class HostQueueEntry(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001865 def __init__(self, id=None, row=None):
1866 assert id or row
1867 super(HostQueueEntry, self).__init__(id=id, row=row)
1868 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00001869
jadmanski0afbb632008-06-06 21:10:57 +00001870 if self.host_id:
1871 self.host = Host(self.host_id)
1872 else:
1873 self.host = None
mbligh36768f02008-02-22 18:28:33 +00001874
jadmanski0afbb632008-06-06 21:10:57 +00001875 self.queue_log_path = os.path.join(self.job.results_dir(),
1876 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00001877
1878
jadmanski0afbb632008-06-06 21:10:57 +00001879 @classmethod
1880 def _get_table(cls):
1881 return 'host_queue_entries'
mblighe2586682008-02-29 22:45:46 +00001882
1883
jadmanski0afbb632008-06-06 21:10:57 +00001884 @classmethod
1885 def _fields(cls):
showard2bab8f42008-11-12 18:15:22 +00001886 return ['id', 'job_id', 'host_id', 'priority', 'status', 'meta_host',
1887 'active', 'complete', 'deleted', 'execution_subdir']
showard04c82c52008-05-29 19:38:12 +00001888
1889
showardc85c21b2008-11-24 22:17:37 +00001890 def _view_job_url(self):
1891 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
1892
1893
jadmanski0afbb632008-06-06 21:10:57 +00001894 def set_host(self, host):
1895 if host:
1896 self.queue_log_record('Assigning host ' + host.hostname)
1897 self.update_field('host_id', host.id)
1898 self.update_field('active', True)
1899 self.block_host(host.id)
1900 else:
1901 self.queue_log_record('Releasing host')
1902 self.unblock_host(self.host.id)
1903 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00001904
jadmanski0afbb632008-06-06 21:10:57 +00001905 self.host = host
mbligh36768f02008-02-22 18:28:33 +00001906
1907
jadmanski0afbb632008-06-06 21:10:57 +00001908 def get_host(self):
1909 return self.host
mbligh36768f02008-02-22 18:28:33 +00001910
1911
jadmanski0afbb632008-06-06 21:10:57 +00001912 def queue_log_record(self, log_line):
1913 now = str(datetime.datetime.now())
1914 queue_log = open(self.queue_log_path, 'a', 0)
1915 queue_log.write(now + ' ' + log_line + '\n')
1916 queue_log.close()
mbligh36768f02008-02-22 18:28:33 +00001917
1918
jadmanski0afbb632008-06-06 21:10:57 +00001919 def block_host(self, host_id):
1920 print "creating block %s/%s" % (self.job.id, host_id)
1921 row = [0, self.job.id, host_id]
1922 block = IneligibleHostQueue(row=row, new_record=True)
1923 block.save()
mblighe2586682008-02-29 22:45:46 +00001924
1925
jadmanski0afbb632008-06-06 21:10:57 +00001926 def unblock_host(self, host_id):
1927 print "removing block %s/%s" % (self.job.id, host_id)
1928 blocks = IneligibleHostQueue.fetch(
1929 'job_id=%d and host_id=%d' % (self.job.id, host_id))
1930 for block in blocks:
1931 block.delete()
mblighe2586682008-02-29 22:45:46 +00001932
1933
jadmanski0afbb632008-06-06 21:10:57 +00001934 def results_dir(self):
showard2bab8f42008-11-12 18:15:22 +00001935 return os.path.join(self.job.job_dir, self.execution_subdir)
mbligh36768f02008-02-22 18:28:33 +00001936
mblighe2586682008-02-29 22:45:46 +00001937
showard2bab8f42008-11-12 18:15:22 +00001938 def set_execution_subdir(self, subdir=None):
1939 if subdir is None:
1940 assert self.get_host()
1941 subdir = self.get_host().hostname
1942 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00001943
1944
showard6355f6b2008-12-05 18:52:13 +00001945 def _get_hostname(self):
1946 if self.host:
1947 return self.host.hostname
1948 return 'no host'
1949
1950
jadmanski0afbb632008-06-06 21:10:57 +00001951 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00001952 abort_statuses = ['Abort', 'Aborting', 'Aborted']
1953 if status not in abort_statuses:
1954 condition = ' AND '.join(['status <> "%s"' % x
1955 for x in abort_statuses])
1956 else:
1957 condition = ''
1958 self.update_field('status', status, condition=condition)
1959
showard6355f6b2008-12-05 18:52:13 +00001960 print "%s/%d (%d) -> %s" % (self._get_hostname(), self.job.id, self.id,
1961 self.status)
mblighf8c624d2008-07-03 16:58:45 +00001962
showardc85c21b2008-11-24 22:17:37 +00001963 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00001964 self.update_field('complete', False)
1965 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00001966
jadmanski0afbb632008-06-06 21:10:57 +00001967 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showarde58e3f82008-11-20 19:04:59 +00001968 'Aborting']:
jadmanski0afbb632008-06-06 21:10:57 +00001969 self.update_field('complete', False)
1970 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00001971
showardc85c21b2008-11-24 22:17:37 +00001972 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00001973 self.update_field('complete', True)
1974 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00001975
1976 should_email_status = (status.lower() in _notify_email_statuses or
1977 'all' in _notify_email_statuses)
1978 if should_email_status:
1979 self._email_on_status(status)
1980
1981 self._email_on_job_complete()
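    # Summary of the transitions above: 'Queued' and 'Parsing' clear both
    # complete and active; 'Pending', 'Running', 'Verifying', 'Starting' and
    # 'Aborting' mark the entry active; 'Failed', 'Completed', 'Stopped' and
    # 'Aborted' mark it complete. Per-status email is gated by
    # _notify_email_statuses, and a job summary is sent once the job finishes.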
1982
1983
1984 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00001985 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00001986
1987 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
1988 self.job.id, self.job.name, hostname, status)
1989 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
1990 self.job.id, self.job.name, hostname, status,
1991 self._view_job_url())
1992 send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00001993
1994
1995 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00001996 if not self.job.is_finished():
1997 return
showard542e8402008-09-19 20:16:18 +00001998
showardc85c21b2008-11-24 22:17:37 +00001999 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002000 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002001 for queue_entry in hosts_queue:
2002 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002003 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002004 queue_entry.status))
2005
2006 summary_text = "\n".join(summary_text)
2007 status_counts = models.Job.objects.get_status_counts(
2008 [self.job.id])[self.job.id]
2009 status = ', '.join('%d %s' % (count, status) for status, count
2010 in status_counts.iteritems())
2011
2012 subject = 'Autotest: Job ID: %s "%s" %s' % (
2013 self.job.id, self.job.name, status)
2014 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2015 self.job.id, self.job.name, status, self._view_job_url(),
2016 summary_text)
2017 send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002018
2019
jadmanski0afbb632008-06-06 21:10:57 +00002020 def run(self, assigned_host=None):
2021 if self.meta_host:
2022 assert assigned_host
2023 # ensure results dir exists for the queue log
2024 self.job.create_results_dir()
2025 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002026
jadmanski0afbb632008-06-06 21:10:57 +00002027 print "%s/%s scheduled on %s, status=%s" % (self.job.name,
2028 self.meta_host, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002029
jadmanski0afbb632008-06-06 21:10:57 +00002030 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002031
jadmanski0afbb632008-06-06 21:10:57 +00002032 def requeue(self):
2033 self.set_status('Queued')
mblighe2586682008-02-29 22:45:46 +00002034
jadmanski0afbb632008-06-06 21:10:57 +00002035 if self.meta_host:
2036 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002037
2038
jadmanski0afbb632008-06-06 21:10:57 +00002039 def handle_host_failure(self):
2040 """\
2041 Called when this queue entry's host has failed verification and
2042 repair.
2043 """
2044 assert not self.meta_host
2045 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002046 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002047
2048
showard2bab8f42008-11-12 18:15:22 +00002049 def clear_results_dir(self, dont_delete_files=False):
2050 if not self.execution_subdir:
2051 return
2052 results_dir = self.results_dir()
jadmanski0afbb632008-06-06 21:10:57 +00002053 if not os.path.exists(results_dir):
2054 return
2055 if dont_delete_files:
2056 temp_dir = tempfile.mkdtemp(suffix='.clear_results')
showard2bab8f42008-11-12 18:15:22 +00002057 print 'Moving results from %s to %s' % (results_dir, temp_dir)
jadmanski0afbb632008-06-06 21:10:57 +00002058 for filename in os.listdir(results_dir):
2059 path = os.path.join(results_dir, filename)
2060 if dont_delete_files:
showard2bab8f42008-11-12 18:15:22 +00002061 shutil.move(path, os.path.join(temp_dir, filename))
jadmanski0afbb632008-06-06 21:10:57 +00002062 else:
2063 remove_file_or_dir(path)
showard2bab8f42008-11-12 18:15:22 +00002064 remove_file_or_dir(results_dir)
mbligh36768f02008-02-22 18:28:33 +00002065
2066
jadmanskif7fa2cc2008-10-01 14:13:23 +00002067 @property
2068 def aborted_by(self):
2069 self._load_abort_info()
2070 return self._aborted_by
2071
2072
2073 @property
2074 def aborted_on(self):
2075 self._load_abort_info()
2076 return self._aborted_on
2077
2078
2079 def _load_abort_info(self):
2080 """ Fetch info about who aborted the job. """
2081 if hasattr(self, "_aborted_by"):
2082 return
2083 rows = _db.execute("""
2084 SELECT users.login, aborted_host_queue_entries.aborted_on
2085 FROM aborted_host_queue_entries
2086 INNER JOIN users
2087 ON users.id = aborted_host_queue_entries.aborted_by_id
2088 WHERE aborted_host_queue_entries.queue_entry_id = %s
2089 """, (self.id,))
2090 if rows:
2091 self._aborted_by, self._aborted_on = rows[0]
2092 else:
2093 self._aborted_by = self._aborted_on = None
2094
2095
showardb2e2c322008-10-14 17:33:55 +00002096 def on_pending(self):
2097 """
2098 Called when an entry in a synchronous job has passed verify. If the
2099 job is ready to run, returns an agent to run the job. Returns None
2100 otherwise.
2101 """
2102 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002103 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002104 if self.job.is_ready():
2105 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002106 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002107 return None
2108
2109
showard1be97432008-10-17 15:30:45 +00002110 def abort(self, agents_to_abort=[]):
2111 abort_task = AbortTask(self, agents_to_abort)
2112 tasks = [abort_task]
2113
2114 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00002115 if self.active and host:
showard45ae8192008-11-05 19:32:53 +00002116 cleanup_task = CleanupTask(host=host)
showard1be97432008-10-17 15:30:45 +00002117 verify_task = VerifyTask(host=host)
2118 # just to make sure this host does not get taken away
showard45ae8192008-11-05 19:32:53 +00002119 host.set_status('Cleaning')
2120 tasks += [cleanup_task, verify_task]
showard1be97432008-10-17 15:30:45 +00002121
2122 self.set_status('Aborting')
2123 return Agent(tasks=tasks, queue_entry_ids=[self.id])
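    # The returned Agent runs the AbortTask first; if this entry was actively
    # holding a host, a CleanupTask and a VerifyTask follow so the host is
    # scrubbed and re-verified before it becomes schedulable again.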
2124
2125
mbligh36768f02008-02-22 18:28:33 +00002126class Job(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00002127 def __init__(self, id=None, row=None):
2128 assert id or row
2129 super(Job, self).__init__(id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00002130
jadmanski0afbb632008-06-06 21:10:57 +00002131 self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
2132 self.owner))
mblighe2586682008-02-29 22:45:46 +00002133
2134
jadmanski0afbb632008-06-06 21:10:57 +00002135 @classmethod
2136 def _get_table(cls):
2137 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00002138
2139
jadmanski0afbb632008-06-06 21:10:57 +00002140 @classmethod
2141 def _fields(cls):
2142 return ['id', 'owner', 'name', 'priority', 'control_file',
showard2bab8f42008-11-12 18:15:22 +00002143 'control_type', 'created_on', 'synch_count', 'timeout',
showard21baa452008-10-21 00:08:39 +00002144 'run_verify', 'email_list', 'reboot_before', 'reboot_after']
showard04c82c52008-05-29 19:38:12 +00002145
2146
jadmanski0afbb632008-06-06 21:10:57 +00002147 def is_server_job(self):
2148 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002149
2150
jadmanski0afbb632008-06-06 21:10:57 +00002151 def get_host_queue_entries(self):
2152 rows = _db.execute("""
2153 SELECT * FROM host_queue_entries
2154 WHERE job_id= %s
2155 """, (self.id,))
2156 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002157
jadmanski0afbb632008-06-06 21:10:57 +00002158 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002159
jadmanski0afbb632008-06-06 21:10:57 +00002160 return entries
mbligh36768f02008-02-22 18:28:33 +00002161
2162
jadmanski0afbb632008-06-06 21:10:57 +00002163 def set_status(self, status, update_queues=False):
2164 self.update_field('status', status)
2165
2166 if update_queues:
2167 for queue_entry in self.get_host_queue_entries():
2168 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002169
2170
jadmanski0afbb632008-06-06 21:10:57 +00002171 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002172 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2173 status='Pending')
2174 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002175
2176
jadmanski0afbb632008-06-06 21:10:57 +00002177 def results_dir(self):
2178 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002179
jadmanski0afbb632008-06-06 21:10:57 +00002180 def num_machines(self, clause = None):
2181 sql = "job_id=%s" % self.id
2182 if clause:
2183 sql += " AND (%s)" % clause
2184 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002185
2186
jadmanski0afbb632008-06-06 21:10:57 +00002187 def num_queued(self):
2188 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002189
2190
jadmanski0afbb632008-06-06 21:10:57 +00002191 def num_active(self):
2192 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002193
2194
jadmanski0afbb632008-06-06 21:10:57 +00002195 def num_complete(self):
2196 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002197
2198
jadmanski0afbb632008-06-06 21:10:57 +00002199 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002200 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002201
mbligh36768f02008-02-22 18:28:33 +00002202
showard2bab8f42008-11-12 18:15:22 +00002203 def _stop_all_entries(self, entries_to_abort):
2204 """
2205 entries_to_abort: sequence of models.HostQueueEntry objects
2206 """
2207 for child_entry in entries_to_abort:
2208 assert not child_entry.complete
2209 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2210 child_entry.host.status = models.Host.Status.READY
2211 child_entry.host.save()
2212 child_entry.status = models.HostQueueEntry.Status.STOPPED
2213 child_entry.save()
2214
2215
2216 def stop_if_necessary(self):
2217 not_yet_run = models.HostQueueEntry.objects.filter(
2218 job=self.id, status__in=(models.HostQueueEntry.Status.QUEUED,
2219 models.HostQueueEntry.Status.VERIFYING,
2220 models.HostQueueEntry.Status.PENDING))
2221 if not_yet_run.count() < self.synch_count:
2222 self._stop_all_entries(not_yet_run)
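    # Rationale: once fewer than synch_count entries remain in a runnable
    # state (Queued, Verifying or Pending), a synchronous job can never
    # assemble a full group, so the not-yet-run entries are stopped and any
    # Pending hosts are released back to 'Ready' by _stop_all_entries().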
mblighe2586682008-02-29 22:45:46 +00002223
2224
jadmanski0afbb632008-06-06 21:10:57 +00002225 def write_to_machines_file(self, queue_entry):
2226 hostname = queue_entry.get_host().hostname
2227 print "writing %s to job %s machines file" % (hostname, self.id)
2228 file_path = os.path.join(self.job_dir, '.machines')
2229 mf = open(file_path, 'a')
showard2bab8f42008-11-12 18:15:22 +00002230 mf.write(hostname + '\n')
jadmanski0afbb632008-06-06 21:10:57 +00002231 mf.close()
mbligh36768f02008-02-22 18:28:33 +00002232
2233
jadmanski0afbb632008-06-06 21:10:57 +00002234 def create_results_dir(self, queue_entry=None):
showard2bab8f42008-11-12 18:15:22 +00002235 ensure_directory_exists(self.job_dir)
mbligh36768f02008-02-22 18:28:33 +00002236
jadmanski0afbb632008-06-06 21:10:57 +00002237 if queue_entry:
showarde05654d2008-10-28 20:38:40 +00002238 results_dir = queue_entry.results_dir()
showarde788ea62008-11-17 21:02:47 +00002239 if os.path.exists(results_dir):
2240 warning = 'QE results dir ' + results_dir + ' already exists'
2241 print warning
2242 email_manager.enqueue_notify_email(warning, warning)
showard2bab8f42008-11-12 18:15:22 +00002243 ensure_directory_exists(results_dir)
showarde05654d2008-10-28 20:38:40 +00002244 return results_dir
jadmanski0afbb632008-06-06 21:10:57 +00002245 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002246
2247
showard2bab8f42008-11-12 18:15:22 +00002248 def _next_group_name(self):
2249 query = models.HostQueueEntry.objects.filter(
2250 job=self.id).values('execution_subdir').distinct()
2251 subdirs = (entry['execution_subdir'] for entry in query)
2252 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2253 ids = [int(match.group(1)) for match in groups if match]
2254 if ids:
2255 next_id = max(ids) + 1
2256 else:
2257 next_id = 0
2258 return "group%d" % next_id
2259
2260
showardb2e2c322008-10-14 17:33:55 +00002261 def _write_control_file(self):
2262 'Writes control file out to disk, returns a filename'
2263 control_fd, control_filename = tempfile.mkstemp(suffix='.control_file')
2264 control_file = os.fdopen(control_fd, 'w')
jadmanski0afbb632008-06-06 21:10:57 +00002265 if self.control_file:
showardb2e2c322008-10-14 17:33:55 +00002266 control_file.write(self.control_file)
2267 control_file.close()
2268 return control_filename
mbligh36768f02008-02-22 18:28:33 +00002269
showardb2e2c322008-10-14 17:33:55 +00002270
showard2bab8f42008-11-12 18:15:22 +00002271 def get_group_entries(self, queue_entry_from_group):
2272 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002273 return list(HostQueueEntry.fetch(
2274 where='job_id=%s AND execution_subdir=%s',
2275 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002276
2277
2278 def get_job_tag(self, queue_entries):
2279 assert len(queue_entries) > 0
2280 execution_subdir = queue_entries[0].execution_subdir
2281 assert execution_subdir
2282 return "%s-%s/%s" % (self.id, self.owner, execution_subdir)
showardb2e2c322008-10-14 17:33:55 +00002283
2284
2285 def _get_autoserv_params(self, queue_entries):
2286 results_dir = self.create_results_dir(queue_entries[0])
2287 control_filename = self._write_control_file()
jadmanski0afbb632008-06-06 21:10:57 +00002288 hostnames = ','.join([entry.get_host().hostname
2289 for entry in queue_entries])
showard2bab8f42008-11-12 18:15:22 +00002290 job_tag = self.get_job_tag(queue_entries)
mbligh36768f02008-02-22 18:28:33 +00002291
showardb2e2c322008-10-14 17:33:55 +00002292 params = [_autoserv_path, '-P', job_tag, '-p', '-n',
showard21baa452008-10-21 00:08:39 +00002293 '-r', os.path.abspath(results_dir), '-u', self.owner,
2294 '-l', self.name, '-m', hostnames, control_filename]
mbligh36768f02008-02-22 18:28:33 +00002295
jadmanski0afbb632008-06-06 21:10:57 +00002296 if not self.is_server_job():
2297 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002298
showardb2e2c322008-10-14 17:33:55 +00002299 return params
mblighe2586682008-02-29 22:45:46 +00002300
mbligh36768f02008-02-22 18:28:33 +00002301
showard2bab8f42008-11-12 18:15:22 +00002302 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002303 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00002304 if self.reboot_before == models.RebootBefore.ALWAYS:
showard21baa452008-10-21 00:08:39 +00002305 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00002306 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showard21baa452008-10-21 00:08:39 +00002307 do_reboot = queue_entry.get_host().dirty
2308
2309 tasks = []
2310 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00002311 tasks.append(CleanupTask(queue_entry=queue_entry))
showard2bab8f42008-11-12 18:15:22 +00002312 tasks.append(VerifyTask(queue_entry=queue_entry))
showard21baa452008-10-21 00:08:39 +00002313 return tasks
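    # The returned list is either [VerifyTask] or [CleanupTask, VerifyTask]:
    # RebootBefore.ALWAYS, or IF_DIRTY on a host marked dirty, prepends the
    # cleanup; a VerifyTask is always appended here (this method is only
    # reached when the job requests verification, see run() below).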
2314
2315
showard2bab8f42008-11-12 18:15:22 +00002316 def _assign_new_group(self, queue_entries):
2317 if len(queue_entries) == 1:
2318 group_name = queue_entries[0].get_host().hostname
2319 else:
2320 group_name = self._next_group_name()
2321 print 'Running synchronous job %d hosts %s as %s' % (
2322 self.id, [entry.host.hostname for entry in queue_entries],
2323 group_name)
2324
2325 for queue_entry in queue_entries:
2326 queue_entry.set_execution_subdir(group_name)
2327
2328
2329 def _choose_group_to_run(self, include_queue_entry):
2330 chosen_entries = [include_queue_entry]
2331
2332 num_entries_needed = self.synch_count - 1
2333 if num_entries_needed > 0:
2334 pending_entries = HostQueueEntry.fetch(
2335 where='job_id = %s AND status = "Pending" AND id != %s',
2336 params=(self.id, include_queue_entry.id))
2337 chosen_entries += list(pending_entries)[:num_entries_needed]
2338
2339 self._assign_new_group(chosen_entries)
2340 return chosen_entries
2341
2342
2343 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002344 if not self.is_ready():
showard9976ce92008-10-15 20:28:13 +00002345 if self.run_verify:
showarde58e3f82008-11-20 19:04:59 +00002346 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
showard2bab8f42008-11-12 18:15:22 +00002347 return Agent(self._get_pre_job_tasks(queue_entry),
showard21baa452008-10-21 00:08:39 +00002348 [queue_entry.id])
showard9976ce92008-10-15 20:28:13 +00002349 else:
2350 return queue_entry.on_pending()
mbligh36768f02008-02-22 18:28:33 +00002351
showard2bab8f42008-11-12 18:15:22 +00002352 queue_entries = self._choose_group_to_run(queue_entry)
2353 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002354
2355
2356 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002357 for queue_entry in queue_entries:
2358 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002359 params = self._get_autoserv_params(queue_entries)
2360 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2361 cmd=params)
2362 tasks = initial_tasks + [queue_task]
2363 entry_ids = [entry.id for entry in queue_entries]
2364
2365 return Agent(tasks, entry_ids, num_processes=len(queue_entries))
2366
2367
mbligh36768f02008-02-22 18:28:33 +00002368if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002369 main()