#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False
_global_config_section = 'SCHEDULER'
_base_url = None
# see os.getlogin() online docs
_email_from = pwd.getpwuid(os.getuid())[0]

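# Note (illustrative sketch, not from the original source): the global_config
# entries read below in main() and by the Dispatcher class look roughly like
# this in global_config.ini; the values shown are made-up examples only.
#
#   [SCHEDULER]
#   notify_email:
#   tick_pause_sec: 5
#   max_running_jobs: 100
#   max_jobs_started_per_cycle: 100
#   clean_interval_minutes: 60
#   max_parse_processes: 5
#   synch_job_start_timeout_minutes: 300
#
#   [AUTOTEST_WEB]
#   base_url: http://your_autotest_server/afe/
#   database: autotest_web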
def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # read in notify_email from global_config
    c = global_config.global_config
    global _notify_email
    val = c.get_config_value(_global_config_section, "notify_email")
    if val != "":
        _notify_email = val

    tick_pause = c.get_config_value(
        _global_config_section, 'tick_pause_sec', type=int)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # read in base url
    global _base_url
    val = c.get_config_value(CONFIG_SECTION, "base_url")
    if val:
        _base_url = val
    else:
        _base_url = "http://your_autotest_server/afe/"

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(tick_pause)
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    email_manager.send_queued_emails()
    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    if _testing_mode:
        global_config.global_config.override_config_value(
            CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(CONFIG_SECTION)
    _db.connect()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


def queue_entries_to_abort():
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)
    qe = [HostQueueEntry(row=i) for i in rows]
    return qe

def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)


def generate_parse_command(results_dir, flags=""):
    parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    output = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    cmd = "%s %s -r -o %s > %s 2>&1 &"
    return cmd % (parse, flags, results_dir, output)


_parse_command_queue = []
def parse_results(results_dir, flags=""):
    if _testing_mode:
        return
    _parse_command_queue.append(generate_parse_command(results_dir, flags))




def log_stacktrace(reason):
    (type, value, tb) = sys.exc_info()
    str = "EXCEPTION: %s\n" % reason
    str += ''.join(traceback.format_exception(type, value, tb))

    sys.stderr.write("\n%s\n" % str)
    email_manager.enqueue_notify_email("monitor_db exception", str)


def get_proc_poll_fn(pid):
    proc_path = os.path.join('/proc', str(pid))
    def poll_fn():
        if os.path.exists(proc_path):
            return None
        return 0  # we can't get a real exit code
    return poll_fn


def send_email(from_addr, to_string, subject, body):
    """Mails out emails to the addresses listed in to_string.

    to_string is split into a list which can be delimited by any of:
            ';', ',', ':' or any whitespace
    """

    # Create list from string removing empty strings from the list.
    to_list = [x for x in re.split('\s|,|;|:', to_string) if x]
    if not to_list:
        return

    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        from_addr, ', '.join(to_list), subject, body)
    try:
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(from_addr, to_list, msg)
        finally:
            mailer.quit()
    except Exception, e:
        print "Sending email failed. Reason: %s" % repr(e)


def kill_autoserv(pid, poll_fn=None):
    print 'killing', pid
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() == None:
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)


class EmailNotificationManager(object):
    def __init__(self):
        self._emails = []

    def enqueue_notify_email(self, subject, message):
        if not _notify_email:
            return

        body = 'Subject: ' + subject + '\n'
        body += "%s / %s / %s\n%s" % (socket.gethostname(),
                                      os.getpid(),
                                      time.strftime("%X %x"), message)
        self._emails.append(body)


    def send_queued_emails(self):
        if not self._emails:
            return
        subject = 'Scheduler notifications from ' + socket.gethostname()
        separator = '\n' + '-' * 40 + '\n'
        body = separator.join(self._emails)

        send_email(_email_from, _notify_email, subject, body)
        self._emails = []

email_manager = EmailNotificationManager()


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = long(row[0]), long(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result

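    # Note (illustrative, not from the original source): given DB rows like
    # [(1, 10), (1, 11), (2, 10)], _process_many2many_dict() returns
    # {1: set([10, 11]), 2: set([10])}; with flip=True it returns
    # {10: set([1, 2]), 11: set([1])}.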

    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.acl_group_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, acl_group_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0

    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        acl = self._is_acl_accessible(host_id, queue_entry)
        deps = self._check_job_dependencies(job_dependencies, host_labels)
        only_if = self._check_only_if_needed_labels(job_dependencies,
                                                    host_labels, queue_entry)
        return acl and deps and only_if


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            return self._schedule_non_metahost(queue_entry)
        return self._schedule_metahost(queue_entry)


class Dispatcher:
    autoserv_procs_cache = None
    max_running_processes = global_config.global_config.get_config_value(
        _global_config_section, 'max_running_jobs', type=int)
    max_processes_started_per_cycle = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_jobs_started_per_cycle', type=int))
    clean_interval = (
        global_config.global_config.get_config_value(
            _global_config_section, 'clean_interval_minutes', type=int))
    max_parse_processes = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_parse_processes', type=int))
    synch_job_start_timeout_minutes = (
        global_config.global_config.get_config_value(
            _global_config_section, 'synch_job_start_timeout_minutes',
            type=int))

    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        Dispatcher.autoserv_procs_cache = None
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        self._run_final_parses()
        email_manager.send_queued_emails()

    def _run_cleanup_maybe(self):
        if self._last_clean_time + self.clean_interval * 60 < time.time():
            print 'Running cleanup'
            self._abort_timed_out_jobs()
            self._abort_jobs_past_synch_start_timeout()
            self._clear_inactive_blocks()
            self._last_clean_time = time.time()


    def _run_final_parses(self):
        process_count = 0
        try:
            for line in utils.system_output('ps -e').splitlines():
                if 'parse.py' in line:
                    process_count += 1
        except Exception:
            # We'll try again in a bit.  This is a work-around for one time
            # when the scheduler crashed due to an "Interrupted system call"
            # error.
            return

        if process_count:
            print "%d parses currently running" % process_count

        while (process_count < self.max_parse_processes and
               _parse_command_queue):
            cmd = _parse_command_queue.pop(0)
            print "Starting another final parse with cmd %s" % cmd
            os.system(cmd)
            process_count += 1

        if _parse_command_queue:
            print ("%d cmds still in final parse queue" %
                   len(_parse_command_queue))


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self

    # Find agent corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def num_running_processes(self):
        return sum(agent.num_processes for agent in self._agents
                   if agent.is_running())


    @classmethod
    def find_autoservs(cls, orphans_only=False):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes. If orphans_only=True, return only processes that
        have been orphaned (i.e. parent pid = 1).
        """
        if cls.autoserv_procs_cache is not None:
            return cls.autoserv_procs_cache

        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the four columns output by ps
        procs = [line.split(None, 4) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = {}
        for proc in procs:
            # check ppid == 1 for orphans
            if orphans_only and proc[2] != 1:
                continue
            # only root autoserv processes have pgid == pid
            if (proc[3] == 'autoserv' and  # comm
                proc[1] == proc[0]):       # pgid == pid
                # map pid to args
                autoserv_procs[int(proc[0])] = proc[4]
        cls.autoserv_procs_cache = autoserv_procs
        return autoserv_procs


    def recover_queue_entry(self, queue_entry, run_monitor):
        job = queue_entry.job
        if job.is_synchronous():
            all_queue_entries = job.get_host_queue_entries()
        else:
            all_queue_entries = [queue_entry]
        all_queue_entry_ids = [queue_entry.id for queue_entry
                               in all_queue_entries]
        queue_task = RecoveryQueueTask(
            job=queue_entry.job,
            queue_entries=all_queue_entries,
            run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=all_queue_entry_ids))


    def _recover_processes(self):
        orphans = self.find_autoservs(orphans_only=True)

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            if not run_monitor.has_pid():
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            pid = run_monitor.get_pid()
            print 'Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid)
            job = queue_entry.job
            if job.is_synchronous():
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir(dont_delete_files=True)
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rebooting_host_ids = set()
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %d' % queue_entry.id
            agent = queue_entry.abort()
            self.add_agent(agent)
            if queue_entry.get_host():
                rebooting_host_ids.add(queue_entry.get_host().id)

        # reverify hosts that were in the middle of verify, repair or
        # reboot
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Rebooting')""",
                                   exclude_ids=rebooting_host_ids)

        # finally, recover "Running" hosts with no active queue entries,
        # although this should never happen
        message = ('Recovering running host %s - this probably '
                   'indicates a scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              exclude_ids=set()):
        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
                           'invalid = 0 AND ' + where)
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.id in exclude_ids:
                continue
            if print_message is not None:
                print print_message % host.hostname
            verify_task = VerifyTask(host=host)
            self.add_agent(Agent(tasks=[verify_task]))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _abort_timed_out_jobs(self):
        """
        Aborts all jobs that have timed out and not completed
        """
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            print 'Aborting job %d due to job timeout' % job.id
            job.abort(None)


    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are holding a machine in the Everyone ACL group.
714 """
715 timeout_delta = datetime.timedelta(
716 minutes=self.synch_job_start_timeout_minutes)
717 timeout_start = datetime.datetime.now() - timeout_delta
718 query = models.Job.objects.filter(
719 synch_type=models.Test.SynchType.SYNCHRONOUS,
720 created_on__lt=timeout_start,
721 hostqueueentry__status='Pending',
722 hostqueueentry__host__acl_group__name='Everyone')
723 for job in query.distinct():
724 print 'Aborting job %d due to start timeout' % job.id
725 job.abort(None)
726
727
jadmanski0afbb632008-06-06 21:10:57 +0000728 def _clear_inactive_blocks(self):
729 """
730 Clear out blocks for all completed jobs.
731 """
732 # this would be simpler using NOT IN (subquery), but MySQL
733 # treats all IN subqueries as dependent, so this optimizes much
734 # better
735 _db.execute("""
736 DELETE ihq FROM ineligible_host_queues ihq
showard4eaaf522008-06-06 22:28:07 +0000737 LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
jadmanski0afbb632008-06-06 21:10:57 +0000738 WHERE NOT complete) hqe
739 USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000740
741
showardb95b1bd2008-08-15 18:11:04 +0000742 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000743 # prioritize by job priority, then non-metahost over metahost, then FIFO
744 return list(HostQueueEntry.fetch(
745 where='NOT complete AND NOT active',
showard3dd6b882008-10-27 19:21:39 +0000746 order_by='priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000747
748
jadmanski0afbb632008-06-06 21:10:57 +0000749 def _schedule_new_jobs(self):
750 print "finding work"
751
showard63a34772008-08-18 19:32:50 +0000752 queue_entries = self._get_pending_queue_entries()
753 if not queue_entries:
showardb95b1bd2008-08-15 18:11:04 +0000754 return
showardb95b1bd2008-08-15 18:11:04 +0000755
showard63a34772008-08-18 19:32:50 +0000756 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000757
showard63a34772008-08-18 19:32:50 +0000758 for queue_entry in queue_entries:
759 assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000760 if not assigned_host:
jadmanski0afbb632008-06-06 21:10:57 +0000761 continue
showardb95b1bd2008-08-15 18:11:04 +0000762 self._run_queue_entry(queue_entry, assigned_host)
763
764
765 def _run_queue_entry(self, queue_entry, host):
766 agent = queue_entry.run(assigned_host=host)
showard9976ce92008-10-15 20:28:13 +0000767 # in some cases (synchronous jobs with run_verify=False), agent may be None
768 if agent:
769 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000770
771
jadmanski0afbb632008-06-06 21:10:57 +0000772 def _find_aborting(self):
773 num_aborted = 0
774 # Find jobs that are aborting
775 for entry in queue_entries_to_abort():
776 agents_to_abort = self.get_agents(entry)
showard1be97432008-10-17 15:30:45 +0000777 for agent in agents_to_abort:
778 self.remove_agent(agent)
779
780 agent = entry.abort(agents_to_abort)
781 self.add_agent(agent)
jadmanski0afbb632008-06-06 21:10:57 +0000782 num_aborted += 1
783 if num_aborted >= 50:
784 break
785
786
showard4c5374f2008-09-04 17:02:56 +0000787 def _can_start_agent(self, agent, num_running_processes,
788 num_started_this_cycle, have_reached_limit):
789 # always allow zero-process agents to run
790 if agent.num_processes == 0:
791 return True
792 # don't allow any nonzero-process agents to run after we've reached a
793 # limit (this avoids starvation of many-process agents)
794 if have_reached_limit:
795 return False
796 # total process throttling
797 if (num_running_processes + agent.num_processes >
798 self.max_running_processes):
799 return False
800 # if a single agent exceeds the per-cycle throttling, still allow it to
801 # run when it's the first agent in the cycle
802 if num_started_this_cycle == 0:
803 return True
804 # per-cycle throttling
805 if (num_started_this_cycle + agent.num_processes >
806 self.max_processes_started_per_cycle):
807 return False
808 return True
809
810
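    # Note (illustrative walkthrough, not from the original source): with
    # max_running_jobs = 10 and max_jobs_started_per_cycle = 4, an agent with
    # num_processes = 3 can start while 7 or fewer processes are running and
    # 1 or fewer were started this cycle (or it is the first agent started
    # this cycle).  Once any agent is refused for exceeding a limit,
    # have_reached_limit blocks further nonzero-process agents for the rest
    # of the cycle so many-process agents are not starved.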
    def _handle_agents(self):
        num_running_processes = self.num_running_processes()
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                print "agent finished"
                self._agents.remove(agent)
                num_running_processes -= agent.num_processes
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_running_processes,
                                             num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_running_processes += agent.num_processes
                num_started_this_cycle += agent.num_processes
            agent.tick()
        print num_running_processes, 'running processes'


class RunMonitor(object):
    def __init__(self, cmd, nice_level=None, log_file=None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.cmd = cmd

    def run(self):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(self.cmd)
            self.cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
            except OSError, exc:
                if exc.errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)
            try:
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" %
                               (time.strftime("%X %x"),
                                self.cmd))
                out_file.write("%s\n" % ('*'*80))
            except (OSError, IOError):
                log_stacktrace('Error opening log file %s' %
                               self.log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % self.cmd
        print "path = %s" % os.getcwd()

        self.proc = subprocess.Popen(self.cmd, stdout=out_file,
                                     stderr=subprocess.STDOUT,
                                     stdin=in_devnull)
        out_file.close()
        in_devnull.close()


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        return self.proc.poll()


class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """

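# Note (illustrative, not from the original source): the .autoserv_execute
# pidfile parsed by PidfileRunMonitor below holds one to three lines, e.g.
#
#   12345       <- autoserv pid
#   0           <- exit status, written when autoserv finishes
#   2           <- number of failed tests (older two-line pidfiles omit this)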
class PidfileRunMonitor(RunMonitor):
    class PidfileState(object):
        pid = None
        exit_status = None
        num_tests_failed = None

        def reset(self):
            self.pid = self.exit_status = self.num_tests_failed = None


    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        self.start_time = time.time()
        self._state = self.PidfileState()
        super(PidfileRunMonitor, self).__init__(cmd, nice_level, log_file)


    def has_pid(self):
        self._get_pidfile_info()
        return self._state.pid is not None


    def get_pid(self):
        self._get_pidfile_info()
        assert self._state.pid is not None
        return self._state.pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self):
        cmdline_path = os.path.join('/proc', str(self._state.pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def _read_pidfile(self):
        self._state.reset()
        if not os.path.exists(self.pid_file):
            return
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        if not lines:
            return
        if len(lines) > 3:
            raise PidfileException('Corrupt pid file (%d lines) at %s:\n%s' %
                                   (len(lines), self.pid_file, lines))
        try:
            self._state.pid = int(lines[0])
            if len(lines) > 1:
                self._state.exit_status = int(lines[1])
                if len(lines) == 3:
                    self._state.num_tests_failed = int(lines[2])
                else:
                    # maintain backwards-compatibility with two-line pidfiles
                    self._state.num_tests_failed = 0
        except ValueError, exc:
            raise PidfileException('Corrupt pid file: ' + str(exc.args))


    def _find_autoserv_proc(self):
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nPid: %s\nPidfile: %s\n%s' % (self._state.pid,
                                                          self.pid_file,
                                                          message)
        print message
        email_manager.enqueue_notify_email(error, message)
        if self._state.pid is not None:
            pid = self._state.pid
        else:
            pid = 0
        self.on_lost_process(pid)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.pid is None:
            self._handle_no_pid()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs()
            if proc_running:
                return

            # pid but no process - maybe process *just* exited
            self._read_pidfile()
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
            pid=None, exit_status=None if autoserv has not yet run
            pid!=None, exit_status=None if autoserv is running
            pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            email_manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.pid = pid
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed


class Agent(object):
    def __init__(self, tasks, queue_entry_ids=[], num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids
        self.num_processes = num_processes

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()

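# Note (summary comment, not from the original source): an AgentTask is one
# step run by an Agent.  The Agent runs its tasks FIFO from Dispatcher.tick(),
# and when a task finishes unsuccessfully the Agent replaces its remaining
# queue with that task's failure_tasks (e.g. a VerifyTask falling back to a
# RepairTask).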
class AgentTask(object):
    def __init__(self, cmd, failure_tasks=[]):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code is None:
            return
#        print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)


    def cleanup(self):
        if (hasattr(self, 'temp_results_dir') and
            os.path.exists(self.temp_results_dir)):
            shutil.rmtree(self.temp_results_dir)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def run(self):
        if self.cmd:
            print "agent starting monitor"
            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level=AUTOSERV_NICE_LEVEL,
                log_file=log_file)
            self.monitor.run()


class RepairTask(AgentTask):
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)
        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path, '-R', '-m', host.hostname,
               '-r', self.temp_results_dir, '--host-protection', protection]
        self.host = host
        self.fail_queue_entry = fail_queue_entry
        super(RepairTask, self).__init__(cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.fail_queue_entry:
                self.fail_queue_entry.handle_host_failure()


class VerifyTask(AgentTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')

        cmd = [_autoserv_path, '-v', '-m', self.host.hostname, '-r', self.temp_results_dir]

        fail_queue_entry = None
        if queue_entry and not queue_entry.meta_host:
            fail_queue_entry = queue_entry
        failure_tasks = [RepairTask(self.host, fail_queue_entry)]

        super(VerifyTask, self).__init__(cmd,
                                         failure_tasks=failure_tasks)


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
            self.queue_entry.clear_results_dir(
                self.queue_entry.verify_results_dir())
        self.host.set_status('Verifying')


    def cleanup(self):
        if not os.path.exists(self.temp_results_dir):
            return
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        super(VerifyTask, self).cleanup()


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
        elif self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            if filename == AUTOSERV_PID_FILE:
                continue
            self.force_move(os.path.join(self.temp_results_dir,
                                         filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print ('Warning: removing existing destination file ' +
                   dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)


class VerifySynchronousTask(VerifyTask):
    def epilog(self):
        super(VerifySynchronousTask, self).epilog()
        if self.success:
            if self.queue_entry.job.num_complete() > 0:
                # some other entry failed verify, and we've
                # already been marked as stopped
                return

            agent = self.queue_entry.on_pending()
            if agent:
                self.agent.dispatcher.add_agent(agent)


class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        super(QueueTask, self).__init__(cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(keyval_dir, field, value, keyval_filename='keyval'):
        key_path = os.path.join(keyval_dir, keyval_filename)
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%s' % (field, str(value))
        keyval_file.close()

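    # Note (illustrative, not from the original source): _write_keyval()
    # appends simple "name=value" lines, so the job-level keyval file ends up
    # with entries such as (made-up timestamps):
    #
    #   job_queued=1225747200
    #   job_started=1225747262
    #   job_finished=1225750862
    #
    # _write_host_keyval() below writes a per-host file, named after the
    # hostname, containing a labels= line.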
showardd8e548a2008-09-09 03:04:57 +00001373 def _host_keyval_dir(self):
1374 return os.path.join(self.results_dir(), 'host_keyvals')
1375
1376
1377 def _write_host_keyval(self, host):
1378 labels = ','.join(host.labels())
1379 self._write_keyval(self._host_keyval_dir(), 'labels', labels,
1380 keyval_filename=host.hostname)
1381
1382 def _create_host_keyval_dir(self):
1383 directory = self._host_keyval_dir()
1384 if not os.path.exists(directory):
1385 os.makedirs(directory)
1386
1387
jadmanski0afbb632008-06-06 21:10:57 +00001388 def results_dir(self):
1389 return self.queue_entries[0].results_dir()
mblighbb421852008-03-11 22:36:16 +00001390
1391
jadmanski0afbb632008-06-06 21:10:57 +00001392 def run(self):
1393 """\
1394 Override AgentTask.run() so we can use a PidfileRunMonitor.
1395 """
1396 self.monitor = PidfileRunMonitor(self.results_dir(),
1397 cmd=self.cmd,
1398 nice_level=AUTOSERV_NICE_LEVEL)
1399 self.monitor.run()
mblighbb421852008-03-11 22:36:16 +00001400
1401
jadmanski0afbb632008-06-06 21:10:57 +00001402 def prolog(self):
1403 # write some job timestamps into the job keyval file
1404 queued = time.mktime(self.job.created_on.timetuple())
1405 started = time.time()
showardd8e548a2008-09-09 03:04:57 +00001406 self._write_keyval(self.results_dir(), "job_queued", int(queued))
1407 self._write_keyval(self.results_dir(), "job_started", int(started))
1408 self._create_host_keyval_dir()
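                # record each host's label list in host_keyvals/<hostname>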
jadmanski0afbb632008-06-06 21:10:57 +00001409 for queue_entry in self.queue_entries:
showardd8e548a2008-09-09 03:04:57 +00001410 self._write_host_keyval(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001411                         print "starting queue_task on %s/%s" % (
                                queue_entry.host.hostname, queue_entry.id)
1412 queue_entry.set_status('Running')
1413 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001414 queue_entry.host.update_field('dirty', 1)
jadmanski0afbb632008-06-06 21:10:57 +00001415 if (not self.job.is_synchronous() and
1416 self.job.num_machines() > 1):
1417 assert len(self.queue_entries) == 1
1418 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001419
1420
jadmanski0afbb632008-06-06 21:10:57 +00001421 def _finish_task(self):
1422 # write out the finished time into the results keyval
1423 finished = time.time()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001424 self._write_keyval(self.results_dir(), "job_finished", int(finished))
jadmanskic2ac77f2008-05-16 21:44:04 +00001425
jadmanski0afbb632008-06-06 21:10:57 +00001426 # parse the results of the job
1427 if self.job.is_synchronous() or self.job.num_machines() == 1:
1428 parse_results(self.job.results_dir())
1429 else:
1430 for queue_entry in self.queue_entries:
jadmanskif7fa2cc2008-10-01 14:13:23 +00001431 parse_results(queue_entry.results_dir(), flags="-l 2")
1432
1433
1434 def _log_abort(self):
1435 # build up sets of all the aborted_by and aborted_on values
1436 aborted_by, aborted_on = set(), set()
1437 for queue_entry in self.queue_entries:
1438 if queue_entry.aborted_by:
1439 aborted_by.add(queue_entry.aborted_by)
1440 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1441 aborted_on.add(t)
1442
 1443                 # there should be at most one distinct aborted_by user;
                      # write it out together with the most recent abort time
1444 assert len(aborted_by) <= 1
1445 if len(aborted_by) == 1:
1446 results_dir = self.results_dir()
1447 self._write_keyval(results_dir, "aborted_by", aborted_by.pop())
1448 self._write_keyval(results_dir, "aborted_on", max(aborted_on))
jadmanskic2ac77f2008-05-16 21:44:04 +00001449
1450
jadmanski0afbb632008-06-06 21:10:57 +00001451 def abort(self):
1452 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001453 self._log_abort()
jadmanski0afbb632008-06-06 21:10:57 +00001454 self._finish_task()
jadmanskic2ac77f2008-05-16 21:44:04 +00001455
1456
showard21baa452008-10-21 00:08:39 +00001457 def _reboot_hosts(self):
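                # reboot_after policy: ALWAYS reboots unconditionally,
                # IF_ALL_TESTS_PASSED reboots only when autoserv succeeded and
                # no tests failed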
1458 reboot_after = self.job.reboot_after
1459 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001460 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001461 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001462 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001463 num_tests_failed = self.monitor.num_tests_failed()
1464 do_reboot = (self.success and num_tests_failed == 0)
1465
1466 if do_reboot:
1467 for queue_entry in self.queue_entries:
1468 reboot_task = RebootTask(queue_entry.get_host())
1469 self.agent.dispatcher.add_agent(Agent([reboot_task]))
1470
1471
jadmanski0afbb632008-06-06 21:10:57 +00001472 def epilog(self):
1473 super(QueueTask, self).epilog()
1474 if self.success:
1475 status = 'Completed'
1476 else:
1477 status = 'Failed'
mbligh36768f02008-02-22 18:28:33 +00001478
jadmanski0afbb632008-06-06 21:10:57 +00001479 for queue_entry in self.queue_entries:
1480 queue_entry.set_status(status)
1481 queue_entry.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001482
jadmanski0afbb632008-06-06 21:10:57 +00001483 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001484 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001485
jadmanski0afbb632008-06-06 21:10:57 +00001486 print "queue_task finished with %s/%s" % (status, self.success)
mbligh36768f02008-02-22 18:28:33 +00001487
1488
mblighbb421852008-03-11 22:36:16 +00001489class RecoveryQueueTask(QueueTask):
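        # reattaches to an autoserv process the scheduler already started
        # (e.g. after a scheduler restart), reusing the supplied run_monitor
        # instead of spawning a new process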
jadmanski0afbb632008-06-06 21:10:57 +00001490 def __init__(self, job, queue_entries, run_monitor):
1491 super(RecoveryQueueTask, self).__init__(job,
1492 queue_entries, cmd=None)
1493 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001494
1495
jadmanski0afbb632008-06-06 21:10:57 +00001496 def run(self):
1497 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001498
1499
jadmanski0afbb632008-06-06 21:10:57 +00001500 def prolog(self):
1501 # recovering an existing process - don't do prolog
1502 pass
mblighbb421852008-03-11 22:36:16 +00001503
1504
mbligh36768f02008-02-22 18:28:33 +00001505class RebootTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001506 def __init__(self, host):
1507 global _autoserv_path
1508
1509 # Current implementation of autoserv requires control file
1510 # to be passed on reboot action request. TODO: remove when no
1511 # longer appropriate.
1512 self.create_temp_resultsdir('.reboot')
1513 self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
1514 '-r', self.temp_results_dir, '/dev/null']
1515 self.host = host
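                # a failed reboot falls back to repairing the host via the
                # RepairTask passed as failure_tasks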
1516 super(RebootTask, self).__init__(self.cmd,
1517 failure_tasks=[RepairTask(host)])
mbligh16c722d2008-03-05 00:58:44 +00001518
mblighd5c95802008-03-05 00:33:46 +00001519
jadmanski0afbb632008-06-06 21:10:57 +00001520 def prolog(self):
1521 print "starting reboot task for host: %s" % self.host.hostname
1522 self.host.set_status("Rebooting")
mblighd5c95802008-03-05 00:33:46 +00001523
mblighd5c95802008-03-05 00:33:46 +00001524
showard21baa452008-10-21 00:08:39 +00001525 def epilog(self):
1526 super(RebootTask, self).epilog()
1527 self.host.set_status('Ready')
1528 if self.success:
1529 self.host.update_field('dirty', 0)
1530
1531
mblighd5c95802008-03-05 00:33:46 +00001532class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001533 def __init__(self, queue_entry, agents_to_abort):
1534 self.queue_entry = queue_entry
1535 self.agents_to_abort = agents_to_abort
jadmanski0afbb632008-06-06 21:10:57 +00001536 super(AbortTask, self).__init__('')
mbligh36768f02008-02-22 18:28:33 +00001537
1538
jadmanski0afbb632008-06-06 21:10:57 +00001539 def prolog(self):
1540 print "starting abort on host %s, job %s" % (
1541 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001542
mblighd64e5702008-04-04 21:39:28 +00001543
jadmanski0afbb632008-06-06 21:10:57 +00001544 def epilog(self):
1545 super(AbortTask, self).epilog()
1546 self.queue_entry.set_status('Aborted')
1547 self.success = True
1548
1549
1550 def run(self):
1551 for agent in self.agents_to_abort:
 1552                         if agent.active_task:
1553 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001554
1555
1556class DBObject(object):
jadmanski0afbb632008-06-06 21:10:57 +00001557 def __init__(self, id=None, row=None, new_record=False):
1558 assert (bool(id) != bool(row))
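                # exactly one of id (triggers a DB lookup) or row (pre-fetched
                # values) must be supplied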
mbligh36768f02008-02-22 18:28:33 +00001559
jadmanski0afbb632008-06-06 21:10:57 +00001560 self.__table = self._get_table()
1561 fields = self._fields()
mbligh36768f02008-02-22 18:28:33 +00001562
jadmanski0afbb632008-06-06 21:10:57 +00001563 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001564
jadmanski0afbb632008-06-06 21:10:57 +00001565 if row is None:
1566 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1567 rows = _db.execute(sql, (id,))
1568 if len(rows) == 0:
 1569                                 raise Exception("row not found (table=%s, id=%s)"
 1570                                                 % (self.__table, id))
1571 row = rows[0]
mbligh36768f02008-02-22 18:28:33 +00001572
jadmanski0afbb632008-06-06 21:10:57 +00001573 assert len(row) == self.num_cols(), (
1574 "table = %s, row = %s/%d, fields = %s/%d" % (
1575 self.__table, row, len(row), fields, self.num_cols()))
mbligh36768f02008-02-22 18:28:33 +00001576
jadmanski0afbb632008-06-06 21:10:57 +00001577 self.__valid_fields = {}
1578 for i,value in enumerate(row):
1579 self.__dict__[fields[i]] = value
1580 self.__valid_fields[fields[i]] = True
mbligh36768f02008-02-22 18:28:33 +00001581
jadmanski0afbb632008-06-06 21:10:57 +00001582 del self.__valid_fields['id']
mbligh36768f02008-02-22 18:28:33 +00001583
mblighe2586682008-02-29 22:45:46 +00001584
jadmanski0afbb632008-06-06 21:10:57 +00001585 @classmethod
1586 def _get_table(cls):
1587 raise NotImplementedError('Subclasses must override this')
mblighe2586682008-02-29 22:45:46 +00001588
1589
jadmanski0afbb632008-06-06 21:10:57 +00001590 @classmethod
1591 def _fields(cls):
1592 raise NotImplementedError('Subclasses must override this')
showard04c82c52008-05-29 19:38:12 +00001593
1594
jadmanski0afbb632008-06-06 21:10:57 +00001595 @classmethod
1596 def num_cols(cls):
1597 return len(cls._fields())
showard04c82c52008-05-29 19:38:12 +00001598
1599
jadmanski0afbb632008-06-06 21:10:57 +00001600 def count(self, where, table = None):
1601 if not table:
1602 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001603
jadmanski0afbb632008-06-06 21:10:57 +00001604 rows = _db.execute("""
1605 SELECT count(*) FROM %s
1606 WHERE %s
1607 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001608
jadmanski0afbb632008-06-06 21:10:57 +00001609 assert len(rows) == 1
1610
1611 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001612
1613
mblighf8c624d2008-07-03 16:58:45 +00001614 def update_field(self, field, value, condition=''):
jadmanski0afbb632008-06-06 21:10:57 +00001615 assert self.__valid_fields[field]
mbligh36768f02008-02-22 18:28:33 +00001616
jadmanski0afbb632008-06-06 21:10:57 +00001617 if self.__dict__[field] == value:
1618 return
mbligh36768f02008-02-22 18:28:33 +00001619
mblighf8c624d2008-07-03 16:58:45 +00001620 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1621 if condition:
1622 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001623 _db.execute(query, (value, self.id))
1624
1625 self.__dict__[field] = value
mbligh36768f02008-02-22 18:28:33 +00001626
1627
jadmanski0afbb632008-06-06 21:10:57 +00001628 def save(self):
1629 if self.__new_record:
1630 keys = self._fields()[1:] # avoid id
1631 columns = ','.join([str(key) for key in keys])
 1632                         # bind the values as query parameters so the DB
 1633                         # driver handles quoting and escaping
 1634                         placeholders = ','.join(['%s'] * len(keys))
 1635                         query = """INSERT INTO %s (%s) VALUES (%s)""" % \
 1636                                 (self.__table, columns, placeholders)
                              _db.execute(query, [self.__dict__[key] for key in keys])
mbligh36768f02008-02-22 18:28:33 +00001637
1638
jadmanski0afbb632008-06-06 21:10:57 +00001639 def delete(self):
1640 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1641 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001642
1643
showard63a34772008-08-18 19:32:50 +00001644 @staticmethod
1645 def _prefix_with(string, prefix):
1646 if string:
1647 string = prefix + string
1648 return string
1649
1650
jadmanski0afbb632008-06-06 21:10:57 +00001651 @classmethod
showard989f25d2008-10-01 11:38:11 +00001652 def fetch(cls, where='', params=(), joins='', order_by=''):
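                # yields one instance per row matching
                # "SELECT <table>.* FROM <table> <joins> WHERE <where> ORDER BY <order_by>"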
showard63a34772008-08-18 19:32:50 +00001653 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1654 where = cls._prefix_with(where, 'WHERE ')
1655 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
1656 '%(where)s %(order_by)s' % {'table' : cls._get_table(),
1657 'joins' : joins,
1658 'where' : where,
1659 'order_by' : order_by})
1660 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001661 for row in rows:
1662 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001663
mbligh36768f02008-02-22 18:28:33 +00001664
1665class IneligibleHostQueue(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001666     def __init__(self, id=None, row=None, new_record=False):
1667 super(IneligibleHostQueue, self).__init__(id=id, row=row,
1668 new_record=new_record)
mblighe2586682008-02-29 22:45:46 +00001669
1670
jadmanski0afbb632008-06-06 21:10:57 +00001671 @classmethod
1672 def _get_table(cls):
1673 return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001674
1675
jadmanski0afbb632008-06-06 21:10:57 +00001676 @classmethod
1677 def _fields(cls):
1678 return ['id', 'job_id', 'host_id']
showard04c82c52008-05-29 19:38:12 +00001679
1680
showard989f25d2008-10-01 11:38:11 +00001681class Label(DBObject):
1682 @classmethod
1683 def _get_table(cls):
1684 return 'labels'
1685
1686
1687 @classmethod
1688 def _fields(cls):
1689 return ['id', 'name', 'kernel_config', 'platform', 'invalid',
1690 'only_if_needed']
1691
1692
mbligh36768f02008-02-22 18:28:33 +00001693class Host(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001694 def __init__(self, id=None, row=None):
1695 super(Host, self).__init__(id=id, row=row)
mblighe2586682008-02-29 22:45:46 +00001696
1697
jadmanski0afbb632008-06-06 21:10:57 +00001698 @classmethod
1699 def _get_table(cls):
1700 return 'hosts'
mbligh36768f02008-02-22 18:28:33 +00001701
1702
jadmanski0afbb632008-06-06 21:10:57 +00001703 @classmethod
1704 def _fields(cls):
1705 return ['id', 'hostname', 'locked', 'synch_id','status',
showard21baa452008-10-21 00:08:39 +00001706 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty']
showard04c82c52008-05-29 19:38:12 +00001707
1708
jadmanski0afbb632008-06-06 21:10:57 +00001709 def current_task(self):
1710 rows = _db.execute("""
1711 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1712 """, (self.id,))
1713
1714 if len(rows) == 0:
1715 return None
1716 else:
1717 assert len(rows) == 1
 1718                         results = rows[0]
mblighf8c624d2008-07-03 16:58:45 +00001719# print "current = %s" % results
jadmanski0afbb632008-06-06 21:10:57 +00001720 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00001721
1722
jadmanski0afbb632008-06-06 21:10:57 +00001723 def yield_work(self):
1724 print "%s yielding work" % self.hostname
1725 if self.current_task():
1726 self.current_task().requeue()
1727
1728 def set_status(self,status):
1729 print '%s -> %s' % (self.hostname, status)
1730 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00001731
1732
showardd8e548a2008-09-09 03:04:57 +00001733 def labels(self):
1734 """
1735 Fetch a list of names of all non-platform labels associated with this
1736 host.
1737 """
1738 rows = _db.execute("""
1739 SELECT labels.name
1740 FROM labels
1741 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
1742 WHERE NOT labels.platform AND hosts_labels.host_id = %s
1743 ORDER BY labels.name
1744 """, (self.id,))
1745 return [row[0] for row in rows]
1746
1747
mbligh36768f02008-02-22 18:28:33 +00001748class HostQueueEntry(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001749 def __init__(self, id=None, row=None):
1750 assert id or row
1751 super(HostQueueEntry, self).__init__(id=id, row=row)
1752 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00001753
jadmanski0afbb632008-06-06 21:10:57 +00001754 if self.host_id:
1755 self.host = Host(self.host_id)
1756 else:
1757 self.host = None
mbligh36768f02008-02-22 18:28:33 +00001758
jadmanski0afbb632008-06-06 21:10:57 +00001759 self.queue_log_path = os.path.join(self.job.results_dir(),
1760 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00001761
1762
jadmanski0afbb632008-06-06 21:10:57 +00001763 @classmethod
1764 def _get_table(cls):
1765 return 'host_queue_entries'
mblighe2586682008-02-29 22:45:46 +00001766
1767
jadmanski0afbb632008-06-06 21:10:57 +00001768 @classmethod
1769 def _fields(cls):
1770 return ['id', 'job_id', 'host_id', 'priority', 'status',
showardb8471e32008-07-03 19:51:08 +00001771 'meta_host', 'active', 'complete', 'deleted']
showard04c82c52008-05-29 19:38:12 +00001772
1773
jadmanski0afbb632008-06-06 21:10:57 +00001774 def set_host(self, host):
1775 if host:
1776 self.queue_log_record('Assigning host ' + host.hostname)
1777 self.update_field('host_id', host.id)
1778 self.update_field('active', True)
1779 self.block_host(host.id)
1780 else:
1781 self.queue_log_record('Releasing host')
1782 self.unblock_host(self.host.id)
1783 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00001784
jadmanski0afbb632008-06-06 21:10:57 +00001785 self.host = host
mbligh36768f02008-02-22 18:28:33 +00001786
1787
jadmanski0afbb632008-06-06 21:10:57 +00001788 def get_host(self):
1789 return self.host
mbligh36768f02008-02-22 18:28:33 +00001790
1791
jadmanski0afbb632008-06-06 21:10:57 +00001792 def queue_log_record(self, log_line):
1793 now = str(datetime.datetime.now())
1794 queue_log = open(self.queue_log_path, 'a', 0)
1795 queue_log.write(now + ' ' + log_line + '\n')
1796 queue_log.close()
mbligh36768f02008-02-22 18:28:33 +00001797
1798
jadmanski0afbb632008-06-06 21:10:57 +00001799 def block_host(self, host_id):
1800 print "creating block %s/%s" % (self.job.id, host_id)
1801 row = [0, self.job.id, host_id]
1802 block = IneligibleHostQueue(row=row, new_record=True)
1803 block.save()
mblighe2586682008-02-29 22:45:46 +00001804
1805
jadmanski0afbb632008-06-06 21:10:57 +00001806 def unblock_host(self, host_id):
1807 print "removing block %s/%s" % (self.job.id, host_id)
1808 blocks = IneligibleHostQueue.fetch(
1809 'job_id=%d and host_id=%d' % (self.job.id, host_id))
1810 for block in blocks:
1811 block.delete()
mblighe2586682008-02-29 22:45:46 +00001812
1813
jadmanski0afbb632008-06-06 21:10:57 +00001814 def results_dir(self):
1815 if self.job.is_synchronous() or self.job.num_machines() == 1:
1816 return self.job.job_dir
1817 else:
1818 assert self.host
1819 return os.path.join(self.job.job_dir,
1820 self.host.hostname)
mbligh36768f02008-02-22 18:28:33 +00001821
mblighe2586682008-02-29 22:45:46 +00001822
jadmanski0afbb632008-06-06 21:10:57 +00001823 def verify_results_dir(self):
1824 if self.job.is_synchronous() or self.job.num_machines() > 1:
1825 assert self.host
1826 return os.path.join(self.job.job_dir,
1827 self.host.hostname)
1828 else:
1829 return self.job.job_dir
mbligh36768f02008-02-22 18:28:33 +00001830
1831
jadmanski0afbb632008-06-06 21:10:57 +00001832 def set_status(self, status):
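                # non-abort status changes carry a SQL condition so they never
                # overwrite an abort status set concurrently by another process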
mblighf8c624d2008-07-03 16:58:45 +00001833 abort_statuses = ['Abort', 'Aborting', 'Aborted']
1834 if status not in abort_statuses:
1835 condition = ' AND '.join(['status <> "%s"' % x
1836 for x in abort_statuses])
1837 else:
1838 condition = ''
1839 self.update_field('status', status, condition=condition)
1840
jadmanski0afbb632008-06-06 21:10:57 +00001841 if self.host:
1842 hostname = self.host.hostname
1843 else:
1844 hostname = 'no host'
1845 print "%s/%d status -> %s" % (hostname, self.id, self.status)
mblighf8c624d2008-07-03 16:58:45 +00001846
jadmanski0afbb632008-06-06 21:10:57 +00001847 if status in ['Queued']:
1848 self.update_field('complete', False)
1849 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00001850
jadmanski0afbb632008-06-06 21:10:57 +00001851 if status in ['Pending', 'Running', 'Verifying', 'Starting',
1852 'Abort', 'Aborting']:
1853 self.update_field('complete', False)
1854 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00001855
jadmanski0afbb632008-06-06 21:10:57 +00001856 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
1857 self.update_field('complete', True)
1858 self.update_field('active', False)
showard542e8402008-09-19 20:16:18 +00001859 self._email_on_job_complete()
1860
1861
1862 def _email_on_job_complete(self):
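                # only send the notification once the last entry of the job has
                # finished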
1863 url = "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
1864
1865 if self.job.is_finished():
1866 subject = "Autotest: Job ID: %s \"%s\" Completed" % (
1867 self.job.id, self.job.name)
1868 body = "Job ID: %s\nJob Name: %s\n%s\n" % (
1869 self.job.id, self.job.name, url)
1870 send_email(_email_from, self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00001871
1872
jadmanski0afbb632008-06-06 21:10:57 +00001873 def run(self,assigned_host=None):
1874 if self.meta_host:
1875 assert assigned_host
1876 # ensure results dir exists for the queue log
1877 self.job.create_results_dir()
1878 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00001879
jadmanski0afbb632008-06-06 21:10:57 +00001880 print "%s/%s scheduled on %s, status=%s" % (self.job.name,
1881 self.meta_host, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00001882
jadmanski0afbb632008-06-06 21:10:57 +00001883 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00001884
jadmanski0afbb632008-06-06 21:10:57 +00001885 def requeue(self):
1886 self.set_status('Queued')
mblighe2586682008-02-29 22:45:46 +00001887
jadmanski0afbb632008-06-06 21:10:57 +00001888 if self.meta_host:
1889 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00001890
1891
jadmanski0afbb632008-06-06 21:10:57 +00001892 def handle_host_failure(self):
1893 """\
1894 Called when this queue entry's host has failed verification and
1895 repair.
1896 """
1897 assert not self.meta_host
1898 self.set_status('Failed')
1899 if self.job.is_synchronous():
1900 self.job.stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00001901
1902
jadmanski0afbb632008-06-06 21:10:57 +00001903 def clear_results_dir(self, results_dir=None, dont_delete_files=False):
1904 results_dir = results_dir or self.results_dir()
1905 if not os.path.exists(results_dir):
1906 return
1907 if dont_delete_files:
1908 temp_dir = tempfile.mkdtemp(suffix='.clear_results')
1909 print 'Moving results from %s to %s' % (results_dir,
1910 temp_dir)
1911 for filename in os.listdir(results_dir):
1912 path = os.path.join(results_dir, filename)
1913 if dont_delete_files:
1914 shutil.move(path,
1915 os.path.join(temp_dir, filename))
1916 else:
1917 remove_file_or_dir(path)
mbligh36768f02008-02-22 18:28:33 +00001918
1919
jadmanskif7fa2cc2008-10-01 14:13:23 +00001920 @property
1921 def aborted_by(self):
1922 self._load_abort_info()
1923 return self._aborted_by
1924
1925
1926 @property
1927 def aborted_on(self):
1928 self._load_abort_info()
1929 return self._aborted_on
1930
1931
1932 def _load_abort_info(self):
1933 """ Fetch info about who aborted the job. """
1934 if hasattr(self, "_aborted_by"):
1935 return
1936 rows = _db.execute("""
1937 SELECT users.login, aborted_host_queue_entries.aborted_on
1938 FROM aborted_host_queue_entries
1939 INNER JOIN users
1940 ON users.id = aborted_host_queue_entries.aborted_by_id
1941 WHERE aborted_host_queue_entries.queue_entry_id = %s
1942 """, (self.id,))
1943 if rows:
1944 self._aborted_by, self._aborted_on = rows[0]
1945 else:
1946 self._aborted_by = self._aborted_on = None
1947
1948
showardb2e2c322008-10-14 17:33:55 +00001949 def on_pending(self):
1950 """
1951 Called when an entry in a synchronous job has passed verify. If the
1952 job is ready to run, returns an agent to run the job. Returns None
1953 otherwise.
1954 """
1955 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00001956 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00001957 if self.job.is_ready():
1958 return self.job.run(self)
1959 return None
1960
1961
showard1be97432008-10-17 15:30:45 +00001962 def abort(self, agents_to_abort=[]):
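                # abort any agents still working on this entry, then reboot and
                # re-verify the host so it returns to service in a known state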
1963 abort_task = AbortTask(self, agents_to_abort)
1964 tasks = [abort_task]
1965
1966 host = self.get_host()
1967 if host:
1968 reboot_task = RebootTask(host)
1969 verify_task = VerifyTask(host=host)
1970 # just to make sure this host does not get taken away
1971 host.set_status('Rebooting')
1972 tasks += [reboot_task, verify_task]
1973
1974 self.set_status('Aborting')
1975 return Agent(tasks=tasks, queue_entry_ids=[self.id])
1976
1977
mbligh36768f02008-02-22 18:28:33 +00001978class Job(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001979 def __init__(self, id=None, row=None):
1980 assert id or row
1981 super(Job, self).__init__(id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00001982
jadmanski0afbb632008-06-06 21:10:57 +00001983 self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
1984 self.owner))
mblighe2586682008-02-29 22:45:46 +00001985
1986
jadmanski0afbb632008-06-06 21:10:57 +00001987 @classmethod
1988 def _get_table(cls):
1989 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00001990
1991
jadmanski0afbb632008-06-06 21:10:57 +00001992 @classmethod
1993 def _fields(cls):
1994 return ['id', 'owner', 'name', 'priority', 'control_file',
1995 'control_type', 'created_on', 'synch_type',
showard542e8402008-09-19 20:16:18 +00001996 'synch_count', 'synchronizing', 'timeout',
showard21baa452008-10-21 00:08:39 +00001997 'run_verify', 'email_list', 'reboot_before', 'reboot_after']
showard04c82c52008-05-29 19:38:12 +00001998
1999
jadmanski0afbb632008-06-06 21:10:57 +00002000 def is_server_job(self):
2001 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002002
2003
jadmanski0afbb632008-06-06 21:10:57 +00002004 def get_host_queue_entries(self):
2005 rows = _db.execute("""
2006 SELECT * FROM host_queue_entries
2007 WHERE job_id= %s
2008 """, (self.id,))
2009 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002010
jadmanski0afbb632008-06-06 21:10:57 +00002011 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002012
jadmanski0afbb632008-06-06 21:10:57 +00002013 return entries
mbligh36768f02008-02-22 18:28:33 +00002014
2015
jadmanski0afbb632008-06-06 21:10:57 +00002016 def set_status(self, status, update_queues=False):
2017 self.update_field('status',status)
2018
2019 if update_queues:
2020 for queue_entry in self.get_host_queue_entries():
2021 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002022
2023
jadmanski0afbb632008-06-06 21:10:57 +00002024 def is_synchronous(self):
2025 return self.synch_type == 2
mbligh36768f02008-02-22 18:28:33 +00002026
2027
jadmanski0afbb632008-06-06 21:10:57 +00002028 def is_ready(self):
2029 if not self.is_synchronous():
2030 return True
2031 sql = "job_id=%s AND status='Pending'" % self.id
2032 count = self.count(sql, table='host_queue_entries')
showardb2e2c322008-10-14 17:33:55 +00002033 return (count == self.num_machines())
mbligh36768f02008-02-22 18:28:33 +00002034
2035
jadmanski0afbb632008-06-06 21:10:57 +00002036 def results_dir(self):
2037 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002038
jadmanski0afbb632008-06-06 21:10:57 +00002039 def num_machines(self, clause = None):
2040 sql = "job_id=%s" % self.id
2041 if clause:
2042 sql += " AND (%s)" % clause
2043 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002044
2045
jadmanski0afbb632008-06-06 21:10:57 +00002046 def num_queued(self):
2047 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002048
2049
jadmanski0afbb632008-06-06 21:10:57 +00002050 def num_active(self):
2051 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002052
2053
jadmanski0afbb632008-06-06 21:10:57 +00002054 def num_complete(self):
2055 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002056
2057
jadmanski0afbb632008-06-06 21:10:57 +00002058 def is_finished(self):
2059 left = self.num_queued()
2060 print "%s: %s machines left" % (self.name, left)
2061 return left==0
mbligh36768f02008-02-22 18:28:33 +00002062
mbligh36768f02008-02-22 18:28:33 +00002063
jadmanski0afbb632008-06-06 21:10:57 +00002064 def stop_all_entries(self):
2065 for child_entry in self.get_host_queue_entries():
2066 if not child_entry.complete:
2067 child_entry.set_status('Stopped')
mblighe2586682008-02-29 22:45:46 +00002068
2069
jadmanski0afbb632008-06-06 21:10:57 +00002070 def write_to_machines_file(self, queue_entry):
2071 hostname = queue_entry.get_host().hostname
2072 print "writing %s to job %s machines file" % (hostname, self.id)
2073 file_path = os.path.join(self.job_dir, '.machines')
2074 mf = open(file_path, 'a')
2075 mf.write("%s\n" % queue_entry.get_host().hostname)
2076 mf.close()
mbligh36768f02008-02-22 18:28:33 +00002077
2078
jadmanski0afbb632008-06-06 21:10:57 +00002079 def create_results_dir(self, queue_entry=None):
2080 print "create: active: %s complete %s" % (self.num_active(),
2081 self.num_complete())
mbligh36768f02008-02-22 18:28:33 +00002082
jadmanski0afbb632008-06-06 21:10:57 +00002083 if not os.path.exists(self.job_dir):
2084 os.makedirs(self.job_dir)
mbligh36768f02008-02-22 18:28:33 +00002085
jadmanski0afbb632008-06-06 21:10:57 +00002086 if queue_entry:
showarde05654d2008-10-28 20:38:40 +00002087 results_dir = queue_entry.results_dir()
2088 if not os.path.exists(results_dir):
2089 os.makedirs(results_dir)
2090 return results_dir
jadmanski0afbb632008-06-06 21:10:57 +00002091 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002092
2093
showardb2e2c322008-10-14 17:33:55 +00002094 def _write_control_file(self):
2095 'Writes control file out to disk, returns a filename'
2096 control_fd, control_filename = tempfile.mkstemp(suffix='.control_file')
2097 control_file = os.fdopen(control_fd, 'w')
jadmanski0afbb632008-06-06 21:10:57 +00002098 if self.control_file:
showardb2e2c322008-10-14 17:33:55 +00002099 control_file.write(self.control_file)
2100 control_file.close()
2101 return control_filename
mbligh36768f02008-02-22 18:28:33 +00002102
showardb2e2c322008-10-14 17:33:55 +00002103
2104 def _get_job_tag(self, queue_entries):
2105 base_job_tag = "%s-%s" % (self.id, self.owner)
2106 if self.is_synchronous() or self.num_machines() == 1:
2107 return base_job_tag
jadmanski0afbb632008-06-06 21:10:57 +00002108 else:
showardb2e2c322008-10-14 17:33:55 +00002109 return base_job_tag + '/' + queue_entries[0].get_host().hostname
2110
2111
2112 def _get_autoserv_params(self, queue_entries):
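                # assembles the autoserv command line; for a hypothetical job it
                # would look roughly like:
                #   autoserv -P 42-debug_user -p -n -r /results/42-debug_user \
                #       -u debug_user -l my-job -m host1,host2 /tmp/tmpXYZ.control_file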
2113 results_dir = self.create_results_dir(queue_entries[0])
2114 control_filename = self._write_control_file()
jadmanski0afbb632008-06-06 21:10:57 +00002115 hostnames = ','.join([entry.get_host().hostname
2116 for entry in queue_entries])
showardb2e2c322008-10-14 17:33:55 +00002117 job_tag = self._get_job_tag(queue_entries)
mbligh36768f02008-02-22 18:28:33 +00002118
showardb2e2c322008-10-14 17:33:55 +00002119 params = [_autoserv_path, '-P', job_tag, '-p', '-n',
showard21baa452008-10-21 00:08:39 +00002120 '-r', os.path.abspath(results_dir), '-u', self.owner,
2121 '-l', self.name, '-m', hostnames, control_filename]
mbligh36768f02008-02-22 18:28:33 +00002122
jadmanski0afbb632008-06-06 21:10:57 +00002123 if not self.is_server_job():
2124 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002125
showardb2e2c322008-10-14 17:33:55 +00002126 return params
mblighe2586682008-02-29 22:45:46 +00002127
mbligh36768f02008-02-22 18:28:33 +00002128
showard21baa452008-10-21 00:08:39 +00002129 def _get_pre_job_tasks(self, queue_entry, verify_task_class=VerifyTask):
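                # reboot_before policy: ALWAYS reboots before every job,
                # IF_DIRTY only reboots a host marked dirty since its last
                # cleanup; the optional reboot is followed by a verify task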
2130 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00002131 if self.reboot_before == models.RebootBefore.ALWAYS:
showard21baa452008-10-21 00:08:39 +00002132 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00002133 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showard21baa452008-10-21 00:08:39 +00002134 do_reboot = queue_entry.get_host().dirty
2135
2136 tasks = []
2137 if do_reboot:
2138 tasks.append(RebootTask(queue_entry.get_host()))
2139 tasks.append(verify_task_class(queue_entry=queue_entry))
2140 return tasks
2141
2142
showardb2e2c322008-10-14 17:33:55 +00002143 def _run_synchronous(self, queue_entry):
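                # a synchronous job starts only once every entry has reached
                # Pending; until then each entry just runs its pre-job tasks
                # (or goes straight to Pending when run_verify is off)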
2144 if not self.is_ready():
showard9976ce92008-10-15 20:28:13 +00002145 if self.run_verify:
showard21baa452008-10-21 00:08:39 +00002146 return Agent(self._get_pre_job_tasks(queue_entry,
2147 VerifySynchronousTask),
2148 [queue_entry.id])
showard9976ce92008-10-15 20:28:13 +00002149 else:
2150 return queue_entry.on_pending()
mbligh36768f02008-02-22 18:28:33 +00002151
showardb2e2c322008-10-14 17:33:55 +00002152 return self._finish_run(self.get_host_queue_entries())
2153
2154
2155 def _run_asynchronous(self, queue_entry):
showard9976ce92008-10-15 20:28:13 +00002156 initial_tasks = []
2157 if self.run_verify:
showard21baa452008-10-21 00:08:39 +00002158 initial_tasks = self._get_pre_job_tasks(queue_entry)
showardb2e2c322008-10-14 17:33:55 +00002159 return self._finish_run([queue_entry], initial_tasks)
2160
2161
2162 def _finish_run(self, queue_entries, initial_tasks=[]):
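                # mark every entry Starting and launch a single QueueTask that
                # runs autoserv across all of them; num_processes is set per
                # entry, presumably so the dispatcher can track process capacity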
showardb2ccdda2008-10-28 20:39:05 +00002163 for queue_entry in queue_entries:
2164 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002165 params = self._get_autoserv_params(queue_entries)
2166 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2167 cmd=params)
2168 tasks = initial_tasks + [queue_task]
2169 entry_ids = [entry.id for entry in queue_entries]
2170
2171 return Agent(tasks, entry_ids, num_processes=len(queue_entries))
2172
2173
2174 def run(self, queue_entry):
2175 if self.is_synchronous():
2176 return self._run_synchronous(queue_entry)
2177 return self._run_asynchronous(queue_entry)
mbligh36768f02008-02-22 18:28:33 +00002178
2179
2180if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002181 main()