#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False
_global_config_section = 'SCHEDULER'
_base_url = None
# see os.getlogin() online docs
_email_from = pwd.getpwuid(os.getuid())[0]
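# Added note (illustrative, values made up): the scheduler reads its settings
# from the [SCHEDULER] section of the Autotest global config, plus base_url
# from [AUTOTEST_WEB].  A minimal sketch of the options referenced in this
# file:
#
#   [SCHEDULER]
#   notify_email: scheduler-admin@example.com
#   tick_pause_sec: 5
#   max_running_jobs: 100
#   max_jobs_started_per_cycle: 10
#   clean_interval_minutes: 60
#   synch_job_start_timeout_minutes: 120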


def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # read in notify_email from global_config
    c = global_config.global_config
    global _notify_email
    val = c.get_config_value(_global_config_section, "notify_email")
    if val != "":
        _notify_email = val

    tick_pause = c.get_config_value(
        _global_config_section, 'tick_pause_sec', type=int)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # read in base url
    global _base_url
    val = c.get_config_value(CONFIG_SECTION, "base_url")
    if val:
        _base_url = val
    else:
        _base_url = "http://your_autotest_server/afe/"

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(tick_pause)
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    email_manager.send_queued_emails()
    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    if _testing_mode:
        global_config.global_config.override_config_value(
            CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


def queue_entries_to_abort():
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)
    qe = [HostQueueEntry(row=i) for i in rows]
    return qe

def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)


def log_stacktrace(reason):
    (exc_type, exc_value, tb) = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += ''.join(traceback.format_exception(exc_type, exc_value, tb))

    sys.stderr.write("\n%s\n" % message)
    email_manager.enqueue_notify_email("monitor_db exception", message)


def get_proc_poll_fn(pid):
    proc_path = os.path.join('/proc', str(pid))
    def poll_fn():
        if os.path.exists(proc_path):
            return None
        return 0  # we can't get a real exit code
    return poll_fn
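# Illustrative sketch (not part of the scheduler): the closure returned by
# get_proc_poll_fn() mimics subprocess.Popen.poll() for a process we did not
# spawn ourselves -- None while /proc/<pid> exists, 0 once it is gone.
# kill_autoserv() below accepts such a function as its poll_fn, e.g.:
#
#     poll_fn = get_proc_poll_fn(orphan_pid)   # orphan_pid is hypothetical
#     kill_autoserv(orphan_pid, poll_fn)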


def send_email(from_addr, to_string, subject, body):
    """Mails out emails to the addresses listed in to_string.

    to_string is split into a list which can be delimited by any of:
        ';', ',', ':' or any whitespace
    """

    # Create list from string removing empty strings from the list.
    to_list = [x for x in re.split('\s|,|;|:', to_string) if x]
    if not to_list:
        return

    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        from_addr, ', '.join(to_list), subject, body)
    try:
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(from_addr, to_list, msg)
        finally:
            mailer.quit()
    except Exception, e:
        print "Sending email failed. Reason: %s" % repr(e)
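# Example (illustrative addresses only): a to_string of
# 'admin@example.com;oncall@example.com' is split on whitespace, ',', ';' or
# ':' into ['admin@example.com', 'oncall@example.com'] before being handed
# to smtplib on localhost.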


def kill_autoserv(pid, poll_fn=None):
    print 'killing', pid
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() is None:
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)


class EmailNotificationManager(object):
    def __init__(self):
        self._emails = []

    def enqueue_notify_email(self, subject, message):
        if not _notify_email:
            return

        body = 'Subject: ' + subject + '\n'
        body += "%s / %s / %s\n%s" % (socket.gethostname(),
                                      os.getpid(),
                                      time.strftime("%X %x"), message)
        self._emails.append(body)


    def send_queued_emails(self):
        if not self._emails:
            return
        subject = 'Scheduler notifications from ' + socket.gethostname()
        separator = '\n' + '-' * 40 + '\n'
        body = separator.join(self._emails)

        send_email(_email_from, _notify_email, subject, body)
        self._emails = []

email_manager = EmailNotificationManager()
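# Added commentary: enqueue_notify_email() only buffers messages in memory;
# nothing is sent until send_queued_emails() runs, which the Dispatcher does
# once per tick and main() does once more on shutdown.  If notify_email is
# not configured, notifications are silently dropped.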


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = long(row[0]), long(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result
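    # Example (added for illustration): rows such as [(1, 10), (1, 11), (2, 10)]
    # become {1L: set([10L, 11L]), 2L: set([10L])}; with flip=True the mapping
    # is keyed by the second column instead: {10L: set([1L, 2L]), 11L: set([1L])}.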


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.acl_group_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, acl_group_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        acl = self._is_acl_accessible(host_id, queue_entry)
        deps = self._check_job_dependencies(job_dependencies, host_labels)
        only_if = self._check_only_if_needed_labels(job_dependencies,
                                                    host_labels, queue_entry)
        return acl and deps and only_if


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            return self._schedule_non_metahost(queue_entry)
        return self._schedule_metahost(queue_entry)
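    # Added summary comment: a host is handed to a queue entry only if it
    # shares an ACL group with the job's owner, carries every label the job
    # depends on, and has no only_if_needed label the job did not request
    # (either as a dependency or as the metahost label).  Non-metahost
    # entries check their single assigned host; metahost entries scan the
    # hosts in the requested label, skipping ones already claimed this
    # cycle, marked invalid, or listed in ineligible_host_queues.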


class Dispatcher:
    autoserv_procs_cache = None
    max_running_processes = global_config.global_config.get_config_value(
        _global_config_section, 'max_running_jobs', type=int)
    max_processes_started_per_cycle = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_jobs_started_per_cycle', type=int))
    clean_interval = (
        global_config.global_config.get_config_value(
            _global_config_section, 'clean_interval_minutes', type=int))
    synch_job_start_timeout_minutes = (
        global_config.global_config.get_config_value(
            _global_config_section, 'synch_job_start_timeout_minutes',
            type=int))

    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        Dispatcher.autoserv_procs_cache = None
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        email_manager.send_queued_emails()


    def _run_cleanup_maybe(self):
        if self._last_clean_time + self.clean_interval * 60 < time.time():
            print 'Running cleanup'
            self._abort_timed_out_jobs()
            self._abort_jobs_past_synch_start_timeout()
            self._clear_inactive_blocks()
            self._check_for_db_inconsistencies()
            self._last_clean_time = time.time()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self

    # Find agents corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def num_running_processes(self):
        return sum(agent.num_processes for agent in self._agents
                   if agent.is_running())


    @classmethod
    def find_autoservs(cls, orphans_only=False):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes.  If orphans_only=True, return only processes that
        have been orphaned (i.e. parent pid = 1).
        """
        if cls.autoserv_procs_cache is not None:
            return cls.autoserv_procs_cache

        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the five columns output by ps
        procs = [line.split(None, 4) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = {}
        for proc in procs:
            # check ppid == 1 for orphans (the ps fields are strings)
            if orphans_only and proc[2] != '1':
                continue
            # only root autoserv processes have pgid == pid
            if (proc[3] == 'autoserv' and  # comm
                proc[1] == proc[0]):       # pgid == pid
                # map pid to args
                autoserv_procs[int(proc[0])] = proc[4]
        cls.autoserv_procs_cache = autoserv_procs
        return autoserv_procs
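    # Example (illustrative, values made up): a ps line such as
    #   "12345 12345     1 autoserv /usr/bin/python autoserv -r /results/5-foo ..."
    # splits into ['12345', '12345', '1', 'autoserv', '<args>'] and yields the
    # entry {12345: '<args>'} above; with orphans_only=True only rows whose
    # ppid column is '1' are kept.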


    def recover_queue_entry(self, queue_entry, run_monitor):
        job = queue_entry.job
        if job.is_synchronous():
            all_queue_entries = job.get_host_queue_entries()
        else:
            all_queue_entries = [queue_entry]
        all_queue_entry_ids = [queue_entry.id for queue_entry
                               in all_queue_entries]
        queue_task = RecoveryQueueTask(
            job=queue_entry.job,
            queue_entries=all_queue_entries,
            run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=all_queue_entry_ids))


    def _recover_processes(self):
        orphans = self.find_autoservs(orphans_only=True)

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            if not run_monitor.has_pid():
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            pid = run_monitor.get_pid()
            print 'Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid)
            job = queue_entry.job
            if job.is_synchronous():
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir(dont_delete_files=True)
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rebooting_host_ids = set()
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %d' % queue_entry.id
            agent = queue_entry.abort()
            self.add_agent(agent)
            if queue_entry.get_host():
                rebooting_host_ids.add(queue_entry.get_host().id)

        self._recover_parsing_entries()

        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""",
                                   exclude_ids=rebooting_host_ids)

        # finally, recover "Running" hosts with no active queue entries,
        # although this should never happen
        message = ('Recovering running host %s - this probably '
                   'indicates a scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              exclude_ids=set()):
        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
                           'invalid = 0 AND ' + where)
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.id in exclude_ids:
                continue
            if print_message is not None:
                print print_message % host.hostname
            verify_task = VerifyTask(host=host)
            self.add_agent(Agent(tasks=[verify_task]))


    def _recover_parsing_entries(self):
        # make sure there are no old parsers running
        os.system('killall parse')

        recovered_synch_jobs = set()
        for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
            job = entry.job
            if job.is_synchronous():
                if job.id in recovered_synch_jobs:
                    continue
                queue_entries = job.get_host_queue_entries()
                recovered_synch_jobs.add(job.id)
            else:
                queue_entries = [entry]

            reparse_task = FinalReparseTask(queue_entries)
            self.add_agent(Agent([reparse_task]))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _abort_timed_out_jobs(self):
        """
        Aborts all jobs that have timed out and not completed.
        """
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            print 'Aborting job %d due to job timeout' % job.id
            job.abort(None)


    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are holding a machine that's in the Everyone ACL group.
        """
        timeout_delta = datetime.timedelta(
            minutes=self.synch_job_start_timeout_minutes)
        timeout_start = datetime.datetime.now() - timeout_delta
        query = models.Job.objects.filter(
            synch_type=models.Test.SynchType.SYNCHRONOUS,
            created_on__lt=timeout_start,
            hostqueueentry__status='Pending',
            hostqueueentry__host__acl_group__name='Everyone')
        for job in query.distinct():
            print 'Aborting job %d due to start timeout' % job.id
            job.abort(None)


    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")
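        # Equivalent (illustrative) formulation that the comment above refers
        # to, avoided here for MySQL performance reasons:
        #   DELETE FROM ineligible_host_queues WHERE job_id NOT IN
        #       (SELECT DISTINCT job_id FROM host_queue_entries
        #        WHERE NOT complete)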


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            where='NOT complete AND NOT active',
            order_by='priority DESC, meta_host, job_id'))


    def _schedule_new_jobs(self):
        print "finding work"

        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return

        self._host_scheduler.refresh(queue_entries)

        for queue_entry in queue_entries:
            assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
            if not assigned_host:
                continue
            self._run_queue_entry(queue_entry, assigned_host)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run(assigned_host=host)
        # in some cases (synchronous jobs with run_verify=False), agent may be None
        if agent:
            self.add_agent(agent)


    def _find_aborting(self):
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            for agent in agents_to_abort:
                self.remove_agent(agent)

            agent = entry.abort(agents_to_abort)
            self.add_agent(agent)
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _can_start_agent(self, agent, num_running_processes,
                         num_started_this_cycle, have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if (num_running_processes + agent.num_processes >
            self.max_running_processes):
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            self.max_processes_started_per_cycle):
            return False
        return True
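    # Worked example (illustrative numbers): with max_running_jobs = 10 and
    # max_jobs_started_per_cycle = 4, an agent needing 2 processes may start
    # when 7 are running and 1 was already started this cycle (7 + 2 <= 10
    # and 1 + 2 <= 4); an agent needing 5 is refused (7 + 5 > 10), and
    # _handle_agents() below then sets have_reached_limit so no further
    # nonzero-process agents start this cycle.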


    def _handle_agents(self):
        num_running_processes = self.num_running_processes()
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                print "agent finished"
                self._agents.remove(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_running_processes,
                                             num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_running_processes += agent.num_processes
                num_started_this_cycle += agent.num_processes
            agent.tick()
        print num_running_processes, 'running processes'


    def _check_for_db_inconsistencies(self):
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        if query.count() != 0:
            subject = ('%d queue entries found with active=complete=1'
                       % query.count())
            message = '\n'.join(str(entry.get_object_dict())
                                for entry in query[:50])
            if len(query) > 50:
                message += '\n(truncated)\n'

            print subject
            email_manager.enqueue_notify_email(subject, message)


class RunMonitor(object):
    def __init__(self, cmd, nice_level=None, log_file=None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.cmd = cmd

    def run(self):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(self.cmd)
            self.cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
            except OSError, exc:
                if exc.errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)
            try:
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*' * 80))
                out_file.write("%s> %s\n" %
                               (time.strftime("%X %x"),
                                self.cmd))
                out_file.write("%s\n" % ('*' * 80))
            except (OSError, IOError):
                log_stacktrace('Error opening log file %s' %
                               self.log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % self.cmd
        print "path = %s" % os.getcwd()

        self.proc = subprocess.Popen(self.cmd, stdout=out_file,
                                     stderr=subprocess.STDOUT,
                                     stdin=in_devnull)
        out_file.close()
        in_devnull.close()


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        return self.proc.poll()


class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """


class PidfileRunMonitor(RunMonitor):
    class PidfileState(object):
        pid = None
        exit_status = None
        num_tests_failed = None

        def reset(self):
            self.pid = self.exit_status = self.num_tests_failed = None


    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        self.start_time = time.time()
        self._state = self.PidfileState()
        super(PidfileRunMonitor, self).__init__(cmd, nice_level, log_file)


    def has_pid(self):
        self._get_pidfile_info()
        return self._state.pid is not None


    def get_pid(self):
        self._get_pidfile_info()
        assert self._state.pid is not None
        return self._state.pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self):
        cmdline_path = os.path.join('/proc', str(self._state.pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def _read_pidfile(self):
        self._state.reset()
        if not os.path.exists(self.pid_file):
            return
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        if not lines:
            return
        if len(lines) > 3:
            raise PidfileException('Corrupt pid file (%d lines) at %s:\n%s' %
                                   (len(lines), self.pid_file, lines))
        try:
            self._state.pid = int(lines[0])
            if len(lines) > 1:
                self._state.exit_status = int(lines[1])
                if len(lines) == 3:
                    self._state.num_tests_failed = int(lines[2])
                else:
                    # maintain backwards-compatibility with two-line pidfiles
                    self._state.num_tests_failed = 0
        except ValueError, exc:
            raise PidfileException('Corrupt pid file: ' + str(exc.args))
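    # Pidfile format, as parsed above (example values are made up):
    #   line 1: autoserv pid, e.g. "12345"
    #   line 2 (written when autoserv exits): exit status, e.g. "0"
    #   line 3 (newer autoserv only): number of failed tests, e.g. "2"
    # A pidfile with more than three lines is treated as corrupt.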


    def _find_autoserv_proc(self):
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nPid: %s\nPidfile: %s\n%s' % (self._state.pid,
                                                          self.pid_file,
                                                          message)
        print message
        email_manager.enqueue_notify_email(error, message)
        if self._state.pid is not None:
            pid = self._state.pid
        else:
            pid = 0
        self.on_lost_process(pid)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.pid is None:
            self._handle_no_pid()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs()
            if proc_running:
                return

            # pid but no process - maybe process *just* exited
            self._read_pidfile()
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            email_manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.pid = pid
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed


class Agent(object):
    def __init__(self, tasks, queue_entry_ids=[], num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids
        self.num_processes = num_processes

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()
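    # Added commentary: an Agent walks a FIFO queue of AgentTasks, one active
    # task at a time; each tick() polls the active task and advances when it
    # finishes.  If a task fails, the queue is replaced by that task's
    # failure_tasks (e.g. VerifyTask falls back to a RepairTask below).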


class AgentTask(object):
    def __init__(self, cmd, failure_tasks=[]):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code is None:
            return
#        print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)


    def cleanup(self):
        if (hasattr(self, 'temp_results_dir') and
            os.path.exists(self.temp_results_dir)):
            shutil.rmtree(self.temp_results_dir)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def run(self):
        if self.cmd:
            print "agent starting monitor"
            log_file = None
            if hasattr(self, 'log_file'):
                log_file = self.log_file
            elif hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level=AUTOSERV_NICE_LEVEL, log_file=log_file)
            self.monitor.run()


class RepairTask(AgentTask):
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)
        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path, '-R', '-m', host.hostname,
               '-r', self.temp_results_dir, '--host-protection', protection]
        self.host = host
        self.fail_queue_entry = fail_queue_entry
        super(RepairTask, self).__init__(cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.fail_queue_entry:
                self.fail_queue_entry.handle_host_failure()


class VerifyTask(AgentTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')

        cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
               '-r', self.temp_results_dir]

        fail_queue_entry = None
        if queue_entry and not queue_entry.meta_host:
            fail_queue_entry = queue_entry
        failure_tasks = [RepairTask(self.host, fail_queue_entry)]

        super(VerifyTask, self).__init__(cmd,
                                         failure_tasks=failure_tasks)


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
            self.queue_entry.clear_results_dir(
                self.queue_entry.verify_results_dir())
        self.host.set_status('Verifying')


    def cleanup(self):
        if not os.path.exists(self.temp_results_dir):
            return
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        super(VerifyTask, self).cleanup()


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
        elif self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            if filename == AUTOSERV_PID_FILE:
                continue
            self.force_move(os.path.join(self.temp_results_dir,
                                         filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            warning = 'Warning: removing existing destination file ' + dest
            print warning
            email_manager.enqueue_notify_email(warning, warning)
            remove_file_or_dir(dest)
        shutil.move(source, dest)


class VerifySynchronousTask(VerifyTask):
    def epilog(self):
        super(VerifySynchronousTask, self).epilog()
        if self.success:
            if self.queue_entry.job.num_complete() > 0:
                # some other entry failed verify, and we've
                # already been marked as stopped
                return

            agent = self.queue_entry.on_pending()
            if agent:
                self.agent.dispatcher.add_agent(agent)


class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        super(QueueTask, self).__init__(cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(keyval_dir, field, value, keyval_filename='keyval'):
        key_path = os.path.join(keyval_dir, keyval_filename)
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%s' % (field, str(value))
        keyval_file.close()
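    # Example (illustrative values): _write_keyval('/results/5-user',
    # 'job_queued', 1225500000) appends the line "job_queued=1225500000" to
    # /results/5-user/keyval; passing keyval_filename=<hostname> is how
    # _write_host_keyval() below writes per-host label keyvals instead.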
mbligh36768f02008-02-22 18:28:33 +00001367
1368
showardd8e548a2008-09-09 03:04:57 +00001369 def _host_keyval_dir(self):
1370 return os.path.join(self.results_dir(), 'host_keyvals')
1371
1372
1373 def _write_host_keyval(self, host):
1374 labels = ','.join(host.labels())
1375 self._write_keyval(self._host_keyval_dir(), 'labels', labels,
1376 keyval_filename=host.hostname)
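# Per-host keyvals land in a file named after the host, e.g. (hypothetical
# hostname and labels) host_keyvals/host1.example.com containing a single line:
#   labels=remote-console,video-capture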
1377
1378 def _create_host_keyval_dir(self):
1379 directory = self._host_keyval_dir()
1380 if not os.path.exists(directory):
1381 os.makedirs(directory)
1382
1383
jadmanski0afbb632008-06-06 21:10:57 +00001384 def results_dir(self):
1385 return self.queue_entries[0].results_dir()
mblighbb421852008-03-11 22:36:16 +00001386
1387
jadmanski0afbb632008-06-06 21:10:57 +00001388 def run(self):
1389 """\
1390 Override AgentTask.run() so we can use a PidfileRunMonitor.
1391 """
1392 self.monitor = PidfileRunMonitor(self.results_dir(),
1393 cmd=self.cmd,
1394 nice_level=AUTOSERV_NICE_LEVEL)
1395 self.monitor.run()
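# A sketch of the division of labor here: PidfileRunMonitor (assumed to be
# defined earlier in this file) launches autoserv and tracks it via the
# AUTOSERV_PID_FILE pidfile in the results directory; that same pidfile is what
# FinalReparseTask._determine_final_status() reads back and what
# VerifyTask.move_results() skips when copying results.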
mblighbb421852008-03-11 22:36:16 +00001396
1397
jadmanski0afbb632008-06-06 21:10:57 +00001398 def prolog(self):
1399 # write some job timestamps into the job keyval file
1400 queued = time.mktime(self.job.created_on.timetuple())
1401 started = time.time()
showardd8e548a2008-09-09 03:04:57 +00001402 self._write_keyval(self.results_dir(), "job_queued", int(queued))
1403 self._write_keyval(self.results_dir(), "job_started", int(started))
1404 self._create_host_keyval_dir()
jadmanski0afbb632008-06-06 21:10:57 +00001405 for queue_entry in self.queue_entries:
showardd8e548a2008-09-09 03:04:57 +00001406 self._write_host_keyval(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001407 print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
1408 queue_entry.set_status('Running')
1409 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001410 queue_entry.host.update_field('dirty', 1)
jadmanski0afbb632008-06-06 21:10:57 +00001411 if (not self.job.is_synchronous() and
1412 self.job.num_machines() > 1):
1413 assert len(self.queue_entries) == 1
1414 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001415
1416
showard97aed502008-11-04 02:01:24 +00001417 def _finish_task(self, success):
jadmanski0afbb632008-06-06 21:10:57 +00001418 # write out the finished time into the results keyval
1419 finished = time.time()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001420 self._write_keyval(self.results_dir(), "job_finished", int(finished))
jadmanskic2ac77f2008-05-16 21:44:04 +00001421
jadmanski0afbb632008-06-06 21:10:57 +00001422 # parse the results of the job
showard97aed502008-11-04 02:01:24 +00001423 reparse_task = FinalReparseTask(self.queue_entries)
1424 self.agent.dispatcher.add_agent(Agent([reparse_task]))
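# Both abort() (success=False) and epilog() (self.success) funnel through
# _finish_task(), so a FinalReparseTask is scheduled for the results either way.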
jadmanskif7fa2cc2008-10-01 14:13:23 +00001425
1426
1427 def _log_abort(self):
1428 # build up sets of all the aborted_by and aborted_on values
1429 aborted_by, aborted_on = set(), set()
1430 for queue_entry in self.queue_entries:
1431 if queue_entry.aborted_by:
1432 aborted_by.add(queue_entry.aborted_by)
1433 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1434 aborted_on.add(t)
1435
1436 # write out the single unique aborted_by value (there should be at most one) and the latest abort time
1437 assert len(aborted_by) <= 1
1438 if len(aborted_by) == 1:
1439 results_dir = self.results_dir()
1440 self._write_keyval(results_dir, "aborted_by", aborted_by.pop())
1441 self._write_keyval(results_dir, "aborted_on", max(aborted_on))
jadmanskic2ac77f2008-05-16 21:44:04 +00001442
1443
jadmanski0afbb632008-06-06 21:10:57 +00001444 def abort(self):
1445 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001446 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001447 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001448
1449
showard21baa452008-10-21 00:08:39 +00001450 def _reboot_hosts(self):
1451 reboot_after = self.job.reboot_after
1452 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001453 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001454 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001455 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001456 num_tests_failed = self.monitor.num_tests_failed()
1457 do_reboot = (self.success and num_tests_failed == 0)
1458
showard8ebca792008-11-04 21:54:22 +00001459 for queue_entry in self.queue_entries:
1460 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00001461 # don't pass the queue entry to the CleanupTask. If the cleanup
showardfa8629c2008-11-04 16:51:23 +00001462 # fails, the job doesn't care -- it's over.
showard45ae8192008-11-05 19:32:53 +00001463 cleanup_task = CleanupTask(host=queue_entry.get_host())
1464 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
showard8ebca792008-11-04 21:54:22 +00001465 else:
1466 queue_entry.host.set_status('Ready')
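# reboot_after policy, as implemented above: ALWAYS schedules a CleanupTask for
# every host; IF_ALL_TESTS_PASSED does so only when the queue task succeeded
# with zero failed tests; otherwise the host is simply set back to 'Ready'.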
showard21baa452008-10-21 00:08:39 +00001467
1468
jadmanski0afbb632008-06-06 21:10:57 +00001469 def epilog(self):
1470 super(QueueTask, self).epilog()
jadmanski0afbb632008-06-06 21:10:57 +00001471 for queue_entry in self.queue_entries:
showard97aed502008-11-04 02:01:24 +00001472 # set status to PARSING here so queue entry is marked complete
1473 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh36768f02008-02-22 18:28:33 +00001474
showard97aed502008-11-04 02:01:24 +00001475 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001476 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001477
showard97aed502008-11-04 02:01:24 +00001478 print "queue_task finished with success=%s" % self.success
mbligh36768f02008-02-22 18:28:33 +00001479
1480
mblighbb421852008-03-11 22:36:16 +00001481class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001482 def __init__(self, job, queue_entries, run_monitor):
1483 super(RecoveryQueueTask, self).__init__(job,
1484 queue_entries, cmd=None)
1485 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001486
1487
jadmanski0afbb632008-06-06 21:10:57 +00001488 def run(self):
1489 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001490
1491
jadmanski0afbb632008-06-06 21:10:57 +00001492 def prolog(self):
1493 # recovering an existing process - don't do prolog
1494 pass
mblighbb421852008-03-11 22:36:16 +00001495
1496
showard45ae8192008-11-05 19:32:53 +00001497class CleanupTask(AgentTask):
showardfa8629c2008-11-04 16:51:23 +00001498 def __init__(self, host=None, queue_entry=None):
1499 assert bool(host) ^ bool(queue_entry)
1500 if queue_entry:
1501 host = queue_entry.get_host()
jadmanski0afbb632008-06-06 21:10:57 +00001502
showard45ae8192008-11-05 19:32:53 +00001503 self.create_temp_resultsdir('.cleanup')
1504 self.cmd = [_autoserv_path, '--cleanup', '-m', host.hostname,
1505 '-r', self.temp_results_dir]
showardfa8629c2008-11-04 16:51:23 +00001506 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001507 self.host = host
showardfa8629c2008-11-04 16:51:23 +00001508 repair_task = RepairTask(host, fail_queue_entry=queue_entry)
showard45ae8192008-11-05 19:32:53 +00001509 super(CleanupTask, self).__init__(self.cmd, failure_tasks=[repair_task])
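# The cleanup command built here is roughly (hypothetical host and temp dir):
#   autoserv --cleanup -m host1.example.com -r /tmp/tmpXXXX.cleanup
# with a RepairTask registered as the failure task, so a failed cleanup falls
# back to repair (optionally failing the originating queue entry).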
mbligh16c722d2008-03-05 00:58:44 +00001510
mblighd5c95802008-03-05 00:33:46 +00001511
jadmanski0afbb632008-06-06 21:10:57 +00001512 def prolog(self):
showard45ae8192008-11-05 19:32:53 +00001513 print "starting cleanup task for host: %s" % self.host.hostname
1514 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001515
mblighd5c95802008-03-05 00:33:46 +00001516
showard21baa452008-10-21 00:08:39 +00001517 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001518 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001519 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001520 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001521 self.host.update_field('dirty', 0)
showardfa8629c2008-11-04 16:51:23 +00001522 elif self.queue_entry:
1523 self.queue_entry.requeue()
showard21baa452008-10-21 00:08:39 +00001524
1525
mblighd5c95802008-03-05 00:33:46 +00001526class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001527 def __init__(self, queue_entry, agents_to_abort):
1528 self.queue_entry = queue_entry
1529 self.agents_to_abort = agents_to_abort
jadmanski0afbb632008-06-06 21:10:57 +00001530 super(AbortTask, self).__init__('')
mbligh36768f02008-02-22 18:28:33 +00001531
1532
jadmanski0afbb632008-06-06 21:10:57 +00001533 def prolog(self):
1534 print "starting abort on host %s, job %s" % (
1535 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001536
mblighd64e5702008-04-04 21:39:28 +00001537
jadmanski0afbb632008-06-06 21:10:57 +00001538 def epilog(self):
1539 super(AbortTask, self).epilog()
1540 self.queue_entry.set_status('Aborted')
1541 self.success = True
1542
1543
1544 def run(self):
1545 for agent in self.agents_to_abort:
1546 if (agent.active_task):
1547 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001548
1549
showard97aed502008-11-04 02:01:24 +00001550class FinalReparseTask(AgentTask):
1551 MAX_PARSE_PROCESSES = (
1552 global_config.global_config.get_config_value(
1553 _global_config_section, 'max_parse_processes', type=int))
1554 _num_running_parses = 0
1555
1556 def __init__(self, queue_entries):
1557 self._queue_entries = queue_entries
1558 self._parse_started = False
1559
1560 assert len(queue_entries) > 0
1561 queue_entry = queue_entries[0]
1562 job = queue_entry.job
1563
1564 flags = []
1565 if job.is_synchronous():
1566 assert len(queue_entries) == job.num_machines()
1567 else:
1568 assert len(queue_entries) == 1
1569 if job.num_machines() > 1:
1570 flags = ['-l', '2']
1571
1572 if _testing_mode:
1573 self.cmd = 'true'
1574 return
1575
1576 self._results_dir = queue_entry.results_dir()
1577 self.log_file = os.path.abspath(os.path.join(self._results_dir,
1578 '.parse.log'))
1579 super(FinalReparseTask, self).__init__(
1580 cmd=self.generate_parse_command(flags=flags))
1581
1582
1583 @classmethod
1584 def _increment_running_parses(cls):
1585 cls._num_running_parses += 1
1586
1587
1588 @classmethod
1589 def _decrement_running_parses(cls):
1590 cls._num_running_parses -= 1
1591
1592
1593 @classmethod
1594 def _can_run_new_parse(cls):
1595 return cls._num_running_parses < cls.MAX_PARSE_PROCESSES
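# These three classmethods implement a simple throttle: a class-wide counter of
# running parses is checked against max_parse_processes (read from the
# scheduler's global_config section), and run()/poll() below refuse to launch a
# new parse until a slot is free.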
1596
1597
1598 def prolog(self):
1599 super(FinalReparseTask, self).prolog()
1600 for queue_entry in self._queue_entries:
1601 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1602
1603
1604 def epilog(self):
1605 super(FinalReparseTask, self).epilog()
1606 final_status = self._determine_final_status()
1607 for queue_entry in self._queue_entries:
1608 queue_entry.set_status(final_status)
1609
1610
1611 def _determine_final_status(self):
1612 # use a PidfileRunMonitor to read the autoserv exit status
1613 monitor = PidfileRunMonitor(self._results_dir)
1614 if monitor.exit_code() == 0:
1615 return models.HostQueueEntry.Status.COMPLETED
1616 return models.HostQueueEntry.Status.FAILED
1617
1618
1619 def generate_parse_command(self, flags=[]):
1620 parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
1621 return [parse] + flags + ['-r', '-o', self._results_dir]
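# A rough sketch of the resulting command for a multi-host asynchronous job
# (hypothetical install and results paths):
#   ['/usr/local/autotest/tko/parse', '-l', '2', '-r', '-o', '/results/42-user/host1']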
1622
1623
1624 def poll(self):
1625 # override poll(): until the parse has actually started, keep retrying the start
1626 # (waiting for a running-parse slot to free up); once started, defer to default polling
1627 if self._parse_started:
1628 super(FinalReparseTask, self).poll()
1629 else:
1630 self._try_starting_parse()
1631
1632
1633 def run(self):
1634 # override run() to not actually run unless we can
1635 self._try_starting_parse()
1636
1637
1638 def _try_starting_parse(self):
1639 if not self._can_run_new_parse():
1640 return
1641 # actually run the parse command
1642 super(FinalReparseTask, self).run()
1643 self._increment_running_parses()
1644 self._parse_started = True
1645
1646
1647 def finished(self, success):
1648 super(FinalReparseTask, self).finished(success)
1649 self._decrement_running_parses()
1650
1651
mbligh36768f02008-02-22 18:28:33 +00001652class DBObject(object):
jadmanski0afbb632008-06-06 21:10:57 +00001653 def __init__(self, id=None, row=None, new_record=False):
1654 assert (bool(id) != bool(row))
mbligh36768f02008-02-22 18:28:33 +00001655
jadmanski0afbb632008-06-06 21:10:57 +00001656 self.__table = self._get_table()
1657 fields = self._fields()
mbligh36768f02008-02-22 18:28:33 +00001658
jadmanski0afbb632008-06-06 21:10:57 +00001659 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001660
jadmanski0afbb632008-06-06 21:10:57 +00001661 if row is None:
1662 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1663 rows = _db.execute(sql, (id,))
1664 if len(rows) == 0:
1665 raise "row not found (table=%s, id=%s)" % \
1666 (self.__table, id)
1667 row = rows[0]
mbligh36768f02008-02-22 18:28:33 +00001668
jadmanski0afbb632008-06-06 21:10:57 +00001669 assert len(row) == self.num_cols(), (
1670 "table = %s, row = %s/%d, fields = %s/%d" % (
1671 self.__table, row, len(row), fields, self.num_cols()))
mbligh36768f02008-02-22 18:28:33 +00001672
jadmanski0afbb632008-06-06 21:10:57 +00001673 self.__valid_fields = {}
1674 for i,value in enumerate(row):
1675 self.__dict__[fields[i]] = value
1676 self.__valid_fields[fields[i]] = True
mbligh36768f02008-02-22 18:28:33 +00001677
jadmanski0afbb632008-06-06 21:10:57 +00001678 del self.__valid_fields['id']
mbligh36768f02008-02-22 18:28:33 +00001679
mblighe2586682008-02-29 22:45:46 +00001680
jadmanski0afbb632008-06-06 21:10:57 +00001681 @classmethod
1682 def _get_table(cls):
1683 raise NotImplementedError('Subclasses must override this')
mblighe2586682008-02-29 22:45:46 +00001684
1685
jadmanski0afbb632008-06-06 21:10:57 +00001686 @classmethod
1687 def _fields(cls):
1688 raise NotImplementedError('Subclasses must override this')
showard04c82c52008-05-29 19:38:12 +00001689
1690
jadmanski0afbb632008-06-06 21:10:57 +00001691 @classmethod
1692 def num_cols(cls):
1693 return len(cls._fields())
showard04c82c52008-05-29 19:38:12 +00001694
1695
jadmanski0afbb632008-06-06 21:10:57 +00001696 def count(self, where, table = None):
1697 if not table:
1698 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001699
jadmanski0afbb632008-06-06 21:10:57 +00001700 rows = _db.execute("""
1701 SELECT count(*) FROM %s
1702 WHERE %s
1703 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001704
jadmanski0afbb632008-06-06 21:10:57 +00001705 assert len(rows) == 1
1706
1707 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001708
1709
mblighf8c624d2008-07-03 16:58:45 +00001710 def update_field(self, field, value, condition=''):
jadmanski0afbb632008-06-06 21:10:57 +00001711 assert self.__valid_fields[field]
mbligh36768f02008-02-22 18:28:33 +00001712
jadmanski0afbb632008-06-06 21:10:57 +00001713 if self.__dict__[field] == value:
1714 return
mbligh36768f02008-02-22 18:28:33 +00001715
mblighf8c624d2008-07-03 16:58:45 +00001716 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1717 if condition:
1718 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001719 _db.execute(query, (value, self.id))
1720
1721 self.__dict__[field] = value
mbligh36768f02008-02-22 18:28:33 +00001722
1723
jadmanski0afbb632008-06-06 21:10:57 +00001724 def save(self):
1725 if self.__new_record:
1726 keys = self._fields()[1:] # avoid id
1727 columns = ','.join([str(key) for key in keys])
1728 values = ['"%s"' % self.__dict__[key] for key in keys]
1729 values = ','.join(values)
1730 query = """INSERT INTO %s (%s) VALUES (%s)""" % \
1731 (self.__table, columns, values)
1732 _db.execute(query)
mbligh36768f02008-02-22 18:28:33 +00001733
1734
jadmanski0afbb632008-06-06 21:10:57 +00001735 def delete(self):
1736 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1737 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001738
1739
showard63a34772008-08-18 19:32:50 +00001740 @staticmethod
1741 def _prefix_with(string, prefix):
1742 if string:
1743 string = prefix + string
1744 return string
1745
1746
jadmanski0afbb632008-06-06 21:10:57 +00001747 @classmethod
showard989f25d2008-10-01 11:38:11 +00001748 def fetch(cls, where='', params=(), joins='', order_by=''):
showard63a34772008-08-18 19:32:50 +00001749 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1750 where = cls._prefix_with(where, 'WHERE ')
1751 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
1752 '%(where)s %(order_by)s' % {'table' : cls._get_table(),
1753 'joins' : joins,
1754 'where' : where,
1755 'order_by' : order_by})
1756 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001757 for row in rows:
1758 yield cls(row=row)
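# A minimal usage sketch (hypothetical filter): a call such as
#   HostQueueEntry.fetch(where="status = 'Queued'", order_by='priority DESC')
# generates roughly
#   SELECT host_queue_entries.* FROM host_queue_entries
#   WHERE status = 'Queued' ORDER BY priority DESC
# and yields one HostQueueEntry per matching row.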
mblighe2586682008-02-29 22:45:46 +00001759
mbligh36768f02008-02-22 18:28:33 +00001760
1761class IneligibleHostQueue(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001762 def __init__(self, id=None, row=None, new_record=None):
1763 super(IneligibleHostQueue, self).__init__(id=id, row=row,
1764 new_record=new_record)
mblighe2586682008-02-29 22:45:46 +00001765
1766
jadmanski0afbb632008-06-06 21:10:57 +00001767 @classmethod
1768 def _get_table(cls):
1769 return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001770
1771
jadmanski0afbb632008-06-06 21:10:57 +00001772 @classmethod
1773 def _fields(cls):
1774 return ['id', 'job_id', 'host_id']
showard04c82c52008-05-29 19:38:12 +00001775
1776
showard989f25d2008-10-01 11:38:11 +00001777class Label(DBObject):
1778 @classmethod
1779 def _get_table(cls):
1780 return 'labels'
1781
1782
1783 @classmethod
1784 def _fields(cls):
1785 return ['id', 'name', 'kernel_config', 'platform', 'invalid',
1786 'only_if_needed']
1787
1788
mbligh36768f02008-02-22 18:28:33 +00001789class Host(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001790 def __init__(self, id=None, row=None):
1791 super(Host, self).__init__(id=id, row=row)
mblighe2586682008-02-29 22:45:46 +00001792
1793
jadmanski0afbb632008-06-06 21:10:57 +00001794 @classmethod
1795 def _get_table(cls):
1796 return 'hosts'
mbligh36768f02008-02-22 18:28:33 +00001797
1798
jadmanski0afbb632008-06-06 21:10:57 +00001799 @classmethod
1800 def _fields(cls):
1801 return ['id', 'hostname', 'locked', 'synch_id', 'status',
showard21baa452008-10-21 00:08:39 +00001802 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty']
showard04c82c52008-05-29 19:38:12 +00001803
1804
jadmanski0afbb632008-06-06 21:10:57 +00001805 def current_task(self):
1806 rows = _db.execute("""
1807 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1808 """, (self.id,))
1809
1810 if len(rows) == 0:
1811 return None
1812 else:
1813 assert len(rows) == 1
1814 results = rows[0]
mblighf8c624d2008-07-03 16:58:45 +00001815# print "current = %s" % results
jadmanski0afbb632008-06-06 21:10:57 +00001816 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00001817
1818
jadmanski0afbb632008-06-06 21:10:57 +00001819 def yield_work(self):
1820 print "%s yielding work" % self.hostname
1821 if self.current_task():
1822 self.current_task().requeue()
1823
1824 def set_status(self,status):
1825 print '%s -> %s' % (self.hostname, status)
1826 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00001827
1828
showardd8e548a2008-09-09 03:04:57 +00001829 def labels(self):
1830 """
1831 Fetch a list of names of all non-platform labels associated with this
1832 host.
1833 """
1834 rows = _db.execute("""
1835 SELECT labels.name
1836 FROM labels
1837 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
1838 WHERE NOT labels.platform AND hosts_labels.host_id = %s
1839 ORDER BY labels.name
1840 """, (self.id,))
1841 return [row[0] for row in rows]
1842
1843
mbligh36768f02008-02-22 18:28:33 +00001844class HostQueueEntry(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001845 def __init__(self, id=None, row=None):
1846 assert id or row
1847 super(HostQueueEntry, self).__init__(id=id, row=row)
1848 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00001849
jadmanski0afbb632008-06-06 21:10:57 +00001850 if self.host_id:
1851 self.host = Host(self.host_id)
1852 else:
1853 self.host = None
mbligh36768f02008-02-22 18:28:33 +00001854
jadmanski0afbb632008-06-06 21:10:57 +00001855 self.queue_log_path = os.path.join(self.job.results_dir(),
1856 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00001857
1858
jadmanski0afbb632008-06-06 21:10:57 +00001859 @classmethod
1860 def _get_table(cls):
1861 return 'host_queue_entries'
mblighe2586682008-02-29 22:45:46 +00001862
1863
jadmanski0afbb632008-06-06 21:10:57 +00001864 @classmethod
1865 def _fields(cls):
1866 return ['id', 'job_id', 'host_id', 'priority', 'status',
showardb8471e32008-07-03 19:51:08 +00001867 'meta_host', 'active', 'complete', 'deleted']
showard04c82c52008-05-29 19:38:12 +00001868
1869
jadmanski0afbb632008-06-06 21:10:57 +00001870 def set_host(self, host):
1871 if host:
1872 self.queue_log_record('Assigning host ' + host.hostname)
1873 self.update_field('host_id', host.id)
1874 self.update_field('active', True)
1875 self.block_host(host.id)
1876 else:
1877 self.queue_log_record('Releasing host')
1878 self.unblock_host(self.host.id)
1879 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00001880
jadmanski0afbb632008-06-06 21:10:57 +00001881 self.host = host
mbligh36768f02008-02-22 18:28:33 +00001882
1883
jadmanski0afbb632008-06-06 21:10:57 +00001884 def get_host(self):
1885 return self.host
mbligh36768f02008-02-22 18:28:33 +00001886
1887
jadmanski0afbb632008-06-06 21:10:57 +00001888 def queue_log_record(self, log_line):
1889 now = str(datetime.datetime.now())
1890 queue_log = open(self.queue_log_path, 'a', 0)
1891 queue_log.write(now + ' ' + log_line + '\n')
1892 queue_log.close()
mbligh36768f02008-02-22 18:28:33 +00001893
1894
jadmanski0afbb632008-06-06 21:10:57 +00001895 def block_host(self, host_id):
1896 print "creating block %s/%s" % (self.job.id, host_id)
1897 row = [0, self.job.id, host_id]
1898 block = IneligibleHostQueue(row=row, new_record=True)
1899 block.save()
mblighe2586682008-02-29 22:45:46 +00001900
1901
jadmanski0afbb632008-06-06 21:10:57 +00001902 def unblock_host(self, host_id):
1903 print "removing block %s/%s" % (self.job.id, host_id)
1904 blocks = IneligibleHostQueue.fetch(
1905 'job_id=%d and host_id=%d' % (self.job.id, host_id))
1906 for block in blocks:
1907 block.delete()
mblighe2586682008-02-29 22:45:46 +00001908
1909
jadmanski0afbb632008-06-06 21:10:57 +00001910 def results_dir(self):
1911 if self.job.is_synchronous() or self.job.num_machines() == 1:
1912 return self.job.job_dir
1913 else:
1914 assert self.host
1915 return os.path.join(self.job.job_dir,
1916 self.host.hostname)
mbligh36768f02008-02-22 18:28:33 +00001917
mblighe2586682008-02-29 22:45:46 +00001918
jadmanski0afbb632008-06-06 21:10:57 +00001919 def verify_results_dir(self):
1920 if self.job.is_synchronous() or self.job.num_machines() > 1:
1921 assert self.host
1922 return os.path.join(self.job.job_dir,
1923 self.host.hostname)
1924 else:
1925 return self.job.job_dir
mbligh36768f02008-02-22 18:28:33 +00001926
1927
jadmanski0afbb632008-06-06 21:10:57 +00001928 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00001929 abort_statuses = ['Abort', 'Aborting', 'Aborted']
1930 if status not in abort_statuses:
1931 condition = ' AND '.join(['status <> "%s"' % x
1932 for x in abort_statuses])
1933 else:
1934 condition = ''
1935 self.update_field('status', status, condition=condition)
1936
jadmanski0afbb632008-06-06 21:10:57 +00001937 if self.host:
1938 hostname = self.host.hostname
1939 else:
1940 hostname = 'no host'
1941 print "%s/%d status -> %s" % (hostname, self.id, self.status)
mblighf8c624d2008-07-03 16:58:45 +00001942
jadmanski0afbb632008-06-06 21:10:57 +00001943 if status in ['Queued']:
1944 self.update_field('complete', False)
1945 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00001946
jadmanski0afbb632008-06-06 21:10:57 +00001947 if status in ['Pending', 'Running', 'Verifying', 'Starting',
1948 'Abort', 'Aborting']:
1949 self.update_field('complete', False)
1950 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00001951
showard97aed502008-11-04 02:01:24 +00001952 if status in ['Failed', 'Completed', 'Stopped', 'Aborted', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00001953 self.update_field('complete', True)
1954 self.update_field('active', False)
showard542e8402008-09-19 20:16:18 +00001955 self._email_on_job_complete()
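# For reference, the status -> flags mapping maintained above:
#   Queued                                         active=0  complete=0
#   Pending/Running/Verifying/Starting/Abort(ing)  active=1  complete=0
#   Failed/Completed/Stopped/Aborted/Parsing       active=0  complete=1 (may email)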
1956
1957
1958 def _email_on_job_complete(self):
1959 url = "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
1960
1961 if self.job.is_finished():
1962 subject = "Autotest: Job ID: %s \"%s\" Completed" % (
1963 self.job.id, self.job.name)
1964 body = "Job ID: %s\nJob Name: %s\n%s\n" % (
1965 self.job.id, self.job.name, url)
1966 send_email(_email_from, self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00001967
1968
jadmanski0afbb632008-06-06 21:10:57 +00001969 def run(self,assigned_host=None):
1970 if self.meta_host:
1971 assert assigned_host
1972 # ensure results dir exists for the queue log
1973 self.job.create_results_dir()
1974 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00001975
jadmanski0afbb632008-06-06 21:10:57 +00001976 print "%s/%s scheduled on %s, status=%s" % (self.job.name,
1977 self.meta_host, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00001978
jadmanski0afbb632008-06-06 21:10:57 +00001979 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00001980
jadmanski0afbb632008-06-06 21:10:57 +00001981 def requeue(self):
1982 self.set_status('Queued')
mblighe2586682008-02-29 22:45:46 +00001983
jadmanski0afbb632008-06-06 21:10:57 +00001984 if self.meta_host:
1985 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00001986
1987
jadmanski0afbb632008-06-06 21:10:57 +00001988 def handle_host_failure(self):
1989 """\
1990 Called when this queue entry's host has failed verification and
1991 repair.
1992 """
1993 assert not self.meta_host
1994 self.set_status('Failed')
1995 if self.job.is_synchronous():
1996 self.job.stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00001997
1998
jadmanski0afbb632008-06-06 21:10:57 +00001999 def clear_results_dir(self, results_dir=None, dont_delete_files=False):
2000 results_dir = results_dir or self.results_dir()
2001 if not os.path.exists(results_dir):
2002 return
2003 if dont_delete_files:
2004 temp_dir = tempfile.mkdtemp(suffix='.clear_results')
2005 print 'Moving results from %s to %s' % (results_dir,
2006 temp_dir)
2007 for filename in os.listdir(results_dir):
2008 path = os.path.join(results_dir, filename)
2009 if dont_delete_files:
2010 shutil.move(path,
2011 os.path.join(temp_dir, filename))
2012 else:
2013 remove_file_or_dir(path)
mbligh36768f02008-02-22 18:28:33 +00002014
2015
jadmanskif7fa2cc2008-10-01 14:13:23 +00002016 @property
2017 def aborted_by(self):
2018 self._load_abort_info()
2019 return self._aborted_by
2020
2021
2022 @property
2023 def aborted_on(self):
2024 self._load_abort_info()
2025 return self._aborted_on
2026
2027
2028 def _load_abort_info(self):
2029 """ Fetch info about who aborted the job. """
2030 if hasattr(self, "_aborted_by"):
2031 return
2032 rows = _db.execute("""
2033 SELECT users.login, aborted_host_queue_entries.aborted_on
2034 FROM aborted_host_queue_entries
2035 INNER JOIN users
2036 ON users.id = aborted_host_queue_entries.aborted_by_id
2037 WHERE aborted_host_queue_entries.queue_entry_id = %s
2038 """, (self.id,))
2039 if rows:
2040 self._aborted_by, self._aborted_on = rows[0]
2041 else:
2042 self._aborted_by = self._aborted_on = None
2043
2044
showardb2e2c322008-10-14 17:33:55 +00002045 def on_pending(self):
2046 """
2047 Called when an entry in a synchronous job has passed verify. If the
2048 job is ready to run, returns an agent to run the job. Returns None
2049 otherwise.
2050 """
2051 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002052 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002053 if self.job.is_ready():
2054 return self.job.run(self)
2055 return None
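# Note: for a synchronous job, job.is_ready() compares the count of 'Pending'
# entries to num_machines(), so the running agent is only produced here once
# every host in the group has reached Pending.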
2056
2057
showard1be97432008-10-17 15:30:45 +00002058 def abort(self, agents_to_abort=[]):
2059 abort_task = AbortTask(self, agents_to_abort)
2060 tasks = [abort_task]
2061
2062 host = self.get_host()
2063 if host:
showard45ae8192008-11-05 19:32:53 +00002064 cleanup_task = CleanupTask(host=host)
showard1be97432008-10-17 15:30:45 +00002065 verify_task = VerifyTask(host=host)
2066 # just to make sure this host does not get taken away
showard45ae8192008-11-05 19:32:53 +00002067 host.set_status('Cleaning')
2068 tasks += [cleanup_task, verify_task]
showard1be97432008-10-17 15:30:45 +00002069
2070 self.set_status('Aborting')
2071 return Agent(tasks=tasks, queue_entry_ids=[self.id])
2072
2073
mbligh36768f02008-02-22 18:28:33 +00002074class Job(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00002075 def __init__(self, id=None, row=None):
2076 assert id or row
2077 super(Job, self).__init__(id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00002078
jadmanski0afbb632008-06-06 21:10:57 +00002079 self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
2080 self.owner))
mblighe2586682008-02-29 22:45:46 +00002081
2082
jadmanski0afbb632008-06-06 21:10:57 +00002083 @classmethod
2084 def _get_table(cls):
2085 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00002086
2087
jadmanski0afbb632008-06-06 21:10:57 +00002088 @classmethod
2089 def _fields(cls):
2090 return ['id', 'owner', 'name', 'priority', 'control_file',
2091 'control_type', 'created_on', 'synch_type',
showard542e8402008-09-19 20:16:18 +00002092 'synch_count', 'synchronizing', 'timeout',
showard21baa452008-10-21 00:08:39 +00002093 'run_verify', 'email_list', 'reboot_before', 'reboot_after']
showard04c82c52008-05-29 19:38:12 +00002094
2095
jadmanski0afbb632008-06-06 21:10:57 +00002096 def is_server_job(self):
2097 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002098
2099
jadmanski0afbb632008-06-06 21:10:57 +00002100 def get_host_queue_entries(self):
2101 rows = _db.execute("""
2102 SELECT * FROM host_queue_entries
2103 WHERE job_id= %s
2104 """, (self.id,))
2105 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002106
jadmanski0afbb632008-06-06 21:10:57 +00002107 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002108
jadmanski0afbb632008-06-06 21:10:57 +00002109 return entries
mbligh36768f02008-02-22 18:28:33 +00002110
2111
jadmanski0afbb632008-06-06 21:10:57 +00002112 def set_status(self, status, update_queues=False):
2113 self.update_field('status',status)
2114
2115 if update_queues:
2116 for queue_entry in self.get_host_queue_entries():
2117 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002118
2119
jadmanski0afbb632008-06-06 21:10:57 +00002120 def is_synchronous(self):
2121 return self.synch_type == 2
mbligh36768f02008-02-22 18:28:33 +00002122
2123
jadmanski0afbb632008-06-06 21:10:57 +00002124 def is_ready(self):
2125 if not self.is_synchronous():
2126 return True
2127 sql = "job_id=%s AND status='Pending'" % self.id
2128 count = self.count(sql, table='host_queue_entries')
showardb2e2c322008-10-14 17:33:55 +00002129 return (count == self.num_machines())
mbligh36768f02008-02-22 18:28:33 +00002130
2131
jadmanski0afbb632008-06-06 21:10:57 +00002132 def results_dir(self):
2133 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002134
jadmanski0afbb632008-06-06 21:10:57 +00002135 def num_machines(self, clause = None):
2136 sql = "job_id=%s" % self.id
2137 if clause:
2138 sql += " AND (%s)" % clause
2139 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002140
2141
jadmanski0afbb632008-06-06 21:10:57 +00002142 def num_queued(self):
2143 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002144
2145
jadmanski0afbb632008-06-06 21:10:57 +00002146 def num_active(self):
2147 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002148
2149
jadmanski0afbb632008-06-06 21:10:57 +00002150 def num_complete(self):
2151 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002152
2153
jadmanski0afbb632008-06-06 21:10:57 +00002154 def is_finished(self):
2155 left = self.num_queued()
2156 print "%s: %s machines left" % (self.name, left)
2157 return left==0
mbligh36768f02008-02-22 18:28:33 +00002158
mbligh36768f02008-02-22 18:28:33 +00002159
jadmanski0afbb632008-06-06 21:10:57 +00002160 def stop_all_entries(self):
2161 for child_entry in self.get_host_queue_entries():
2162 if not child_entry.complete:
2163 child_entry.set_status('Stopped')
mblighe2586682008-02-29 22:45:46 +00002164
2165
jadmanski0afbb632008-06-06 21:10:57 +00002166 def write_to_machines_file(self, queue_entry):
2167 hostname = queue_entry.get_host().hostname
2168 print "writing %s to job %s machines file" % (hostname, self.id)
2169 file_path = os.path.join(self.job_dir, '.machines')
2170 mf = open(file_path, 'a')
2171 mf.write("%s\n" % queue_entry.get_host().hostname)
2172 mf.close()
mbligh36768f02008-02-22 18:28:33 +00002173
2174
jadmanski0afbb632008-06-06 21:10:57 +00002175 def create_results_dir(self, queue_entry=None):
2176 print "create: active: %s complete %s" % (self.num_active(),
2177 self.num_complete())
mbligh36768f02008-02-22 18:28:33 +00002178
jadmanski0afbb632008-06-06 21:10:57 +00002179 if not os.path.exists(self.job_dir):
2180 os.makedirs(self.job_dir)
mbligh36768f02008-02-22 18:28:33 +00002181
jadmanski0afbb632008-06-06 21:10:57 +00002182 if queue_entry:
showarde05654d2008-10-28 20:38:40 +00002183 results_dir = queue_entry.results_dir()
2184 if not os.path.exists(results_dir):
2185 os.makedirs(results_dir)
2186 return results_dir
jadmanski0afbb632008-06-06 21:10:57 +00002187 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002188
2189
showardb2e2c322008-10-14 17:33:55 +00002190 def _write_control_file(self):
2191 'Writes control file out to disk, returns a filename'
2192 control_fd, control_filename = tempfile.mkstemp(suffix='.control_file')
2193 control_file = os.fdopen(control_fd, 'w')
jadmanski0afbb632008-06-06 21:10:57 +00002194 if self.control_file:
showardb2e2c322008-10-14 17:33:55 +00002195 control_file.write(self.control_file)
2196 control_file.close()
2197 return control_filename
mbligh36768f02008-02-22 18:28:33 +00002198
showardb2e2c322008-10-14 17:33:55 +00002199
2200 def _get_job_tag(self, queue_entries):
2201 base_job_tag = "%s-%s" % (self.id, self.owner)
2202 if self.is_synchronous() or self.num_machines() == 1:
2203 return base_job_tag
jadmanski0afbb632008-06-06 21:10:57 +00002204 else:
showardb2e2c322008-10-14 17:33:55 +00002205 return base_job_tag + '/' + queue_entries[0].get_host().hostname
2206
2207
2208 def _get_autoserv_params(self, queue_entries):
2209 results_dir = self.create_results_dir(queue_entries[0])
2210 control_filename = self._write_control_file()
jadmanski0afbb632008-06-06 21:10:57 +00002211 hostnames = ','.join([entry.get_host().hostname
2212 for entry in queue_entries])
showardb2e2c322008-10-14 17:33:55 +00002213 job_tag = self._get_job_tag(queue_entries)
mbligh36768f02008-02-22 18:28:33 +00002214
showardb2e2c322008-10-14 17:33:55 +00002215 params = [_autoserv_path, '-P', job_tag, '-p', '-n',
showard21baa452008-10-21 00:08:39 +00002216 '-r', os.path.abspath(results_dir), '-u', self.owner,
2217 '-l', self.name, '-m', hostnames, control_filename]
mbligh36768f02008-02-22 18:28:33 +00002218
jadmanski0afbb632008-06-06 21:10:57 +00002219 if not self.is_server_job():
2220 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002221
showardb2e2c322008-10-14 17:33:55 +00002222 return params
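# Assembled together, a two-host server job might launch autoserv with a
# command line like (hypothetical values):
#   autoserv -P 42-someuser -p -n -r /results/42-someuser -u someuser \
#            -l kernel-smoke -m host1,host2 /tmp/tmpXYZ.control_file
# with '-c' appended as well when the job is a client-side job.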
mblighe2586682008-02-29 22:45:46 +00002223
mbligh36768f02008-02-22 18:28:33 +00002224
showard21baa452008-10-21 00:08:39 +00002225 def _get_pre_job_tasks(self, queue_entry, verify_task_class=VerifyTask):
2226 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00002227 if self.reboot_before == models.RebootBefore.ALWAYS:
showard21baa452008-10-21 00:08:39 +00002228 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00002229 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showard21baa452008-10-21 00:08:39 +00002230 do_reboot = queue_entry.get_host().dirty
2231
2232 tasks = []
2233 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00002234 tasks.append(CleanupTask(queue_entry=queue_entry))
showard21baa452008-10-21 00:08:39 +00002235 tasks.append(verify_task_class(queue_entry=queue_entry))
2236 return tasks
2237
2238
showardb2e2c322008-10-14 17:33:55 +00002239 def _run_synchronous(self, queue_entry):
2240 if not self.is_ready():
showard9976ce92008-10-15 20:28:13 +00002241 if self.run_verify:
showard21baa452008-10-21 00:08:39 +00002242 return Agent(self._get_pre_job_tasks(queue_entry,
2243 VerifySynchronousTask),
2244 [queue_entry.id])
showard9976ce92008-10-15 20:28:13 +00002245 else:
2246 return queue_entry.on_pending()
mbligh36768f02008-02-22 18:28:33 +00002247
showardb2e2c322008-10-14 17:33:55 +00002248 return self._finish_run(self.get_host_queue_entries())
2249
2250
2251 def _run_asynchronous(self, queue_entry):
showard9976ce92008-10-15 20:28:13 +00002252 initial_tasks = []
2253 if self.run_verify:
showard21baa452008-10-21 00:08:39 +00002254 initial_tasks = self._get_pre_job_tasks(queue_entry)
showardb2e2c322008-10-14 17:33:55 +00002255 return self._finish_run([queue_entry], initial_tasks)
2256
2257
2258 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002259 for queue_entry in queue_entries:
2260 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002261 params = self._get_autoserv_params(queue_entries)
2262 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2263 cmd=params)
2264 tasks = initial_tasks + [queue_task]
2265 entry_ids = [entry.id for entry in queue_entries]
2266
2267 return Agent(tasks, entry_ids, num_processes=len(queue_entries))
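# So the agent for a typical entry looks like, e.g.:
#   [CleanupTask (optional), VerifyTask, QueueTask]
# with num_processes set to the number of entries; QueueTask.epilog() later
# schedules a separate Agent([FinalReparseTask]) for the results.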
2268
2269
2270 def run(self, queue_entry):
2271 if self.is_synchronous():
2272 return self._run_synchronous(queue_entry)
2273 return self._run_asynchronous(queue_entry)
mbligh36768f02008-02-22 18:28:33 +00002274
2275
2276if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002277 main()