blob: 3975f3c6eea39e30a12cbe2858cafcbcdb6dd9c0 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard542e8402008-09-19 20:16:18 +00008import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
9import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
mbligh70feeee2008-06-11 16:20:49 +000010import common
showard21baa452008-10-21 00:08:39 +000011from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000012from autotest_lib.client.common_lib import global_config
13from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000014from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000015from autotest_lib.frontend.afe import models
mbligh70feeee2008-06-11 16:20:49 +000016
mblighb090f142008-02-27 21:33:46 +000017
# Default results directory; overwritten in main() with the results_dir
# command-line argument.
RESULTS_DIR = '.'
# 'nice' increment applied to autoserv child processes (see RunMonitor.run).
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the frontend/database settings (base_url,
# database name, connection parameters).
CONFIG_SECTION = 'AUTOTEST_WEB'

# Root of the autotest tree: defaults to the parent of this file's directory,
# overridable via the AUTOTEST_DIR environment variable.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# make the autotest server code importable
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# name of the pidfile autoserv writes into its results directory
AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Module-level scheduler state, filled in by main()/init().
_db = None                  # DatabaseConnection, created in init()
_shutdown = False           # set True by handle_sigint() to stop the main loop
_notify_email = None        # notification address(es) from global_config
_autoserv_path = 'autoserv' # swapped for 'autoserv_dummy' under --test
_testing_mode = False       # set by the --test command-line flag
_global_config_section = 'SCHEDULER'
_base_url = None            # AFE frontend URL, read from global_config in main()
# see os.getlogin() online docs
_email_from = pwd.getpwuid(os.getuid())[0]
mbligh36768f02008-02-22 18:28:33 +000045
46
def main():
    """Scheduler entry point.

    Parses the command line (results_dir plus --recover-hosts, --logfile,
    --test), reads notify_email / tick_pause_sec / base_url from
    global_config, initializes logging and the database connection via
    init(), then runs the Dispatcher tick loop until _shutdown is set by
    the SIGINT handler.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # read in notify_email from global_config
    c = global_config.global_config
    global _notify_email
    val = c.get_config_value(_global_config_section, "notify_email")
    if val != "":
        _notify_email = val

    # seconds to sleep between dispatcher ticks
    tick_pause = c.get_config_value(
        _global_config_section, 'tick_pause_sec', type=int)

    if options.test:
        # test mode: run a dummy autoserv binary instead of the real one
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # read in base url
    global _base_url
    val = c.get_config_value(CONFIG_SECTION, "base_url")
    if val:
        _base_url = val
    else:
        _base_url = "http://your_autotest_server/afe/"

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(tick_pause)
    # NOTE(review): bare except also catches SystemExit/KeyboardInterrupt;
    # presumably intentional so every termination is logged — confirm.
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    # flush any notifications queued during the final tick, then shut down
    email_manager.send_queued_emails()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000104
105
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean shutdown of the dispatcher loop.

    main() checks _shutdown between ticks, so the in-progress tick is
    allowed to finish before the process exits.
    """
    global _shutdown
    _shutdown = True
    print "Shutdown request received."
mbligh36768f02008-02-22 18:28:33 +0000110
111
def init(logfile):
    """One-time scheduler initialization.

    Redirects stdout/stderr to logfile (if given), switches the database
    name when in testing mode, prepends the autotest server dir to PATH,
    opens the global _db connection, and installs the SIGINT handler.
    """
    if logfile:
        # must happen first so the startup messages below land in the log
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    if _testing_mode:
        # point at the stress-test database instead of the real one;
        # must happen before the DatabaseConnection below is created
        global_config.global_config.override_config_value(
            CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    # let spawned processes find autoserv and friends
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(CONFIG_SECTION)
    _db.connect()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."
mbligh36768f02008-02-22 18:28:33 +0000131
132
133def enable_logging(logfile):
jadmanski0afbb632008-06-06 21:10:57 +0000134 out_file = logfile
135 err_file = "%s.err" % logfile
136 print "Enabling logging to %s (%s)" % (out_file, err_file)
137 out_fd = open(out_file, "a", buffering=0)
138 err_fd = open(err_file, "a", buffering=0)
mbligh36768f02008-02-22 18:28:33 +0000139
jadmanski0afbb632008-06-06 21:10:57 +0000140 os.dup2(out_fd.fileno(), sys.stdout.fileno())
141 os.dup2(err_fd.fileno(), sys.stderr.fileno())
mbligh36768f02008-02-22 18:28:33 +0000142
jadmanski0afbb632008-06-06 21:10:57 +0000143 sys.stdout = out_fd
144 sys.stderr = err_fd
mbligh36768f02008-02-22 18:28:33 +0000145
146
def queue_entries_to_abort():
    """Fetch every HostQueueEntry whose status is 'Abort'."""
    abort_rows = _db.execute("""
                    SELECT * FROM host_queue_entries WHERE status='Abort';
                    """)
    return [HostQueueEntry(row=row) for row in abort_rows]
mbligh36768f02008-02-22 18:28:33 +0000153
def remove_file_or_dir(path):
    """Delete path from the filesystem, recursing when it is a directory."""
    path_mode = os.stat(path).st_mode
    if stat.S_ISDIR(path_mode):
        # directory tree: remove recursively
        shutil.rmtree(path)
    else:
        # regular file (or other non-directory entry)
        os.remove(path)
mblighe2586682008-02-29 22:45:46 +0000161
162
def log_stacktrace(reason):
    """Log the current exception (with traceback) to stderr and queue a
    notification email about it.

    Must be called from inside an except block so sys.exc_info() has
    something to report.

    Fix: the original bound locals named `type` and `str`, shadowing the
    builtins; renamed to exc_type/exc_value/exc_tb and message.
    """
    (exc_type, exc_value, exc_tb) = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    sys.stderr.write("\n%s\n" % message)
    email_manager.enqueue_notify_email("monitor_db exception", message)
mbligh36768f02008-02-22 18:28:33 +0000170
mblighbb421852008-03-11 22:36:16 +0000171
def get_proc_poll_fn(pid):
    """Build a poll() callable for a process we did not spawn ourselves.

    The returned function mirrors subprocess.Popen.poll(): it yields None
    while /proc/<pid> exists (process alive) and 0 once it is gone.  A real
    exit status is unavailable for non-child processes, hence the fixed 0.
    """
    proc_entry = os.path.join('/proc', str(pid))
    def poll_fn():
        alive = os.path.exists(proc_entry)
        if alive:
            return None
        return 0 # we can't get a real exit code
    return poll_fn
mblighbb421852008-03-11 22:36:16 +0000179
180
showard542e8402008-09-19 20:16:18 +0000181def send_email(from_addr, to_string, subject, body):
182 """Mails out emails to the addresses listed in to_string.
183
184 to_string is split into a list which can be delimited by any of:
185 ';', ',', ':' or any whitespace
186 """
187
188 # Create list from string removing empty strings from the list.
189 to_list = [x for x in re.split('\s|,|;|:', to_string) if x]
showard7d182aa2008-09-22 16:17:24 +0000190 if not to_list:
191 return
192
showard542e8402008-09-19 20:16:18 +0000193 msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
194 from_addr, ', '.join(to_list), subject, body)
showard7d182aa2008-09-22 16:17:24 +0000195 try:
196 mailer = smtplib.SMTP('localhost')
197 try:
198 mailer.sendmail(from_addr, to_list, msg)
199 finally:
200 mailer.quit()
201 except Exception, e:
202 print "Sending email failed. Reason: %s" % repr(e)
showard542e8402008-09-19 20:16:18 +0000203
204
mblighbb421852008-03-11 22:36:16 +0000205def kill_autoserv(pid, poll_fn=None):
jadmanski0afbb632008-06-06 21:10:57 +0000206 print 'killing', pid
207 if poll_fn is None:
208 poll_fn = get_proc_poll_fn(pid)
209 if poll_fn() == None:
210 os.kill(pid, signal.SIGCONT)
211 os.kill(pid, signal.SIGTERM)
mbligh36768f02008-02-22 18:28:33 +0000212
213
class EmailNotificationManager(object):
    """Batches scheduler notification messages and mails them out in a
    single email per flush (one flush per dispatcher tick)."""

    def __init__(self):
        self._emails = []

    def enqueue_notify_email(self, subject, message):
        """Queue one notification; a no-op when _notify_email is unset."""
        if not _notify_email:
            return

        # each queued item carries its own subject line plus host/pid/time
        body = 'Subject: ' + subject + '\n'
        body += "%s / %s / %s\n%s" % (socket.gethostname(),
                                      os.getpid(),
                                      time.strftime("%X %x"), message)
        self._emails.append(body)


    def send_queued_emails(self):
        """Send everything queued so far as one email, then clear the queue.
        Does nothing when the queue is empty."""
        if not self._emails:
            return
        subject = 'Scheduler notifications from ' + socket.gethostname()
        separator = '\n' + '-' * 40 + '\n'
        body = separator.join(self._emails)

        send_email(_email_from, _notify_email, subject, body)
        self._emails = []

# module-level singleton used throughout the scheduler
email_manager = EmailNotificationManager()
240
241
class HostScheduler(object):
    """Assigns ready hosts to pending host queue entries.

    refresh() bulk-loads one scheduling cycle's state (ready hosts, job
    ACL groups, job dependency labels, per-host ACLs and labels);
    find_eligible_host() then hands out hosts, removing each assigned host
    from the available pool so it is used at most once per cycle.

    Fixes vs. original: _check_job_dependencies computed a `missing` set,
    left it unused and recomputed the difference; _get_label_hosts built
    invalid "IN ()" SQL when given an empty host list (now guarded, matching
    _get_many2many_dict).
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked hosts with Ready/NULL status
        and no currently active queue entry."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render a list of ids as a comma-separated string for SQL IN (...)."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (its %s filled with id_list) and fold the (left, right)
        rows into {left: set(right)}; reversed when flip=True.  Returns {}
        for an empty id_list without touching the database."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left_id, right_id) rows into {left_id: set(right_id)}
        ({right_id: set(left_id)} when flip=True)."""
        result = {}
        for row in rows:
            left_id, right_id = long(row[0]), long(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map job id -> set of ACL group ids its owner belongs to."""
        query = """
        SELECT jobs.id, acl_groups_users.acl_group_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map job id -> set of host ids blocked for that job."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map job id -> set of label ids the job depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map host id -> set of ACL group ids containing that host."""
        query = """
        SELECT host_id, acl_group_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label_id -> host ids, host_id -> label ids) for the given
        hosts.  Returns empty mappings for an empty host list."""
        if not host_ids:
            # avoid generating invalid "IN ()" SQL; mirrors the empty-list
            # handling in _get_many2many_dict
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Map label id -> Label object for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Load all scheduling state needed to place the given pending
        queue entries during this cycle."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True when the job's owner shares at least one ACL group with
        the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True when the host's labels cover every dependency label of
        the job."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine the ACL, dependency and only_if_needed checks."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        acl = self._is_acl_accessible(host_id, queue_entry)
        deps = self._check_job_dependencies(job_dependencies, host_labels)
        only_if = self._check_only_if_needed_labels(job_dependencies,
                                                    host_labels, queue_entry)
        return acl and deps and only_if


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's requested host if it is eligible and still
        available; returns the Host or None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True when the host is still in the available pool and not marked
        invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label;
        returns the Host or None."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                # drop dead entries so later metahosts skip them cheaply
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return an available Host for the queue entry (metahost or not),
        or None when nothing fits this cycle."""
        if not queue_entry.meta_host:
            return self._schedule_non_metahost(queue_entry)
        return self._schedule_metahost(queue_entry)
435
436
class Dispatcher:
    """Core scheduler loop.

    Responsibilities visible in this class:
      - startup recovery of running / aborting / parsing queue entries and
        orphaned autoserv processes (do_initial_recovery)
      - periodic cleanup: job timeouts, synch-start timeouts, stale
        ineligible-host blocks (_run_cleanup_maybe)
      - matching pending queue entries to hosts via HostScheduler
        (_schedule_new_jobs)
      - running Agents subject to total and per-cycle process throttling
        (_handle_agents)
    """
    # per-tick cache for find_autoservs(); reset at the start of each tick
    autoserv_procs_cache = None
    # throttling limits, read once from the SCHEDULER global_config section
    max_running_processes = global_config.global_config.get_config_value(
        _global_config_section, 'max_running_jobs', type=int)
    max_processes_started_per_cycle = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_jobs_started_per_cycle', type=int))
    clean_interval = (
        global_config.global_config.get_config_value(
            _global_config_section, 'clean_interval_minutes', type=int))
    synch_job_start_timeout_minutes = (
        global_config.global_config.get_config_value(
            _global_config_section, 'synch_job_start_timeout_minutes',
            type=int))

    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()


    def do_initial_recovery(self, recover_hosts=True):
        """Startup recovery; host recovery is skipped unless requested."""
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        """One scheduler iteration, called repeatedly from main()."""
        # invalidate the ps-output cache once per tick
        Dispatcher.autoserv_procs_cache = None
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        email_manager.send_queued_emails()


    def _run_cleanup_maybe(self):
        """Run the periodic cleanup tasks if clean_interval minutes have
        passed since the last cleanup."""
        if self._last_clean_time + self.clean_interval * 60 < time.time():
            print 'Running cleanup'
            self._abort_timed_out_jobs()
            self._abort_jobs_past_synch_start_timeout()
            self._clear_inactive_blocks()
            self._last_clean_time = time.time()


    def add_agent(self, agent):
        """Register an agent and give it a back-reference to us."""
        self._agents.append(agent)
        agent.dispatcher = self

    # Find agent corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def num_running_processes(self):
        """Total process count across all currently-running agents."""
        return sum(agent.num_processes for agent in self._agents
                   if agent.is_running())


    @classmethod
    def find_autoservs(cls, orphans_only=False):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes. If orphans_only=True, return only processes that
        have been orphaned (i.e. parent pid = 1).
        """
        # cached per tick (see tick()); ps is only run once per cycle
        if cls.autoserv_procs_cache is not None:
            return cls.autoserv_procs_cache

        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the four columns output by ps
        procs = [line.split(None, 4) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = {}
        for proc in procs:
            # check ppid == 1 for orphans
            # NOTE(review): split() yields strings, so proc[2] != 1 compares
            # str to int and is always true — with orphans_only=True every
            # process appears to be skipped; confirm intended behavior.
            if orphans_only and proc[2] != 1:
                continue
            # only root autoserv processes have pgid == pid
            if (proc[3] == 'autoserv' and # comm
                proc[1] == proc[0]): # pgid == pid
                # map pid to args
                autoserv_procs[int(proc[0])] = proc[4]
        cls.autoserv_procs_cache = autoserv_procs
        return autoserv_procs


    def recover_queue_entry(self, queue_entry, run_monitor):
        """Re-attach an agent to a queue entry whose autoserv is still
        running (covers all entries of a synchronous job at once)."""
        job = queue_entry.job
        if job.is_synchronous():
            all_queue_entries = job.get_host_queue_entries()
        else:
            all_queue_entries = [queue_entry]
        all_queue_entry_ids = [queue_entry.id for queue_entry
                               in all_queue_entries]
        queue_task = RecoveryQueueTask(
            job=queue_entry.job,
            queue_entries=all_queue_entries,
            run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=all_queue_entry_ids))


    def _recover_processes(self):
        """Reconnect to running autoserv processes, requeue entries whose
        autoserv never started, kill orphans, and restart recovery for
        aborting/parsing entries and in-flight host tasks."""
        orphans = self.find_autoservs(orphans_only=True)

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            if not run_monitor.has_pid():
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            pid = run_monitor.get_pid()
            print 'Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid)
            job = queue_entry.job
            if job.is_synchronous():
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            # a recovered process is not an orphan to be killed below
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir(dont_delete_files=True)
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rebooting_host_ids = set()
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %d' % queue_entry.id
            agent = queue_entry.abort()
            self.add_agent(agent)
            if queue_entry.get_host():
                rebooting_host_ids.add(queue_entry.get_host().id)

        self._recover_parsing_entries()

        # reverify hosts that were in the middle of verify, repair or
        # reboot
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Rebooting')""",
                                   exclude_ids=rebooting_host_ids)

        # finally, recover "Running" hosts with no active queue entries,
        # although this should never happen
        message = ('Recovering running host %s - this probably '
                   'indicates a scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              exclude_ids=set()):
        """Queue a VerifyTask for every unlocked, valid host matching the
        SQL condition `where`, skipping ids in exclude_ids.

        NOTE(review): exclude_ids uses a mutable default argument; harmless
        here because it is only read, never mutated.
        """
        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
                           'invalid = 0 AND ' + where)
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.id in exclude_ids:
                continue
            if print_message is not None:
                print print_message % host.hostname
            verify_task = VerifyTask(host = host)
            self.add_agent(Agent(tasks = [verify_task]))


    def _recover_parsing_entries(self):
        """Restart final result parsing for entries stuck in 'Parsing'
        (one reparse task per synchronous job, one per entry otherwise)."""
        # make sure there are no old parsers running
        os.system('killall parse')

        recovered_synch_jobs = set()
        for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
            job = entry.job
            if job.is_synchronous():
                if job.id in recovered_synch_jobs:
                    continue
                queue_entries = job.get_host_queue_entries()
                recovered_synch_jobs.add(job.id)
            else:
                queue_entries = [entry]

            reparse_task = FinalReparseTask(queue_entries)
            self.add_agent(Agent([reparse_task]))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _abort_timed_out_jobs(self):
        """
        Aborts all jobs that have timed out and not completed
        """
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            print 'Aborting job %d due to job timeout' % job.id
            job.abort(None)


    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are holding a machine that's in everyone.
        """
        timeout_delta = datetime.timedelta(
            minutes=self.synch_job_start_timeout_minutes)
        timeout_start = datetime.datetime.now() - timeout_delta
        query = models.Job.objects.filter(
            synch_type=models.Test.SynchType.SYNCHRONOUS,
            created_on__lt=timeout_start,
            hostqueueentry__status='Pending',
            hostqueueentry__host__acl_group__name='Everyone')
        for job in query.distinct():
            print 'Aborting job %d due to start timeout' % job.id
            job.abort(None)


    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            where='NOT complete AND NOT active',
            order_by='priority DESC, meta_host, job_id'))


    def _schedule_new_jobs(self):
        """Match each pending queue entry to an eligible host and start it."""
        print "finding work"

        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return

        self._host_scheduler.refresh(queue_entries)

        for queue_entry in queue_entries:
            assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
            if not assigned_host:
                continue
            self._run_queue_entry(queue_entry, assigned_host)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run(assigned_host=host)
        # in some cases (synchronous jobs with run_verify=False), agent may be None
        if agent:
            self.add_agent(agent)


    def _find_aborting(self):
        """Abort queue entries whose status is 'Abort', at most 50 per tick."""
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            for agent in agents_to_abort:
                self.remove_agent(agent)

            agent = entry.abort(agents_to_abort)
            self.add_agent(agent)
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _can_start_agent(self, agent, num_running_processes,
                         num_started_this_cycle, have_reached_limit):
        """Apply the process-throttling policy to one not-yet-running agent."""
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if (num_running_processes + agent.num_processes >
            self.max_running_processes):
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            self.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
        """Tick every agent: reap finished ones, start waiting ones within
        the throttling limits, and advance running ones."""
        num_running_processes = self.num_running_processes()
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                print "agent finished"
                self._agents.remove(agent)
                num_running_processes -= agent.num_processes
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_running_processes,
                                             num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_running_processes += agent.num_processes
                num_started_this_cycle += agent.num_processes
            agent.tick()
        print num_running_processes, 'running processes'
mbligh36768f02008-02-22 18:28:33 +0000808
809
810class RunMonitor(object):
jadmanski0afbb632008-06-06 21:10:57 +0000811 def __init__(self, cmd, nice_level = None, log_file = None):
812 self.nice_level = nice_level
813 self.log_file = log_file
814 self.cmd = cmd
mbligh36768f02008-02-22 18:28:33 +0000815
jadmanski0afbb632008-06-06 21:10:57 +0000816 def run(self):
817 if self.nice_level:
818 nice_cmd = ['nice','-n', str(self.nice_level)]
819 nice_cmd.extend(self.cmd)
820 self.cmd = nice_cmd
mbligh36768f02008-02-22 18:28:33 +0000821
jadmanski0afbb632008-06-06 21:10:57 +0000822 out_file = None
823 if self.log_file:
824 try:
825 os.makedirs(os.path.dirname(self.log_file))
826 except OSError, exc:
827 if exc.errno != errno.EEXIST:
828 log_stacktrace(
829 'Unexpected error creating logfile '
830 'directory for %s' % self.log_file)
831 try:
832 out_file = open(self.log_file, 'a')
833 out_file.write("\n%s\n" % ('*'*80))
834 out_file.write("%s> %s\n" %
835 (time.strftime("%X %x"),
836 self.cmd))
837 out_file.write("%s\n" % ('*'*80))
838 except (OSError, IOError):
839 log_stacktrace('Error opening log file %s' %
840 self.log_file)
mblighcadb3532008-04-15 17:46:26 +0000841
jadmanski0afbb632008-06-06 21:10:57 +0000842 if not out_file:
843 out_file = open('/dev/null', 'w')
mblighcadb3532008-04-15 17:46:26 +0000844
jadmanski0afbb632008-06-06 21:10:57 +0000845 in_devnull = open('/dev/null', 'r')
846 print "cmd = %s" % self.cmd
847 print "path = %s" % os.getcwd()
mbligh36768f02008-02-22 18:28:33 +0000848
jadmanski0afbb632008-06-06 21:10:57 +0000849 self.proc = subprocess.Popen(self.cmd, stdout=out_file,
850 stderr=subprocess.STDOUT,
851 stdin=in_devnull)
852 out_file.close()
853 in_devnull.close()
mbligh36768f02008-02-22 18:28:33 +0000854
855
jadmanski0afbb632008-06-06 21:10:57 +0000856 def get_pid(self):
857 return self.proc.pid
mblighbb421852008-03-11 22:36:16 +0000858
859
jadmanski0afbb632008-06-06 21:10:57 +0000860 def kill(self):
861 kill_autoserv(self.get_pid(), self.exit_code)
mblighbb421852008-03-11 22:36:16 +0000862
mbligh36768f02008-02-22 18:28:33 +0000863
jadmanski0afbb632008-06-06 21:10:57 +0000864 def exit_code(self):
865 return self.proc.poll()
mbligh36768f02008-02-22 18:28:33 +0000866
867
class PidfileException(Exception):
    """\
    Raised on unexpected contents or behavior of an autoserv pid file.
    """
mblighbb421852008-03-11 22:36:16 +0000872
873
874class PidfileRunMonitor(RunMonitor):
showard21baa452008-10-21 00:08:39 +0000875 class PidfileState(object):
876 pid = None
877 exit_status = None
878 num_tests_failed = None
879
880 def reset(self):
881 self.pid = self.exit_status = self.all_tests_passed = None
882
883
jadmanski0afbb632008-06-06 21:10:57 +0000884 def __init__(self, results_dir, cmd=None, nice_level=None,
885 log_file=None):
886 self.results_dir = os.path.abspath(results_dir)
887 self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
888 self.lost_process = False
889 self.start_time = time.time()
showard21baa452008-10-21 00:08:39 +0000890 self._state = self.PidfileState()
showardb376bc52008-06-13 20:48:45 +0000891 super(PidfileRunMonitor, self).__init__(cmd, nice_level, log_file)
mblighbb421852008-03-11 22:36:16 +0000892
893
showard21baa452008-10-21 00:08:39 +0000894 def has_pid(self):
895 self._get_pidfile_info()
896 return self._state.pid is not None
897
898
jadmanski0afbb632008-06-06 21:10:57 +0000899 def get_pid(self):
showard21baa452008-10-21 00:08:39 +0000900 self._get_pidfile_info()
901 assert self._state.pid is not None
902 return self._state.pid
mblighbb421852008-03-11 22:36:16 +0000903
904
jadmanski0afbb632008-06-06 21:10:57 +0000905 def _check_command_line(self, command_line, spacer=' ',
906 print_error=False):
907 results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
908 match = results_dir_arg in command_line
909 if print_error and not match:
910 print '%s not found in %s' % (repr(results_dir_arg),
911 repr(command_line))
912 return match
mbligh90a549d2008-03-25 23:52:34 +0000913
914
showard21baa452008-10-21 00:08:39 +0000915 def _check_proc_fs(self):
916 cmdline_path = os.path.join('/proc', str(self._state.pid), 'cmdline')
jadmanski0afbb632008-06-06 21:10:57 +0000917 try:
918 cmdline_file = open(cmdline_path, 'r')
919 cmdline = cmdline_file.read().strip()
920 cmdline_file.close()
921 except IOError:
922 return False
923 # /proc/.../cmdline has \x00 separating args
924 return self._check_command_line(cmdline, spacer='\x00',
925 print_error=True)
mblighbb421852008-03-11 22:36:16 +0000926
927
showard21baa452008-10-21 00:08:39 +0000928 def _read_pidfile(self):
929 self._state.reset()
jadmanski0afbb632008-06-06 21:10:57 +0000930 if not os.path.exists(self.pid_file):
showard21baa452008-10-21 00:08:39 +0000931 return
jadmanski0afbb632008-06-06 21:10:57 +0000932 file_obj = open(self.pid_file, 'r')
933 lines = file_obj.readlines()
934 file_obj.close()
showard3dd6b882008-10-27 19:21:39 +0000935 if not lines:
936 return
937 if len(lines) > 3:
showard21baa452008-10-21 00:08:39 +0000938 raise PidfileException('Corrupt pid file (%d lines) at %s:\n%s' %
939 (len(lines), self.pid_file, lines))
jadmanski0afbb632008-06-06 21:10:57 +0000940 try:
showard21baa452008-10-21 00:08:39 +0000941 self._state.pid = int(lines[0])
942 if len(lines) > 1:
943 self._state.exit_status = int(lines[1])
944 if len(lines) == 3:
945 self._state.num_tests_failed = int(lines[2])
946 else:
947 # maintain backwards-compatibility with two-line pidfiles
948 self._state.num_tests_failed = 0
jadmanski0afbb632008-06-06 21:10:57 +0000949 except ValueError, exc:
showard3dd6b882008-10-27 19:21:39 +0000950 raise PidfileException('Corrupt pid file: ' + str(exc.args))
mblighbb421852008-03-11 22:36:16 +0000951
mblighbb421852008-03-11 22:36:16 +0000952
jadmanski0afbb632008-06-06 21:10:57 +0000953 def _find_autoserv_proc(self):
954 autoserv_procs = Dispatcher.find_autoservs()
955 for pid, args in autoserv_procs.iteritems():
956 if self._check_command_line(args):
957 return pid, args
958 return None, None
mbligh90a549d2008-03-25 23:52:34 +0000959
960
showard21baa452008-10-21 00:08:39 +0000961 def _handle_pidfile_error(self, error, message=''):
962 message = error + '\nPid: %s\nPidfile: %s\n%s' % (self._state.pid,
963 self.pid_file,
964 message)
965 print message
966 email_manager.enqueue_notify_email(error, message)
967 if self._state.pid is not None:
968 pid = self._state.pid
969 else:
970 pid = 0
971 self.on_lost_process(pid)
972
973
974 def _get_pidfile_info_helper(self):
jadmanski0afbb632008-06-06 21:10:57 +0000975 if self.lost_process:
showard21baa452008-10-21 00:08:39 +0000976 return
mblighbb421852008-03-11 22:36:16 +0000977
showard21baa452008-10-21 00:08:39 +0000978 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +0000979
showard21baa452008-10-21 00:08:39 +0000980 if self._state.pid is None:
981 self._handle_no_pid()
982 return
mbligh90a549d2008-03-25 23:52:34 +0000983
showard21baa452008-10-21 00:08:39 +0000984 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +0000985 # double check whether or not autoserv is running
showard21baa452008-10-21 00:08:39 +0000986 proc_running = self._check_proc_fs()
jadmanski0afbb632008-06-06 21:10:57 +0000987 if proc_running:
showard21baa452008-10-21 00:08:39 +0000988 return
mbligh90a549d2008-03-25 23:52:34 +0000989
jadmanski0afbb632008-06-06 21:10:57 +0000990 # pid but no process - maybe process *just* exited
showard21baa452008-10-21 00:08:39 +0000991 self._read_pidfile()
992 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +0000993 # autoserv exited without writing an exit code
994 # to the pidfile
showard21baa452008-10-21 00:08:39 +0000995 self._handle_pidfile_error(
996 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +0000997
showard21baa452008-10-21 00:08:39 +0000998
999 def _get_pidfile_info(self):
1000 """\
1001 After completion, self._state will contain:
1002 pid=None, exit_status=None if autoserv has not yet run
1003 pid!=None, exit_status=None if autoserv is running
1004 pid!=None, exit_status!=None if autoserv has completed
1005 """
1006 try:
1007 self._get_pidfile_info_helper()
1008 except PidfileException, exc:
1009 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001010
1011
jadmanski0afbb632008-06-06 21:10:57 +00001012 def _handle_no_pid(self):
1013 """\
1014 Called when no pidfile is found or no pid is in the pidfile.
1015 """
1016 # is autoserv running?
1017 pid, args = self._find_autoserv_proc()
1018 if pid is None:
1019 # no autoserv process running
1020 message = 'No pid found at ' + self.pid_file
1021 else:
1022 message = ("Process %d (%s) hasn't written pidfile %s" %
1023 (pid, args, self.pid_file))
mbligh90a549d2008-03-25 23:52:34 +00001024
jadmanski0afbb632008-06-06 21:10:57 +00001025 print message
1026 if time.time() - self.start_time > PIDFILE_TIMEOUT:
1027 email_manager.enqueue_notify_email(
1028 'Process has failed to write pidfile', message)
1029 if pid is not None:
1030 kill_autoserv(pid)
1031 else:
1032 pid = 0
1033 self.on_lost_process(pid)
showard21baa452008-10-21 00:08:39 +00001034 return
mbligh90a549d2008-03-25 23:52:34 +00001035
1036
jadmanski0afbb632008-06-06 21:10:57 +00001037 def on_lost_process(self, pid):
1038 """\
1039 Called when autoserv has exited without writing an exit status,
1040 or we've timed out waiting for autoserv to write a pid to the
1041 pidfile. In either case, we just return failure and the caller
1042 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001043
jadmanski0afbb632008-06-06 21:10:57 +00001044 pid is unimportant here, as it shouldn't be used by anyone.
1045 """
1046 self.lost_process = True
showard21baa452008-10-21 00:08:39 +00001047 self._state.pid = pid
1048 self._state.exit_status = 1
1049 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001050
1051
jadmanski0afbb632008-06-06 21:10:57 +00001052 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001053 self._get_pidfile_info()
1054 return self._state.exit_status
1055
1056
1057 def num_tests_failed(self):
1058 self._get_pidfile_info()
1059 assert self._state.num_tests_failed is not None
1060 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001061
1062
mbligh36768f02008-02-22 18:28:33 +00001063class Agent(object):
showard4c5374f2008-09-04 17:02:56 +00001064 def __init__(self, tasks, queue_entry_ids=[], num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001065 self.active_task = None
1066 self.queue = Queue.Queue(0)
1067 self.dispatcher = None
1068 self.queue_entry_ids = queue_entry_ids
showard4c5374f2008-09-04 17:02:56 +00001069 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001070
1071 for task in tasks:
1072 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001073
1074
jadmanski0afbb632008-06-06 21:10:57 +00001075 def add_task(self, task):
1076 self.queue.put_nowait(task)
1077 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001078
1079
jadmanski0afbb632008-06-06 21:10:57 +00001080 def tick(self):
showard21baa452008-10-21 00:08:39 +00001081 while not self.is_done():
1082 if self.active_task and not self.active_task.is_done():
1083 self.active_task.poll()
1084 if not self.active_task.is_done():
1085 return
1086 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001087
1088
jadmanski0afbb632008-06-06 21:10:57 +00001089 def _next_task(self):
1090 print "agent picking task"
1091 if self.active_task:
1092 assert self.active_task.is_done()
mbligh36768f02008-02-22 18:28:33 +00001093
jadmanski0afbb632008-06-06 21:10:57 +00001094 if not self.active_task.success:
1095 self.on_task_failure()
mblighe2586682008-02-29 22:45:46 +00001096
jadmanski0afbb632008-06-06 21:10:57 +00001097 self.active_task = None
1098 if not self.is_done():
1099 self.active_task = self.queue.get_nowait()
1100 if self.active_task:
1101 self.active_task.start()
mbligh36768f02008-02-22 18:28:33 +00001102
1103
jadmanski0afbb632008-06-06 21:10:57 +00001104 def on_task_failure(self):
1105 self.queue = Queue.Queue(0)
1106 for task in self.active_task.failure_tasks:
1107 self.add_task(task)
mbligh16c722d2008-03-05 00:58:44 +00001108
mblighe2586682008-02-29 22:45:46 +00001109
showard4c5374f2008-09-04 17:02:56 +00001110 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001111 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001112
1113
jadmanski0afbb632008-06-06 21:10:57 +00001114 def is_done(self):
1115 return self.active_task == None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001116
1117
jadmanski0afbb632008-06-06 21:10:57 +00001118 def start(self):
1119 assert self.dispatcher
mbligh36768f02008-02-22 18:28:33 +00001120
jadmanski0afbb632008-06-06 21:10:57 +00001121 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001122
jadmanski0afbb632008-06-06 21:10:57 +00001123
mbligh36768f02008-02-22 18:28:33 +00001124class AgentTask(object):
jadmanski0afbb632008-06-06 21:10:57 +00001125 def __init__(self, cmd, failure_tasks = []):
1126 self.done = False
1127 self.failure_tasks = failure_tasks
1128 self.started = False
1129 self.cmd = cmd
1130 self.task = None
1131 self.agent = None
1132 self.monitor = None
1133 self.success = None
mbligh36768f02008-02-22 18:28:33 +00001134
1135
jadmanski0afbb632008-06-06 21:10:57 +00001136 def poll(self):
1137 print "poll"
1138 if self.monitor:
1139 self.tick(self.monitor.exit_code())
1140 else:
1141 self.finished(False)
mbligh36768f02008-02-22 18:28:33 +00001142
1143
jadmanski0afbb632008-06-06 21:10:57 +00001144 def tick(self, exit_code):
1145 if exit_code==None:
1146 return
1147# print "exit_code was %d" % exit_code
1148 if exit_code == 0:
1149 success = True
1150 else:
1151 success = False
mbligh36768f02008-02-22 18:28:33 +00001152
jadmanski0afbb632008-06-06 21:10:57 +00001153 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001154
1155
jadmanski0afbb632008-06-06 21:10:57 +00001156 def is_done(self):
1157 return self.done
mbligh36768f02008-02-22 18:28:33 +00001158
1159
jadmanski0afbb632008-06-06 21:10:57 +00001160 def finished(self, success):
1161 self.done = True
1162 self.success = success
1163 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001164
1165
jadmanski0afbb632008-06-06 21:10:57 +00001166 def prolog(self):
1167 pass
mblighd64e5702008-04-04 21:39:28 +00001168
1169
jadmanski0afbb632008-06-06 21:10:57 +00001170 def create_temp_resultsdir(self, suffix=''):
1171 self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)
mblighd64e5702008-04-04 21:39:28 +00001172
mbligh36768f02008-02-22 18:28:33 +00001173
jadmanski0afbb632008-06-06 21:10:57 +00001174 def cleanup(self):
1175 if (hasattr(self, 'temp_results_dir') and
1176 os.path.exists(self.temp_results_dir)):
1177 shutil.rmtree(self.temp_results_dir)
mbligh36768f02008-02-22 18:28:33 +00001178
1179
jadmanski0afbb632008-06-06 21:10:57 +00001180 def epilog(self):
1181 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001182
1183
jadmanski0afbb632008-06-06 21:10:57 +00001184 def start(self):
1185 assert self.agent
1186
1187 if not self.started:
1188 self.prolog()
1189 self.run()
1190
1191 self.started = True
1192
1193
1194 def abort(self):
1195 if self.monitor:
1196 self.monitor.kill()
1197 self.done = True
1198 self.cleanup()
1199
1200
1201 def run(self):
1202 if self.cmd:
1203 print "agent starting monitor"
1204 log_file = None
showard97aed502008-11-04 02:01:24 +00001205 if hasattr(self, 'log_file'):
1206 log_file = self.log_file
1207 elif hasattr(self, 'host'):
jadmanski0afbb632008-06-06 21:10:57 +00001208 log_file = os.path.join(RESULTS_DIR, 'hosts',
1209 self.host.hostname)
1210 self.monitor = RunMonitor(
showard97aed502008-11-04 02:01:24 +00001211 self.cmd, nice_level=AUTOSERV_NICE_LEVEL, log_file=log_file)
jadmanski0afbb632008-06-06 21:10:57 +00001212 self.monitor.run()
mbligh36768f02008-02-22 18:28:33 +00001213
1214
1215class RepairTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001216 def __init__(self, host, fail_queue_entry=None):
1217 """\
1218 fail_queue_entry: queue entry to mark failed if this repair
1219 fails.
1220 """
jadmanskifb7cfb12008-07-09 14:13:21 +00001221 protection = host_protections.Protection.get_string(host.protection)
jadmanski542537f2008-07-24 14:14:56 +00001222 # normalize the protection name
1223 protection = host_protections.Protection.get_attr_name(protection)
jadmanski0afbb632008-06-06 21:10:57 +00001224 self.create_temp_resultsdir('.repair')
1225 cmd = [_autoserv_path , '-R', '-m', host.hostname,
jadmanskifb7cfb12008-07-09 14:13:21 +00001226 '-r', self.temp_results_dir, '--host-protection', protection]
jadmanski0afbb632008-06-06 21:10:57 +00001227 self.host = host
1228 self.fail_queue_entry = fail_queue_entry
1229 super(RepairTask, self).__init__(cmd)
mblighe2586682008-02-29 22:45:46 +00001230
mbligh36768f02008-02-22 18:28:33 +00001231
jadmanski0afbb632008-06-06 21:10:57 +00001232 def prolog(self):
1233 print "repair_task starting"
1234 self.host.set_status('Repairing')
mbligh36768f02008-02-22 18:28:33 +00001235
1236
jadmanski0afbb632008-06-06 21:10:57 +00001237 def epilog(self):
1238 super(RepairTask, self).epilog()
1239 if self.success:
1240 self.host.set_status('Ready')
1241 else:
1242 self.host.set_status('Repair Failed')
1243 if self.fail_queue_entry:
1244 self.fail_queue_entry.handle_host_failure()
mbligh36768f02008-02-22 18:28:33 +00001245
1246
1247class VerifyTask(AgentTask):
showard9976ce92008-10-15 20:28:13 +00001248 def __init__(self, queue_entry=None, host=None):
jadmanski0afbb632008-06-06 21:10:57 +00001249 assert bool(queue_entry) != bool(host)
mbligh36768f02008-02-22 18:28:33 +00001250
jadmanski0afbb632008-06-06 21:10:57 +00001251 self.host = host or queue_entry.host
1252 self.queue_entry = queue_entry
mbligh36768f02008-02-22 18:28:33 +00001253
jadmanski0afbb632008-06-06 21:10:57 +00001254 self.create_temp_resultsdir('.verify')
showard3d9899a2008-07-31 02:11:58 +00001255
showard9976ce92008-10-15 20:28:13 +00001256 cmd = [_autoserv_path,'-v','-m',self.host.hostname, '-r', self.temp_results_dir]
mbligh36768f02008-02-22 18:28:33 +00001257
jadmanski0afbb632008-06-06 21:10:57 +00001258 fail_queue_entry = None
1259 if queue_entry and not queue_entry.meta_host:
1260 fail_queue_entry = queue_entry
1261 failure_tasks = [RepairTask(self.host, fail_queue_entry)]
mblighe2586682008-02-29 22:45:46 +00001262
jadmanski0afbb632008-06-06 21:10:57 +00001263 super(VerifyTask, self).__init__(cmd,
1264 failure_tasks=failure_tasks)
mblighe2586682008-02-29 22:45:46 +00001265
1266
jadmanski0afbb632008-06-06 21:10:57 +00001267 def prolog(self):
1268 print "starting verify on %s" % (self.host.hostname)
1269 if self.queue_entry:
1270 self.queue_entry.set_status('Verifying')
1271 self.queue_entry.clear_results_dir(
1272 self.queue_entry.verify_results_dir())
1273 self.host.set_status('Verifying')
mbligh36768f02008-02-22 18:28:33 +00001274
1275
jadmanski0afbb632008-06-06 21:10:57 +00001276 def cleanup(self):
1277 if not os.path.exists(self.temp_results_dir):
1278 return
1279 if self.queue_entry and (self.success or
1280 not self.queue_entry.meta_host):
1281 self.move_results()
1282 super(VerifyTask, self).cleanup()
mblighd64e5702008-04-04 21:39:28 +00001283
1284
jadmanski0afbb632008-06-06 21:10:57 +00001285 def epilog(self):
1286 super(VerifyTask, self).epilog()
mbligh36768f02008-02-22 18:28:33 +00001287
jadmanski0afbb632008-06-06 21:10:57 +00001288 if self.success:
1289 self.host.set_status('Ready')
1290 elif self.queue_entry:
1291 self.queue_entry.requeue()
mbligh36768f02008-02-22 18:28:33 +00001292
1293
jadmanski0afbb632008-06-06 21:10:57 +00001294 def move_results(self):
1295 assert self.queue_entry is not None
1296 target_dir = self.queue_entry.verify_results_dir()
1297 if not os.path.exists(target_dir):
1298 os.makedirs(target_dir)
1299 files = os.listdir(self.temp_results_dir)
1300 for filename in files:
1301 if filename == AUTOSERV_PID_FILE:
1302 continue
1303 self.force_move(os.path.join(self.temp_results_dir,
1304 filename),
1305 os.path.join(target_dir, filename))
mbligh36768f02008-02-22 18:28:33 +00001306
1307
jadmanski0afbb632008-06-06 21:10:57 +00001308 @staticmethod
1309 def force_move(source, dest):
1310 """\
1311 Replacement for shutil.move() that will delete the destination
1312 if it exists, even if it's a directory.
1313 """
1314 if os.path.exists(dest):
1315 print ('Warning: removing existing destination file ' +
1316 dest)
1317 remove_file_or_dir(dest)
1318 shutil.move(source, dest)
mblighe2586682008-02-29 22:45:46 +00001319
1320
class VerifySynchronousTask(VerifyTask):
    """\
    VerifyTask for entries of synchronous jobs: on success the entry's
    on_pending() hook runs, and any agent it returns is dispatched.
    """
    def epilog(self):
        super(VerifySynchronousTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry.job.num_complete() > 0:
            # some other entry failed verify, and we've
            # already been marked as stopped
            return

        pending_agent = self.queue_entry.on_pending()
        if pending_agent:
            self.agent.dispatcher.add_agent(pending_agent)
mblighe2586682008-02-29 22:45:46 +00001333
showardb2e2c322008-10-14 17:33:55 +00001334
mbligh36768f02008-02-22 18:28:33 +00001335class QueueTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001336 def __init__(self, job, queue_entries, cmd):
1337 super(QueueTask, self).__init__(cmd)
1338 self.job = job
1339 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001340
1341
jadmanski0afbb632008-06-06 21:10:57 +00001342 @staticmethod
showardd8e548a2008-09-09 03:04:57 +00001343 def _write_keyval(keyval_dir, field, value, keyval_filename='keyval'):
1344 key_path = os.path.join(keyval_dir, keyval_filename)
jadmanski0afbb632008-06-06 21:10:57 +00001345 keyval_file = open(key_path, 'a')
showardd8e548a2008-09-09 03:04:57 +00001346 print >> keyval_file, '%s=%s' % (field, str(value))
jadmanski0afbb632008-06-06 21:10:57 +00001347 keyval_file.close()
mbligh36768f02008-02-22 18:28:33 +00001348
1349
showardd8e548a2008-09-09 03:04:57 +00001350 def _host_keyval_dir(self):
1351 return os.path.join(self.results_dir(), 'host_keyvals')
1352
1353
1354 def _write_host_keyval(self, host):
1355 labels = ','.join(host.labels())
1356 self._write_keyval(self._host_keyval_dir(), 'labels', labels,
1357 keyval_filename=host.hostname)
1358
1359 def _create_host_keyval_dir(self):
1360 directory = self._host_keyval_dir()
1361 if not os.path.exists(directory):
1362 os.makedirs(directory)
1363
1364
jadmanski0afbb632008-06-06 21:10:57 +00001365 def results_dir(self):
1366 return self.queue_entries[0].results_dir()
mblighbb421852008-03-11 22:36:16 +00001367
1368
jadmanski0afbb632008-06-06 21:10:57 +00001369 def run(self):
1370 """\
1371 Override AgentTask.run() so we can use a PidfileRunMonitor.
1372 """
1373 self.monitor = PidfileRunMonitor(self.results_dir(),
1374 cmd=self.cmd,
1375 nice_level=AUTOSERV_NICE_LEVEL)
1376 self.monitor.run()
mblighbb421852008-03-11 22:36:16 +00001377
1378
jadmanski0afbb632008-06-06 21:10:57 +00001379 def prolog(self):
1380 # write some job timestamps into the job keyval file
1381 queued = time.mktime(self.job.created_on.timetuple())
1382 started = time.time()
showardd8e548a2008-09-09 03:04:57 +00001383 self._write_keyval(self.results_dir(), "job_queued", int(queued))
1384 self._write_keyval(self.results_dir(), "job_started", int(started))
1385 self._create_host_keyval_dir()
jadmanski0afbb632008-06-06 21:10:57 +00001386 for queue_entry in self.queue_entries:
showardd8e548a2008-09-09 03:04:57 +00001387 self._write_host_keyval(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001388 print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
1389 queue_entry.set_status('Running')
1390 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001391 queue_entry.host.update_field('dirty', 1)
jadmanski0afbb632008-06-06 21:10:57 +00001392 if (not self.job.is_synchronous() and
1393 self.job.num_machines() > 1):
1394 assert len(self.queue_entries) == 1
1395 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001396
1397
showard97aed502008-11-04 02:01:24 +00001398 def _finish_task(self, success):
jadmanski0afbb632008-06-06 21:10:57 +00001399 # write out the finished time into the results keyval
1400 finished = time.time()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001401 self._write_keyval(self.results_dir(), "job_finished", int(finished))
jadmanskic2ac77f2008-05-16 21:44:04 +00001402
jadmanski0afbb632008-06-06 21:10:57 +00001403 # parse the results of the job
showard97aed502008-11-04 02:01:24 +00001404 reparse_task = FinalReparseTask(self.queue_entries)
1405 self.agent.dispatcher.add_agent(Agent([reparse_task]))
jadmanskif7fa2cc2008-10-01 14:13:23 +00001406
1407
1408 def _log_abort(self):
1409 # build up sets of all the aborted_by and aborted_on values
1410 aborted_by, aborted_on = set(), set()
1411 for queue_entry in self.queue_entries:
1412 if queue_entry.aborted_by:
1413 aborted_by.add(queue_entry.aborted_by)
1414 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1415 aborted_on.add(t)
1416
1417 # extract some actual, unique aborted by value and write it out
1418 assert len(aborted_by) <= 1
1419 if len(aborted_by) == 1:
1420 results_dir = self.results_dir()
1421 self._write_keyval(results_dir, "aborted_by", aborted_by.pop())
1422 self._write_keyval(results_dir, "aborted_on", max(aborted_on))
jadmanskic2ac77f2008-05-16 21:44:04 +00001423
1424
jadmanski0afbb632008-06-06 21:10:57 +00001425 def abort(self):
1426 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001427 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001428 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001429
1430
showard21baa452008-10-21 00:08:39 +00001431 def _reboot_hosts(self):
1432 reboot_after = self.job.reboot_after
1433 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001434 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001435 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001436 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001437 num_tests_failed = self.monitor.num_tests_failed()
1438 do_reboot = (self.success and num_tests_failed == 0)
1439
1440 if do_reboot:
1441 for queue_entry in self.queue_entries:
1442 reboot_task = RebootTask(queue_entry.get_host())
1443 self.agent.dispatcher.add_agent(Agent([reboot_task]))
1444
1445
jadmanski0afbb632008-06-06 21:10:57 +00001446 def epilog(self):
1447 super(QueueTask, self).epilog()
jadmanski0afbb632008-06-06 21:10:57 +00001448 for queue_entry in self.queue_entries:
showard97aed502008-11-04 02:01:24 +00001449 # set status to PARSING here so queue entry is marked complete
1450 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
jadmanski0afbb632008-06-06 21:10:57 +00001451 queue_entry.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001452
showard97aed502008-11-04 02:01:24 +00001453 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001454 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001455
showard97aed502008-11-04 02:01:24 +00001456 print "queue_task finished with succes=%s" % self.success
mbligh36768f02008-02-22 18:28:33 +00001457
1458
class RecoveryQueueTask(QueueTask):
    """\
    QueueTask that re-attaches to an already-running autoserv via an
    existing run monitor instead of spawning a new process.
    """
    def __init__(self, job, queue_entries, run_monitor):
        self.run_monitor = run_monitor
        super(RecoveryQueueTask, self).__init__(job, queue_entries,
                                                cmd=None)


    def run(self):
        # reuse the monitor for the existing process; launch nothing
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass
mblighbb421852008-03-11 22:36:16 +00001473
1474
mbligh36768f02008-02-22 18:28:33 +00001475class RebootTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001476 def __init__(self, host):
1477 global _autoserv_path
1478
1479 # Current implementation of autoserv requires control file
1480 # to be passed on reboot action request. TODO: remove when no
1481 # longer appropriate.
1482 self.create_temp_resultsdir('.reboot')
1483 self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
1484 '-r', self.temp_results_dir, '/dev/null']
1485 self.host = host
1486 super(RebootTask, self).__init__(self.cmd,
1487 failure_tasks=[RepairTask(host)])
mbligh16c722d2008-03-05 00:58:44 +00001488
mblighd5c95802008-03-05 00:33:46 +00001489
jadmanski0afbb632008-06-06 21:10:57 +00001490 def prolog(self):
1491 print "starting reboot task for host: %s" % self.host.hostname
1492 self.host.set_status("Rebooting")
mblighd5c95802008-03-05 00:33:46 +00001493
mblighd5c95802008-03-05 00:33:46 +00001494
showard21baa452008-10-21 00:08:39 +00001495 def epilog(self):
1496 super(RebootTask, self).epilog()
1497 self.host.set_status('Ready')
1498 if self.success:
1499 self.host.update_field('dirty', 0)
1500
1501
mblighd5c95802008-03-05 00:33:46 +00001502class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001503 def __init__(self, queue_entry, agents_to_abort):
1504 self.queue_entry = queue_entry
1505 self.agents_to_abort = agents_to_abort
jadmanski0afbb632008-06-06 21:10:57 +00001506 super(AbortTask, self).__init__('')
mbligh36768f02008-02-22 18:28:33 +00001507
1508
jadmanski0afbb632008-06-06 21:10:57 +00001509 def prolog(self):
1510 print "starting abort on host %s, job %s" % (
1511 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001512
mblighd64e5702008-04-04 21:39:28 +00001513
jadmanski0afbb632008-06-06 21:10:57 +00001514 def epilog(self):
1515 super(AbortTask, self).epilog()
1516 self.queue_entry.set_status('Aborted')
1517 self.success = True
1518
1519
1520 def run(self):
1521 for agent in self.agents_to_abort:
1522 if (agent.active_task):
1523 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001524
1525
showard97aed502008-11-04 02:01:24 +00001526class FinalReparseTask(AgentTask):
1527 MAX_PARSE_PROCESSES = (
1528 global_config.global_config.get_config_value(
1529 _global_config_section, 'max_parse_processes', type=int))
1530 _num_running_parses = 0
1531
1532 def __init__(self, queue_entries):
1533 self._queue_entries = queue_entries
1534 self._parse_started = False
1535
1536 assert len(queue_entries) > 0
1537 queue_entry = queue_entries[0]
1538 job = queue_entry.job
1539
1540 flags = []
1541 if job.is_synchronous():
1542 assert len(queue_entries) == job.num_machines()
1543 else:
1544 assert len(queue_entries) == 1
1545 if job.num_machines() > 1:
1546 flags = ['-l', '2']
1547
1548 if _testing_mode:
1549 self.cmd = 'true'
1550 return
1551
1552 self._results_dir = queue_entry.results_dir()
1553 self.log_file = os.path.abspath(os.path.join(self._results_dir,
1554 '.parse.log'))
1555 super(FinalReparseTask, self).__init__(
1556 cmd=self.generate_parse_command(flags=flags))
1557
1558
1559 @classmethod
1560 def _increment_running_parses(cls):
1561 cls._num_running_parses += 1
1562
1563
1564 @classmethod
1565 def _decrement_running_parses(cls):
1566 cls._num_running_parses -= 1
1567
1568
1569 @classmethod
1570 def _can_run_new_parse(cls):
1571 return cls._num_running_parses < cls.MAX_PARSE_PROCESSES
1572
1573
1574 def prolog(self):
1575 super(FinalReparseTask, self).prolog()
1576 for queue_entry in self._queue_entries:
1577 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1578
1579
1580 def epilog(self):
1581 super(FinalReparseTask, self).epilog()
1582 final_status = self._determine_final_status()
1583 for queue_entry in self._queue_entries:
1584 queue_entry.set_status(final_status)
1585
1586
1587 def _determine_final_status(self):
1588 # use a PidfileRunMonitor to read the autoserv exit status
1589 monitor = PidfileRunMonitor(self._results_dir)
1590 if monitor.exit_code() == 0:
1591 return models.HostQueueEntry.Status.COMPLETED
1592 return models.HostQueueEntry.Status.FAILED
1593
1594
1595 def generate_parse_command(self, flags=[]):
1596 parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
1597 return [parse] + flags + ['-r', '-o', self._results_dir]
1598
1599
1600 def poll(self):
1601 # override poll to keep trying to start until the parse count goes down
1602 # and we can, at which point we revert to default behavior
1603 if self._parse_started:
1604 super(FinalReparseTask, self).poll()
1605 else:
1606 self._try_starting_parse()
1607
1608
1609 def run(self):
1610 # override run() to not actually run unless we can
1611 self._try_starting_parse()
1612
1613
1614 def _try_starting_parse(self):
1615 if not self._can_run_new_parse():
1616 return
1617 # actually run the parse command
1618 super(FinalReparseTask, self).run()
1619 self._increment_running_parses()
1620 self._parse_started = True
1621
1622
1623 def finished(self, success):
1624 super(FinalReparseTask, self).finished(success)
1625 self._decrement_running_parses()
1626
1627
mbligh36768f02008-02-22 18:28:33 +00001628class DBObject(object):
jadmanski0afbb632008-06-06 21:10:57 +00001629 def __init__(self, id=None, row=None, new_record=False):
1630 assert (bool(id) != bool(row))
mbligh36768f02008-02-22 18:28:33 +00001631
jadmanski0afbb632008-06-06 21:10:57 +00001632 self.__table = self._get_table()
1633 fields = self._fields()
mbligh36768f02008-02-22 18:28:33 +00001634
jadmanski0afbb632008-06-06 21:10:57 +00001635 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001636
jadmanski0afbb632008-06-06 21:10:57 +00001637 if row is None:
1638 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1639 rows = _db.execute(sql, (id,))
1640 if len(rows) == 0:
1641 raise "row not found (table=%s, id=%s)" % \
1642 (self.__table, id)
1643 row = rows[0]
mbligh36768f02008-02-22 18:28:33 +00001644
jadmanski0afbb632008-06-06 21:10:57 +00001645 assert len(row) == self.num_cols(), (
1646 "table = %s, row = %s/%d, fields = %s/%d" % (
1647 self.__table, row, len(row), fields, self.num_cols()))
mbligh36768f02008-02-22 18:28:33 +00001648
jadmanski0afbb632008-06-06 21:10:57 +00001649 self.__valid_fields = {}
1650 for i,value in enumerate(row):
1651 self.__dict__[fields[i]] = value
1652 self.__valid_fields[fields[i]] = True
mbligh36768f02008-02-22 18:28:33 +00001653
jadmanski0afbb632008-06-06 21:10:57 +00001654 del self.__valid_fields['id']
mbligh36768f02008-02-22 18:28:33 +00001655
mblighe2586682008-02-29 22:45:46 +00001656
jadmanski0afbb632008-06-06 21:10:57 +00001657 @classmethod
1658 def _get_table(cls):
1659 raise NotImplementedError('Subclasses must override this')
mblighe2586682008-02-29 22:45:46 +00001660
1661
jadmanski0afbb632008-06-06 21:10:57 +00001662 @classmethod
1663 def _fields(cls):
1664 raise NotImplementedError('Subclasses must override this')
showard04c82c52008-05-29 19:38:12 +00001665
1666
jadmanski0afbb632008-06-06 21:10:57 +00001667 @classmethod
1668 def num_cols(cls):
1669 return len(cls._fields())
showard04c82c52008-05-29 19:38:12 +00001670
1671
jadmanski0afbb632008-06-06 21:10:57 +00001672 def count(self, where, table = None):
1673 if not table:
1674 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001675
jadmanski0afbb632008-06-06 21:10:57 +00001676 rows = _db.execute("""
1677 SELECT count(*) FROM %s
1678 WHERE %s
1679 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001680
jadmanski0afbb632008-06-06 21:10:57 +00001681 assert len(rows) == 1
1682
1683 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001684
1685
mblighf8c624d2008-07-03 16:58:45 +00001686 def update_field(self, field, value, condition=''):
jadmanski0afbb632008-06-06 21:10:57 +00001687 assert self.__valid_fields[field]
mbligh36768f02008-02-22 18:28:33 +00001688
jadmanski0afbb632008-06-06 21:10:57 +00001689 if self.__dict__[field] == value:
1690 return
mbligh36768f02008-02-22 18:28:33 +00001691
mblighf8c624d2008-07-03 16:58:45 +00001692 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1693 if condition:
1694 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001695 _db.execute(query, (value, self.id))
1696
1697 self.__dict__[field] = value
mbligh36768f02008-02-22 18:28:33 +00001698
1699
jadmanski0afbb632008-06-06 21:10:57 +00001700 def save(self):
1701 if self.__new_record:
1702 keys = self._fields()[1:] # avoid id
1703 columns = ','.join([str(key) for key in keys])
1704 values = ['"%s"' % self.__dict__[key] for key in keys]
1705 values = ','.join(values)
1706 query = """INSERT INTO %s (%s) VALUES (%s)""" % \
1707 (self.__table, columns, values)
1708 _db.execute(query)
mbligh36768f02008-02-22 18:28:33 +00001709
1710
jadmanski0afbb632008-06-06 21:10:57 +00001711 def delete(self):
1712 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1713 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001714
1715
showard63a34772008-08-18 19:32:50 +00001716 @staticmethod
1717 def _prefix_with(string, prefix):
1718 if string:
1719 string = prefix + string
1720 return string
1721
1722
jadmanski0afbb632008-06-06 21:10:57 +00001723 @classmethod
showard989f25d2008-10-01 11:38:11 +00001724 def fetch(cls, where='', params=(), joins='', order_by=''):
showard63a34772008-08-18 19:32:50 +00001725 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1726 where = cls._prefix_with(where, 'WHERE ')
1727 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
1728 '%(where)s %(order_by)s' % {'table' : cls._get_table(),
1729 'joins' : joins,
1730 'where' : where,
1731 'order_by' : order_by})
1732 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001733 for row in rows:
1734 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001735
mbligh36768f02008-02-22 18:28:33 +00001736
class IneligibleHostQueue(DBObject):
    """Row wrapper for ineligible_host_queues (job/host blocking table)."""

    def __init__(self, id=None, row=None, new_record=None):
        super(IneligibleHostQueue, self).__init__(
            id=id, row=row, new_record=new_record)


    @classmethod
    def _fields(cls):
        return ['id', 'job_id', 'host_id']


    @classmethod
    def _get_table(cls):
        return 'ineligible_host_queues'
showard04c82c52008-05-29 19:38:12 +00001751
1752
showard989f25d2008-10-01 11:38:11 +00001753class Label(DBObject):
1754 @classmethod
1755 def _get_table(cls):
1756 return 'labels'
1757
1758
1759 @classmethod
1760 def _fields(cls):
1761 return ['id', 'name', 'kernel_config', 'platform', 'invalid',
1762 'only_if_needed']
1763
1764
mbligh36768f02008-02-22 18:28:33 +00001765class Host(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001766 def __init__(self, id=None, row=None):
1767 super(Host, self).__init__(id=id, row=row)
mblighe2586682008-02-29 22:45:46 +00001768
1769
jadmanski0afbb632008-06-06 21:10:57 +00001770 @classmethod
1771 def _get_table(cls):
1772 return 'hosts'
mbligh36768f02008-02-22 18:28:33 +00001773
1774
jadmanski0afbb632008-06-06 21:10:57 +00001775 @classmethod
1776 def _fields(cls):
1777 return ['id', 'hostname', 'locked', 'synch_id','status',
showard21baa452008-10-21 00:08:39 +00001778 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty']
showard04c82c52008-05-29 19:38:12 +00001779
1780
jadmanski0afbb632008-06-06 21:10:57 +00001781 def current_task(self):
1782 rows = _db.execute("""
1783 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1784 """, (self.id,))
1785
1786 if len(rows) == 0:
1787 return None
1788 else:
1789 assert len(rows) == 1
1790 results = rows[0];
mblighf8c624d2008-07-03 16:58:45 +00001791# print "current = %s" % results
jadmanski0afbb632008-06-06 21:10:57 +00001792 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00001793
1794
jadmanski0afbb632008-06-06 21:10:57 +00001795 def yield_work(self):
1796 print "%s yielding work" % self.hostname
1797 if self.current_task():
1798 self.current_task().requeue()
1799
1800 def set_status(self,status):
1801 print '%s -> %s' % (self.hostname, status)
1802 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00001803
1804
showardd8e548a2008-09-09 03:04:57 +00001805 def labels(self):
1806 """
1807 Fetch a list of names of all non-platform labels associated with this
1808 host.
1809 """
1810 rows = _db.execute("""
1811 SELECT labels.name
1812 FROM labels
1813 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
1814 WHERE NOT labels.platform AND hosts_labels.host_id = %s
1815 ORDER BY labels.name
1816 """, (self.id,))
1817 return [row[0] for row in rows]
1818
1819
mbligh36768f02008-02-22 18:28:33 +00001820class HostQueueEntry(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001821 def __init__(self, id=None, row=None):
1822 assert id or row
1823 super(HostQueueEntry, self).__init__(id=id, row=row)
1824 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00001825
jadmanski0afbb632008-06-06 21:10:57 +00001826 if self.host_id:
1827 self.host = Host(self.host_id)
1828 else:
1829 self.host = None
mbligh36768f02008-02-22 18:28:33 +00001830
jadmanski0afbb632008-06-06 21:10:57 +00001831 self.queue_log_path = os.path.join(self.job.results_dir(),
1832 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00001833
1834
jadmanski0afbb632008-06-06 21:10:57 +00001835 @classmethod
1836 def _get_table(cls):
1837 return 'host_queue_entries'
mblighe2586682008-02-29 22:45:46 +00001838
1839
jadmanski0afbb632008-06-06 21:10:57 +00001840 @classmethod
1841 def _fields(cls):
1842 return ['id', 'job_id', 'host_id', 'priority', 'status',
showardb8471e32008-07-03 19:51:08 +00001843 'meta_host', 'active', 'complete', 'deleted']
showard04c82c52008-05-29 19:38:12 +00001844
1845
jadmanski0afbb632008-06-06 21:10:57 +00001846 def set_host(self, host):
1847 if host:
1848 self.queue_log_record('Assigning host ' + host.hostname)
1849 self.update_field('host_id', host.id)
1850 self.update_field('active', True)
1851 self.block_host(host.id)
1852 else:
1853 self.queue_log_record('Releasing host')
1854 self.unblock_host(self.host.id)
1855 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00001856
jadmanski0afbb632008-06-06 21:10:57 +00001857 self.host = host
mbligh36768f02008-02-22 18:28:33 +00001858
1859
jadmanski0afbb632008-06-06 21:10:57 +00001860 def get_host(self):
1861 return self.host
mbligh36768f02008-02-22 18:28:33 +00001862
1863
jadmanski0afbb632008-06-06 21:10:57 +00001864 def queue_log_record(self, log_line):
1865 now = str(datetime.datetime.now())
1866 queue_log = open(self.queue_log_path, 'a', 0)
1867 queue_log.write(now + ' ' + log_line + '\n')
1868 queue_log.close()
mbligh36768f02008-02-22 18:28:33 +00001869
1870
jadmanski0afbb632008-06-06 21:10:57 +00001871 def block_host(self, host_id):
1872 print "creating block %s/%s" % (self.job.id, host_id)
1873 row = [0, self.job.id, host_id]
1874 block = IneligibleHostQueue(row=row, new_record=True)
1875 block.save()
mblighe2586682008-02-29 22:45:46 +00001876
1877
jadmanski0afbb632008-06-06 21:10:57 +00001878 def unblock_host(self, host_id):
1879 print "removing block %s/%s" % (self.job.id, host_id)
1880 blocks = IneligibleHostQueue.fetch(
1881 'job_id=%d and host_id=%d' % (self.job.id, host_id))
1882 for block in blocks:
1883 block.delete()
mblighe2586682008-02-29 22:45:46 +00001884
1885
jadmanski0afbb632008-06-06 21:10:57 +00001886 def results_dir(self):
1887 if self.job.is_synchronous() or self.job.num_machines() == 1:
1888 return self.job.job_dir
1889 else:
1890 assert self.host
1891 return os.path.join(self.job.job_dir,
1892 self.host.hostname)
mbligh36768f02008-02-22 18:28:33 +00001893
mblighe2586682008-02-29 22:45:46 +00001894
jadmanski0afbb632008-06-06 21:10:57 +00001895 def verify_results_dir(self):
1896 if self.job.is_synchronous() or self.job.num_machines() > 1:
1897 assert self.host
1898 return os.path.join(self.job.job_dir,
1899 self.host.hostname)
1900 else:
1901 return self.job.job_dir
mbligh36768f02008-02-22 18:28:33 +00001902
1903
jadmanski0afbb632008-06-06 21:10:57 +00001904 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00001905 abort_statuses = ['Abort', 'Aborting', 'Aborted']
1906 if status not in abort_statuses:
1907 condition = ' AND '.join(['status <> "%s"' % x
1908 for x in abort_statuses])
1909 else:
1910 condition = ''
1911 self.update_field('status', status, condition=condition)
1912
jadmanski0afbb632008-06-06 21:10:57 +00001913 if self.host:
1914 hostname = self.host.hostname
1915 else:
1916 hostname = 'no host'
1917 print "%s/%d status -> %s" % (hostname, self.id, self.status)
mblighf8c624d2008-07-03 16:58:45 +00001918
jadmanski0afbb632008-06-06 21:10:57 +00001919 if status in ['Queued']:
1920 self.update_field('complete', False)
1921 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00001922
jadmanski0afbb632008-06-06 21:10:57 +00001923 if status in ['Pending', 'Running', 'Verifying', 'Starting',
1924 'Abort', 'Aborting']:
1925 self.update_field('complete', False)
1926 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00001927
showard97aed502008-11-04 02:01:24 +00001928 if status in ['Failed', 'Completed', 'Stopped', 'Aborted', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00001929 self.update_field('complete', True)
1930 self.update_field('active', False)
showard542e8402008-09-19 20:16:18 +00001931 self._email_on_job_complete()
1932
1933
1934 def _email_on_job_complete(self):
1935 url = "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
1936
1937 if self.job.is_finished():
1938 subject = "Autotest: Job ID: %s \"%s\" Completed" % (
1939 self.job.id, self.job.name)
1940 body = "Job ID: %s\nJob Name: %s\n%s\n" % (
1941 self.job.id, self.job.name, url)
1942 send_email(_email_from, self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00001943
1944
jadmanski0afbb632008-06-06 21:10:57 +00001945 def run(self,assigned_host=None):
1946 if self.meta_host:
1947 assert assigned_host
1948 # ensure results dir exists for the queue log
1949 self.job.create_results_dir()
1950 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00001951
jadmanski0afbb632008-06-06 21:10:57 +00001952 print "%s/%s scheduled on %s, status=%s" % (self.job.name,
1953 self.meta_host, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00001954
jadmanski0afbb632008-06-06 21:10:57 +00001955 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00001956
jadmanski0afbb632008-06-06 21:10:57 +00001957 def requeue(self):
1958 self.set_status('Queued')
mblighe2586682008-02-29 22:45:46 +00001959
jadmanski0afbb632008-06-06 21:10:57 +00001960 if self.meta_host:
1961 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00001962
1963
jadmanski0afbb632008-06-06 21:10:57 +00001964 def handle_host_failure(self):
1965 """\
1966 Called when this queue entry's host has failed verification and
1967 repair.
1968 """
1969 assert not self.meta_host
1970 self.set_status('Failed')
1971 if self.job.is_synchronous():
1972 self.job.stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00001973
1974
jadmanski0afbb632008-06-06 21:10:57 +00001975 def clear_results_dir(self, results_dir=None, dont_delete_files=False):
1976 results_dir = results_dir or self.results_dir()
1977 if not os.path.exists(results_dir):
1978 return
1979 if dont_delete_files:
1980 temp_dir = tempfile.mkdtemp(suffix='.clear_results')
1981 print 'Moving results from %s to %s' % (results_dir,
1982 temp_dir)
1983 for filename in os.listdir(results_dir):
1984 path = os.path.join(results_dir, filename)
1985 if dont_delete_files:
1986 shutil.move(path,
1987 os.path.join(temp_dir, filename))
1988 else:
1989 remove_file_or_dir(path)
mbligh36768f02008-02-22 18:28:33 +00001990
1991
jadmanskif7fa2cc2008-10-01 14:13:23 +00001992 @property
1993 def aborted_by(self):
1994 self._load_abort_info()
1995 return self._aborted_by
1996
1997
1998 @property
1999 def aborted_on(self):
2000 self._load_abort_info()
2001 return self._aborted_on
2002
2003
2004 def _load_abort_info(self):
2005 """ Fetch info about who aborted the job. """
2006 if hasattr(self, "_aborted_by"):
2007 return
2008 rows = _db.execute("""
2009 SELECT users.login, aborted_host_queue_entries.aborted_on
2010 FROM aborted_host_queue_entries
2011 INNER JOIN users
2012 ON users.id = aborted_host_queue_entries.aborted_by_id
2013 WHERE aborted_host_queue_entries.queue_entry_id = %s
2014 """, (self.id,))
2015 if rows:
2016 self._aborted_by, self._aborted_on = rows[0]
2017 else:
2018 self._aborted_by = self._aborted_on = None
2019
2020
showardb2e2c322008-10-14 17:33:55 +00002021 def on_pending(self):
2022 """
2023 Called when an entry in a synchronous job has passed verify. If the
2024 job is ready to run, returns an agent to run the job. Returns None
2025 otherwise.
2026 """
2027 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002028 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002029 if self.job.is_ready():
2030 return self.job.run(self)
2031 return None
2032
2033
showard1be97432008-10-17 15:30:45 +00002034 def abort(self, agents_to_abort=[]):
2035 abort_task = AbortTask(self, agents_to_abort)
2036 tasks = [abort_task]
2037
2038 host = self.get_host()
2039 if host:
2040 reboot_task = RebootTask(host)
2041 verify_task = VerifyTask(host=host)
2042 # just to make sure this host does not get taken away
2043 host.set_status('Rebooting')
2044 tasks += [reboot_task, verify_task]
2045
2046 self.set_status('Aborting')
2047 return Agent(tasks=tasks, queue_entry_ids=[self.id])
2048
2049
mbligh36768f02008-02-22 18:28:33 +00002050class Job(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00002051 def __init__(self, id=None, row=None):
2052 assert id or row
2053 super(Job, self).__init__(id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00002054
jadmanski0afbb632008-06-06 21:10:57 +00002055 self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
2056 self.owner))
mblighe2586682008-02-29 22:45:46 +00002057
2058
jadmanski0afbb632008-06-06 21:10:57 +00002059 @classmethod
2060 def _get_table(cls):
2061 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00002062
2063
jadmanski0afbb632008-06-06 21:10:57 +00002064 @classmethod
2065 def _fields(cls):
2066 return ['id', 'owner', 'name', 'priority', 'control_file',
2067 'control_type', 'created_on', 'synch_type',
showard542e8402008-09-19 20:16:18 +00002068 'synch_count', 'synchronizing', 'timeout',
showard21baa452008-10-21 00:08:39 +00002069 'run_verify', 'email_list', 'reboot_before', 'reboot_after']
showard04c82c52008-05-29 19:38:12 +00002070
2071
jadmanski0afbb632008-06-06 21:10:57 +00002072 def is_server_job(self):
2073 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002074
2075
jadmanski0afbb632008-06-06 21:10:57 +00002076 def get_host_queue_entries(self):
2077 rows = _db.execute("""
2078 SELECT * FROM host_queue_entries
2079 WHERE job_id= %s
2080 """, (self.id,))
2081 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002082
jadmanski0afbb632008-06-06 21:10:57 +00002083 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002084
jadmanski0afbb632008-06-06 21:10:57 +00002085 return entries
mbligh36768f02008-02-22 18:28:33 +00002086
2087
jadmanski0afbb632008-06-06 21:10:57 +00002088 def set_status(self, status, update_queues=False):
2089 self.update_field('status',status)
2090
2091 if update_queues:
2092 for queue_entry in self.get_host_queue_entries():
2093 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002094
2095
jadmanski0afbb632008-06-06 21:10:57 +00002096 def is_synchronous(self):
2097 return self.synch_type == 2
mbligh36768f02008-02-22 18:28:33 +00002098
2099
jadmanski0afbb632008-06-06 21:10:57 +00002100 def is_ready(self):
2101 if not self.is_synchronous():
2102 return True
2103 sql = "job_id=%s AND status='Pending'" % self.id
2104 count = self.count(sql, table='host_queue_entries')
showardb2e2c322008-10-14 17:33:55 +00002105 return (count == self.num_machines())
mbligh36768f02008-02-22 18:28:33 +00002106
2107
jadmanski0afbb632008-06-06 21:10:57 +00002108 def results_dir(self):
2109 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002110
jadmanski0afbb632008-06-06 21:10:57 +00002111 def num_machines(self, clause = None):
2112 sql = "job_id=%s" % self.id
2113 if clause:
2114 sql += " AND (%s)" % clause
2115 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002116
2117
jadmanski0afbb632008-06-06 21:10:57 +00002118 def num_queued(self):
2119 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002120
2121
jadmanski0afbb632008-06-06 21:10:57 +00002122 def num_active(self):
2123 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002124
2125
jadmanski0afbb632008-06-06 21:10:57 +00002126 def num_complete(self):
2127 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002128
2129
jadmanski0afbb632008-06-06 21:10:57 +00002130 def is_finished(self):
2131 left = self.num_queued()
2132 print "%s: %s machines left" % (self.name, left)
2133 return left==0
mbligh36768f02008-02-22 18:28:33 +00002134
mbligh36768f02008-02-22 18:28:33 +00002135
jadmanski0afbb632008-06-06 21:10:57 +00002136 def stop_all_entries(self):
2137 for child_entry in self.get_host_queue_entries():
2138 if not child_entry.complete:
2139 child_entry.set_status('Stopped')
mblighe2586682008-02-29 22:45:46 +00002140
2141
jadmanski0afbb632008-06-06 21:10:57 +00002142 def write_to_machines_file(self, queue_entry):
2143 hostname = queue_entry.get_host().hostname
2144 print "writing %s to job %s machines file" % (hostname, self.id)
2145 file_path = os.path.join(self.job_dir, '.machines')
2146 mf = open(file_path, 'a')
2147 mf.write("%s\n" % queue_entry.get_host().hostname)
2148 mf.close()
mbligh36768f02008-02-22 18:28:33 +00002149
2150
jadmanski0afbb632008-06-06 21:10:57 +00002151 def create_results_dir(self, queue_entry=None):
2152 print "create: active: %s complete %s" % (self.num_active(),
2153 self.num_complete())
mbligh36768f02008-02-22 18:28:33 +00002154
jadmanski0afbb632008-06-06 21:10:57 +00002155 if not os.path.exists(self.job_dir):
2156 os.makedirs(self.job_dir)
mbligh36768f02008-02-22 18:28:33 +00002157
jadmanski0afbb632008-06-06 21:10:57 +00002158 if queue_entry:
showarde05654d2008-10-28 20:38:40 +00002159 results_dir = queue_entry.results_dir()
2160 if not os.path.exists(results_dir):
2161 os.makedirs(results_dir)
2162 return results_dir
jadmanski0afbb632008-06-06 21:10:57 +00002163 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002164
2165
showardb2e2c322008-10-14 17:33:55 +00002166 def _write_control_file(self):
2167 'Writes control file out to disk, returns a filename'
2168 control_fd, control_filename = tempfile.mkstemp(suffix='.control_file')
2169 control_file = os.fdopen(control_fd, 'w')
jadmanski0afbb632008-06-06 21:10:57 +00002170 if self.control_file:
showardb2e2c322008-10-14 17:33:55 +00002171 control_file.write(self.control_file)
2172 control_file.close()
2173 return control_filename
mbligh36768f02008-02-22 18:28:33 +00002174
showardb2e2c322008-10-14 17:33:55 +00002175
2176 def _get_job_tag(self, queue_entries):
2177 base_job_tag = "%s-%s" % (self.id, self.owner)
2178 if self.is_synchronous() or self.num_machines() == 1:
2179 return base_job_tag
jadmanski0afbb632008-06-06 21:10:57 +00002180 else:
showardb2e2c322008-10-14 17:33:55 +00002181 return base_job_tag + '/' + queue_entries[0].get_host().hostname
2182
2183
2184 def _get_autoserv_params(self, queue_entries):
2185 results_dir = self.create_results_dir(queue_entries[0])
2186 control_filename = self._write_control_file()
jadmanski0afbb632008-06-06 21:10:57 +00002187 hostnames = ','.join([entry.get_host().hostname
2188 for entry in queue_entries])
showardb2e2c322008-10-14 17:33:55 +00002189 job_tag = self._get_job_tag(queue_entries)
mbligh36768f02008-02-22 18:28:33 +00002190
showardb2e2c322008-10-14 17:33:55 +00002191 params = [_autoserv_path, '-P', job_tag, '-p', '-n',
showard21baa452008-10-21 00:08:39 +00002192 '-r', os.path.abspath(results_dir), '-u', self.owner,
2193 '-l', self.name, '-m', hostnames, control_filename]
mbligh36768f02008-02-22 18:28:33 +00002194
jadmanski0afbb632008-06-06 21:10:57 +00002195 if not self.is_server_job():
2196 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002197
showardb2e2c322008-10-14 17:33:55 +00002198 return params
mblighe2586682008-02-29 22:45:46 +00002199
mbligh36768f02008-02-22 18:28:33 +00002200
showard21baa452008-10-21 00:08:39 +00002201 def _get_pre_job_tasks(self, queue_entry, verify_task_class=VerifyTask):
2202 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00002203 if self.reboot_before == models.RebootBefore.ALWAYS:
showard21baa452008-10-21 00:08:39 +00002204 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00002205 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showard21baa452008-10-21 00:08:39 +00002206 do_reboot = queue_entry.get_host().dirty
2207
2208 tasks = []
2209 if do_reboot:
2210 tasks.append(RebootTask(queue_entry.get_host()))
2211 tasks.append(verify_task_class(queue_entry=queue_entry))
2212 return tasks
2213
2214
showardb2e2c322008-10-14 17:33:55 +00002215 def _run_synchronous(self, queue_entry):
2216 if not self.is_ready():
showard9976ce92008-10-15 20:28:13 +00002217 if self.run_verify:
showard21baa452008-10-21 00:08:39 +00002218 return Agent(self._get_pre_job_tasks(queue_entry,
2219 VerifySynchronousTask),
2220 [queue_entry.id])
showard9976ce92008-10-15 20:28:13 +00002221 else:
2222 return queue_entry.on_pending()
mbligh36768f02008-02-22 18:28:33 +00002223
showardb2e2c322008-10-14 17:33:55 +00002224 return self._finish_run(self.get_host_queue_entries())
2225
2226
2227 def _run_asynchronous(self, queue_entry):
showard9976ce92008-10-15 20:28:13 +00002228 initial_tasks = []
2229 if self.run_verify:
showard21baa452008-10-21 00:08:39 +00002230 initial_tasks = self._get_pre_job_tasks(queue_entry)
showardb2e2c322008-10-14 17:33:55 +00002231 return self._finish_run([queue_entry], initial_tasks)
2232
2233
2234 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002235 for queue_entry in queue_entries:
2236 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002237 params = self._get_autoserv_params(queue_entries)
2238 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2239 cmd=params)
2240 tasks = initial_tasks + [queue_task]
2241 entry_ids = [entry.id for entry in queue_entries]
2242
2243 return Agent(tasks, entry_ids, num_processes=len(queue_entries))
2244
2245
2246 def run(self, queue_entry):
2247 if self.is_synchronous():
2248 return self._run_synchronous(queue_entry)
2249 return self._run_asynchronous(queue_entry)
mbligh36768f02008-02-22 18:28:33 +00002250
2251
# script entry point; main() is defined earlier in this file
if __name__ == '__main__':
    main()