blob: 573776d32f855ab4c50cb82aa775a2a5a83daf86 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard542e8402008-09-19 20:16:18 +00008import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
9import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
mbligh70feeee2008-06-11 16:20:49 +000010import common
showard542e8402008-09-19 20:16:18 +000011from autotest_lib.client.common_lib import global_config
12from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000013from autotest_lib.database import database_connection
mbligh70feeee2008-06-11 16:20:49 +000014
mblighb090f142008-02-27 21:33:46 +000015
# Default results directory; overridden by the positional argument in main().
RESULTS_DIR = '.'
# Nice level applied to autoserv child processes so they yield to the scheduler.
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE web database connection settings.
CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# $AUTOTEST_DIR, when set, overrides the path derived from this file's location.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Name of the pidfile autoserv writes inside its execution directory.
AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Module-level state, filled in by main()/init().
_db = None                            # DatabaseConnection to the AFE database
_shutdown = False                     # set by handle_sigint() to stop the main loop
_notify_email = None                  # address(es) for scheduler error notifications
_autoserv_path = 'autoserv'           # swapped for a dummy binary in --test mode
_testing_mode = False
_global_config_section = 'SCHEDULER'
_base_url = None                      # AFE frontend URL, read from global_config
# see os.getlogin() online docs
_email_from = pwd.getpwuid(os.getuid())[0]
mbligh36768f02008-02-22 18:28:33 +000043
44
def main():
    """Entry point for the scheduler.

    Parses command-line options (expects one positional argument, the
    results directory), reads scheduler settings from global_config,
    initializes logging/database/signal handling, then runs the
    dispatcher tick loop until _shutdown is set or an error escapes.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # read in notify_email from global_config
    c = global_config.global_config
    global _notify_email
    val = c.get_config_value(_global_config_section, "notify_email")
    if val != "":
        _notify_email = val

    tick_pause = c.get_config_value(
        _global_config_section, 'tick_pause_sec', type=int)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # read in base url
    global _base_url
    val = c.get_config_value(CONFIG_SECTION, "base_url")
    if val:
        _base_url = val
    else:
        _base_url = "http://your_autotest_server/afe/"

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(tick_pause)
    except Exception:
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
        # can propagate; SIGINT is normally handled by handle_sigint()
        # setting _shutdown instead.
        log_stacktrace("Uncaught exception; terminating monitor_db")

    email_manager.send_queued_emails()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000102
103
def handle_sigint(signum, frame):
    """SIGINT handler: ask the dispatch loop in main() to exit cleanly
    after the current tick rather than dying mid-cycle."""
    global _shutdown
    _shutdown = True
    print("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000108
109
def init(logfile):
    """One-time scheduler initialization.

    Sets up log redirection, points test runs at a scratch database,
    prepends the server dir to PATH (so autoserv is findable by child
    processes), opens the AFE database connection, and installs the
    SIGINT handler.

    Args:
        logfile: path to redirect stdout/stderr to, or None/'' to leave
            output on the console.
    """
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    if _testing_mode:
        # keep test runs away from the production database
        global_config.global_config.override_config_value(
            CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(CONFIG_SECTION)
    _db.connect()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."
mbligh36768f02008-02-22 18:28:33 +0000129
130
def enable_logging(logfile):
    """Redirect this process's stdout/stderr to files.

    stdout goes to `logfile` and stderr to `logfile`.err.  Both the
    OS-level file descriptors (via dup2, so child processes inherit the
    redirection) and the Python-level sys.stdout/sys.stderr objects are
    replaced.  Files are opened unbuffered so output survives a crash.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    # redirect at the fd level first so subprocesses are covered too
    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd
mbligh36768f02008-02-22 18:28:33 +0000143
144
def queue_entries_to_abort():
    """Return a HostQueueEntry for every entry whose status is 'Abort'."""
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)
    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000151
def remove_file_or_dir(path):
    """Delete `path`, whether it names a directory tree or a single file."""
    mode = os.stat(path).st_mode
    if stat.S_ISDIR(mode):
        shutil.rmtree(path)  # directory tree
    else:
        os.remove(path)      # plain file
mblighe2586682008-02-29 22:45:46 +0000159
160
def generate_parse_command(results_dir, flags=""):
    """Build the shell command that runs the TKO parser over `results_dir`
    in the background, logging to .parse.log inside that directory."""
    parse_binary = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    log_path = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    return "%s %s -r -o %s > %s 2>&1 &" % (parse_binary, flags,
                                           results_dir, log_path)
mblighdbdac6c2008-03-05 15:49:58 +0000166
167
# commands waiting to be launched by Dispatcher._run_final_parses()
_parse_command_queue = []
def parse_results(results_dir, flags=""):
    """Queue a deferred final parse of `results_dir`; no-op in test mode."""
    if not _testing_mode:
        _parse_command_queue.append(generate_parse_command(results_dir, flags))
mbligh36768f02008-02-22 18:28:33 +0000173
174
mblighbb421852008-03-11 22:36:16 +0000175
176
def log_stacktrace(reason):
    """Write the current exception's traceback to stderr and queue a
    notification email about it.

    Args:
        reason: human-readable description of why this is being logged.
    """
    # locals renamed from `type`/`str`, which shadowed the builtins
    exc_type, exc_value, exc_tb = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    sys.stderr.write("\n%s\n" % message)
    email_manager.enqueue_notify_email("monitor_db exception", message)
mbligh36768f02008-02-22 18:28:33 +0000184
mblighbb421852008-03-11 22:36:16 +0000185
186def get_proc_poll_fn(pid):
jadmanski0afbb632008-06-06 21:10:57 +0000187 proc_path = os.path.join('/proc', str(pid))
188 def poll_fn():
189 if os.path.exists(proc_path):
190 return None
191 return 0 # we can't get a real exit code
192 return poll_fn
mblighbb421852008-03-11 22:36:16 +0000193
194
showard542e8402008-09-19 20:16:18 +0000195def send_email(from_addr, to_string, subject, body):
196 """Mails out emails to the addresses listed in to_string.
197
198 to_string is split into a list which can be delimited by any of:
199 ';', ',', ':' or any whitespace
200 """
201
202 # Create list from string removing empty strings from the list.
203 to_list = [x for x in re.split('\s|,|;|:', to_string) if x]
showard7d182aa2008-09-22 16:17:24 +0000204 if not to_list:
205 return
206
showard542e8402008-09-19 20:16:18 +0000207 msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
208 from_addr, ', '.join(to_list), subject, body)
showard7d182aa2008-09-22 16:17:24 +0000209 try:
210 mailer = smtplib.SMTP('localhost')
211 try:
212 mailer.sendmail(from_addr, to_list, msg)
213 finally:
214 mailer.quit()
215 except Exception, e:
216 print "Sending email failed. Reason: %s" % repr(e)
showard542e8402008-09-19 20:16:18 +0000217
218
def kill_autoserv(pid, poll_fn=None):
    """Terminate the autoserv process `pid` if it is still running.

    Args:
        pid: process id to kill.
        poll_fn: optional poll()-style callable returning None while the
            process is alive; defaults to a /proc-based check from
            get_proc_poll_fn().
    """
    print('killing %s' % pid)
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    # `is None`, not `== None` (PEP 8); only signal live processes
    if poll_fn() is None:
        # SIGCONT first in case the process is stopped, then terminate
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)
mbligh36768f02008-02-22 18:28:33 +0000226
227
class EmailNotificationManager(object):
    """Collects scheduler notifications and mails them in one batch."""

    def __init__(self):
        self._emails = []


    def enqueue_notify_email(self, subject, message):
        """Queue one notification; silently dropped when _notify_email
        is unconfigured."""
        if not _notify_email:
            return
        self._emails.append(
            'Subject: %s\n%s / %s / %s\n%s' % (subject,
                                               socket.gethostname(),
                                               os.getpid(),
                                               time.strftime("%X %x"),
                                               message))


    def send_queued_emails(self):
        """Send all queued notifications as a single email, then clear
        the queue."""
        if not self._emails:
            return
        subject = 'Scheduler notifications from ' + socket.gethostname()
        separator = '\n' + '-' * 40 + '\n'
        body = separator.join(self._emails)

        send_email(_email_from, _notify_email, subject, body)
        self._emails = []

email_manager = EmailNotificationManager()
254
255
class HostScheduler(object):
    """Matches pending host queue entries against ready hosts.

    refresh() snapshots scheduling state from the database once per
    cycle (ready hosts, ACL memberships, label/host associations and
    job dependency labels); find_eligible_host() then assigns hosts
    using only that in-memory snapshot.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked 'Ready' hosts that have
        no active queue entry against them."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render ids as a comma-separated string for SQL IN (...) clauses."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run `query` (containing a %s placeholder for the id list) and
        fold its (left, right) rows into {left: set(right)} — or the
        flipped mapping.  Returns {} without querying when id_list is
        empty (an empty SQL IN () would be invalid)."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column id rows into {left: set(right)} (flipped when
        flip=True)."""
        result = {}
        for row in rows:
            left_id, right_id = long(row[0]), long(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the ACL group ids of the job's owner."""
        query = """
            SELECT jobs.id, acl_groups_users.acl_group_id
            FROM jobs
            INNER JOIN users ON users.login = jobs.owner
            INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
            WHERE jobs.id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the host ids it is blocked from running on."""
        query = """
            SELECT job_id, host_id
            FROM ineligible_host_queues
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the label ids it depends on."""
        query = """
            SELECT job_id, label_id
            FROM jobs_dependency_labels
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the ACL group ids that contain it."""
        query = """
            SELECT host_id, acl_group_id
            FROM acl_groups_hosts
            WHERE host_id IN (%s)
            """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) for the given hosts,
        both built from a single hosts_labels query."""
        # guard against an empty IN () clause, matching
        # _get_many2many_dict's behavior
        if not host_ids:
            return {}, {}
        query = """
            SELECT label_id, host_id
            FROM hosts_labels
            WHERE host_id IN (%s)
            """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Map label id -> Label for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot all scheduling state needed for this cycle."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True when the job's owner shares at least one ACL group with
        the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True when the host carries every label the job depends on."""
        # previously computed an unused `missing` set and then repeated
        # the set difference; compute it just once
        return not (job_dependencies - host_labels)


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """Reject hosts carrying an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine the ACL, dependency and only_if_needed checks."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        acl = self._is_acl_accessible(host_id, queue_entry)
        deps = self._check_job_dependencies(job_dependencies, host_labels)
        only_if = self._check_only_if_needed_labels(job_dependencies,
                                                    host_labels, queue_entry)
        return acl and deps and only_if


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's named host if eligible and still available."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True when the host is unclaimed this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a claimed Host for `queue_entry`, or None if none fits."""
        if not queue_entry.meta_host:
            return self._schedule_non_metahost(queue_entry)
        return self._schedule_metahost(queue_entry)
449
450
class Dispatcher:
    # Per-tick cache of running autoserv processes; reset each tick().
    autoserv_procs_cache = None
    # Throttling limits, read once from the SCHEDULER global_config section.
    max_running_processes = global_config.global_config.get_config_value(
        _global_config_section, 'max_running_jobs', type=int)
    max_processes_started_per_cycle = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_jobs_started_per_cycle', type=int))
    # minutes between periodic cleanup passes (timed-out jobs, stale blocks)
    clean_interval = (
        global_config.global_config.get_config_value(
            _global_config_section, 'clean_interval_minutes', type=int))
    # cap on concurrently running final-parse processes
    max_parse_processes = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_parse_processes', type=int))
mbligh90a549d2008-03-25 23:52:34 +0000464
    def __init__(self):
        # agents currently being managed; each wraps a sequence of tasks
        self._agents = []
        # timestamp of the last periodic cleanup pass (see tick())
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
mbligh36768f02008-02-22 18:28:33 +0000469
mbligh36768f02008-02-22 18:28:33 +0000470
    def do_initial_recovery(self, recover_hosts=True):
        """Recover state left behind by a previous scheduler instance.

        Running autoserv processes are always reclaimed; dead-host
        recovery is optional (see the --recover-hosts flag).
        """
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000477
478
    def tick(self):
        """Run one scheduler cycle: periodic cleanup, abort handling,
        new-job scheduling, agent advancement, final parses, and queued
        email delivery."""
        # invalidate the per-cycle autoserv process cache
        Dispatcher.autoserv_procs_cache = None
        if self._last_clean_time + self.clean_interval * 60 < time.time():
            self._abort_timed_out_jobs()
            self._clear_inactive_blocks()
            self._last_clean_time = time.time()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        self._run_final_parses()
        email_manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000490
491
showard970a6db2008-09-03 20:02:39 +0000492 def _run_final_parses(self):
493 process_count = 0
494 try:
495 for line in utils.system_output('ps -e').splitlines():
496 if 'parse.py' in line:
497 process_count += 1
498 except Exception:
499 # We'll try again in a bit. This is a work-around for one time
500 # when the scheduler crashed due to a "Interrupted system call"
501 return
502
503 if process_count:
504 print "%d parses currently running" % process_count
505
506 while (process_count < self.max_parse_processes and
507 _parse_command_queue):
508 cmd = _parse_command_queue.pop(0)
509 print "Starting another final parse with cmd %s" % cmd
510 os.system(cmd)
511 process_count += 1
512
513 if _parse_command_queue:
514 print ("%d cmds still in final parse queue" %
515 len(_parse_command_queue))
516
517
jadmanski0afbb632008-06-06 21:10:57 +0000518 def add_agent(self, agent):
519 self._agents.append(agent)
520 agent.dispatcher = self
mblighd5c95802008-03-05 00:33:46 +0000521
jadmanski0afbb632008-06-06 21:10:57 +0000522 # Find agent corresponding to the specified queue_entry
523 def get_agents(self, queue_entry):
524 res_agents = []
525 for agent in self._agents:
526 if queue_entry.id in agent.queue_entry_ids:
527 res_agents.append(agent)
528 return res_agents
mbligh36768f02008-02-22 18:28:33 +0000529
530
    def remove_agent(self, agent):
        # drop the agent without ticking it again (used by _find_aborting)
        self._agents.remove(agent)
showardec113162008-05-08 00:52:49 +0000533
534
showard4c5374f2008-09-04 17:02:56 +0000535 def num_running_processes(self):
536 return sum(agent.num_processes for agent in self._agents
537 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000538
539
jadmanski0afbb632008-06-06 21:10:57 +0000540 @classmethod
541 def find_autoservs(cls, orphans_only=False):
542 """\
543 Returns a dict mapping pids to command lines for root autoserv
544 processes. If orphans_only=True, return only processes that
545 have been orphaned (i.e. parent pid = 1).
546 """
547 if cls.autoserv_procs_cache is not None:
548 return cls.autoserv_procs_cache
549
550 proc = subprocess.Popen(
551 ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
552 stdout=subprocess.PIPE)
553 # split each line into the four columns output by ps
554 procs = [line.split(None, 4) for line in
555 proc.communicate()[0].splitlines()]
556 autoserv_procs = {}
557 for proc in procs:
558 # check ppid == 1 for orphans
559 if orphans_only and proc[2] != 1:
560 continue
561 # only root autoserv processes have pgid == pid
562 if (proc[3] == 'autoserv' and # comm
563 proc[1] == proc[0]): # pgid == pid
564 # map pid to args
565 autoserv_procs[int(proc[0])] = proc[4]
566 cls.autoserv_procs_cache = autoserv_procs
567 return autoserv_procs
mblighbb421852008-03-11 22:36:16 +0000568
569
jadmanski0afbb632008-06-06 21:10:57 +0000570 def recover_queue_entry(self, queue_entry, run_monitor):
571 job = queue_entry.job
572 if job.is_synchronous():
573 all_queue_entries = job.get_host_queue_entries()
574 else:
575 all_queue_entries = [queue_entry]
576 all_queue_entry_ids = [queue_entry.id for queue_entry
577 in all_queue_entries]
578 queue_task = RecoveryQueueTask(
579 job=queue_entry.job,
580 queue_entries=all_queue_entries,
581 run_monitor=run_monitor)
582 self.add_agent(Agent(tasks=[queue_task],
583 queue_entry_ids=all_queue_entry_ids))
mblighbb421852008-03-11 22:36:16 +0000584
585
    def _recover_processes(self):
        """Reconcile database state with live autoserv processes after a
        scheduler restart: re-attach or requeue running entries, kill
        orphaned autoservs, finish aborts, and re-verify hosts that were
        mid-operation."""
        orphans = self.find_autoservs(orphans_only=True)

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            pid, exit_code = run_monitor.get_pidfile_info()
            if pid is None:
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            print 'Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid)
            job = queue_entry.job
            if job.is_synchronous():
                # mark sibling entries so they aren't recovered twice
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            # this pid is accounted for, so it's not an orphan to kill
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir(dont_delete_files=True)
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rebooting_host_ids = set()
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %d' % queue_entry.id
            queue_host = queue_entry.get_host()
            reboot_task = RebootTask(queue_host)
            verify_task = VerifyTask(host = queue_host)
            self.add_agent(Agent(tasks=[reboot_task,
                                        verify_task],
                                 queue_entry_ids=[queue_entry.id]))
            queue_entry.set_status('Aborted')
            # Secure the host from being picked up
            queue_host.set_status('Rebooting')
            rebooting_host_ids.add(queue_host.id)

        # reverify hosts that were in the middle of verify, repair or
        # reboot
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Rebooting')""",
                                   exclude_ids=rebooting_host_ids)

        # finally, recover "Running" hosts with no active queue entries,
        # although this should never happen
        message = ('Recovering running host %s - this probably '
                   'indicates a scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000670
671
jadmanski0afbb632008-06-06 21:10:57 +0000672 def _reverify_hosts_where(self, where,
673 print_message='Reverifying host %s',
674 exclude_ids=set()):
675 rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
676 'invalid = 0 AND ' + where)
677 hosts = [Host(row=i) for i in rows]
678 for host in hosts:
679 if host.id in exclude_ids:
680 continue
681 if print_message is not None:
682 print print_message % host.hostname
683 verify_task = VerifyTask(host = host)
684 self.add_agent(Agent(tasks = [verify_task]))
mbligh36768f02008-02-22 18:28:33 +0000685
686
jadmanski0afbb632008-06-06 21:10:57 +0000687 def _recover_hosts(self):
688 # recover "Repair Failed" hosts
689 message = 'Reverifying dead host %s'
690 self._reverify_hosts_where("status = 'Repair Failed'",
691 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000692
693
    def _abort_timed_out_jobs(self):
        """
        Aborts all jobs that have timed out and not completed
        """
        update = """
        UPDATE host_queue_entries INNER JOIN jobs
        ON host_queue_entries.job_id = jobs.id"""
        timed_out = ' AND jobs.created_on + INTERVAL jobs.timeout HOUR < NOW()'

        # active entries get flagged 'Abort' so the normal abort path
        # (_find_aborting) tears them down
        _db.execute(update + """
            SET host_queue_entries.status = 'Abort'
            WHERE host_queue_entries.active""" + timed_out)

        # inactive, incomplete entries have nothing running, so mark
        # them terminally Aborted in one step
        _db.execute(update + """
            SET host_queue_entries.status = 'Aborted',
                host_queue_entries.active = 0,
                host_queue_entries.complete = 1
            WHERE NOT host_queue_entries.active
            AND NOT host_queue_entries.complete""" + timed_out)
showard3bb499f2008-07-03 19:42:20 +0000713
714
    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # deletes ineligible_host_queues rows for jobs that no longer
        # have any incomplete queue entries.
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000727
728
showardb95b1bd2008-08-15 18:11:04 +0000729 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000730 # prioritize by job priority, then non-metahost over metahost, then FIFO
731 return list(HostQueueEntry.fetch(
732 where='NOT complete AND NOT active',
733 order_by='priority DESC, meta_host, id'))
mbligh36768f02008-02-22 18:28:33 +0000734
735
jadmanski0afbb632008-06-06 21:10:57 +0000736 def _schedule_new_jobs(self):
737 print "finding work"
738
showard63a34772008-08-18 19:32:50 +0000739 queue_entries = self._get_pending_queue_entries()
740 if not queue_entries:
showardb95b1bd2008-08-15 18:11:04 +0000741 return
showardb95b1bd2008-08-15 18:11:04 +0000742
showard63a34772008-08-18 19:32:50 +0000743 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000744
showard63a34772008-08-18 19:32:50 +0000745 for queue_entry in queue_entries:
746 assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000747 if not assigned_host:
jadmanski0afbb632008-06-06 21:10:57 +0000748 continue
showardb95b1bd2008-08-15 18:11:04 +0000749 self._run_queue_entry(queue_entry, assigned_host)
750
751
    def _run_queue_entry(self, queue_entry, host):
        # queue_entry.run() builds the Agent that executes this entry on
        # the assigned host; we just take ownership of it
        agent = queue_entry.run(assigned_host=host)
        self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000755
756
    def _find_aborting(self):
        """Process queue entries flagged 'Abort': stop their agents if
        any, then reboot and re-verify the host.  Handles at most 50
        entries per cycle to bound tick time."""
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()
            reboot_task = RebootTask(entry_host)
            verify_task = VerifyTask(host = entry_host)
            tasks = [reboot_task, verify_task]
            if agents_to_abort:
                # the AbortTask must run before reboot/verify, so it is
                # inserted at the front of the task list
                abort_task = AbortTask(entry, agents_to_abort)
                for agent in agents_to_abort:
                    self.remove_agent(agent)
                tasks.insert(0, abort_task)
            else:
                # nothing running; the entry is terminally aborted now
                entry.set_status('Aborted')
                # just to make sure this host does not get
                # taken away
                entry_host.set_status('Rebooting')
            self.add_agent(Agent(tasks=tasks,
                                 queue_entry_ids = [entry.id]))
            num_aborted += 1
            if num_aborted >= 50:
                break
781
782
showard4c5374f2008-09-04 17:02:56 +0000783 def _can_start_agent(self, agent, num_running_processes,
784 num_started_this_cycle, have_reached_limit):
785 # always allow zero-process agents to run
786 if agent.num_processes == 0:
787 return True
788 # don't allow any nonzero-process agents to run after we've reached a
789 # limit (this avoids starvation of many-process agents)
790 if have_reached_limit:
791 return False
792 # total process throttling
793 if (num_running_processes + agent.num_processes >
794 self.max_running_processes):
795 return False
796 # if a single agent exceeds the per-cycle throttling, still allow it to
797 # run when it's the first agent in the cycle
798 if num_started_this_cycle == 0:
799 return True
800 # per-cycle throttling
801 if (num_started_this_cycle + agent.num_processes >
802 self.max_processes_started_per_cycle):
803 return False
804 return True
805
806
    def _handle_agents(self):
        """Advance every agent one step: reap finished agents (releasing
        their process count), start not-yet-running agents subject to
        process throttling, and tick the rest so active tasks get
        polled."""
        num_running_processes = self.num_running_processes()
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                print "agent finished"
                self._agents.remove(agent)
                # its processes no longer count against the limit
                num_running_processes -= agent.num_processes
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_running_processes,
                                             num_started_this_cycle,
                                             have_reached_limit):
                    # once throttled, refuse every later nonzero-process
                    # agent this cycle (see _can_start_agent)
                    have_reached_limit = True
                    continue
                num_running_processes += agent.num_processes
                num_started_this_cycle += agent.num_processes
            agent.tick()
        print num_running_processes, 'running processes'
mbligh36768f02008-02-22 18:28:33 +0000828
829
class RunMonitor(object):
    """Launches and tracks a single child process (normally autoserv),
    optionally run under nice and with its output appended to a log
    file."""

    def __init__(self, cmd, nice_level = None, log_file = None):
        # cmd: argv list to execute
        # nice_level: if set, the command is wrapped in `nice -n <level>`
        # log_file: if set, the child's stdout/stderr are appended here
        self.nice_level = nice_level
        self.log_file = log_file
        self.cmd = cmd

    def run(self):
        """Start the child process.  Output goes to self.log_file when
        given (falling back to /dev/null if the log can't be opened);
        stdin is always /dev/null."""
        if self.nice_level:
            nice_cmd = ['nice','-n', str(self.nice_level)]
            nice_cmd.extend(self.cmd)
            self.cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
            except OSError, exc:
                # EEXIST just means the directory is already there; any
                # other error is logged, and we still try to open the log
                if exc.errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)
            try:
                # write a banner separating runs in the shared log file
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" %
                               (time.strftime("%X %x"),
                                self.cmd))
                out_file.write("%s\n" % ('*'*80))
            except (OSError, IOError):
                log_stacktrace('Error opening log file %s' %
                               self.log_file)

        if not out_file:
            # no log requested, or it failed to open: discard output
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % self.cmd
        print "path = %s" % os.getcwd()

        self.proc = subprocess.Popen(self.cmd, stdout=out_file,
                                     stderr=subprocess.STDOUT,
                                     stdin=in_devnull)
        # Popen duplicated the descriptors; close our copies
        out_file.close()
        in_devnull.close()

    def get_pid(self):
        """Return the pid of the child process."""
        return self.proc.pid

    def kill(self):
        """Kill the child process."""
        # NOTE(review): this passes the bound exit_code method itself (a
        # callable), not its result -- presumably kill_autoserv accepts a
        # poll function as its second argument; confirm against
        # kill_autoserv's definition.
        kill_autoserv(self.get_pid(), self.exit_code)

    def exit_code(self):
        """Return the child's exit code, or None while still running."""
        return self.proc.poll()
mbligh36768f02008-02-22 18:28:33 +0000886
887
class PidfileException(Exception):
    """Signals unexpected pid-file behavior (e.g. corrupt contents)."""
mblighbb421852008-03-11 22:36:16 +0000892
893
class PidfileRunMonitor(RunMonitor):
    """RunMonitor for autoserv processes.  Autoserv records its pid (and
    eventually its exit status) in a pidfile inside the results
    directory; reading that file lets the scheduler track -- and, after a
    restart, recover -- processes it did not itself spawn."""

    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        # results_dir: directory autoserv writes into; the pidfile is
        # expected at <results_dir>/<AUTOSERV_PID_FILE>
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        # used to time out waiting for the pidfile to appear
        self.start_time = time.time()
        super(PidfileRunMonitor, self).__init__(cmd, nice_level, log_file)

    def get_pid(self):
        """Return the pid recorded in the pidfile (which must exist)."""
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid

    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        """Return True if command_line contains our results dir as a
        '-r' argument, i.e. the process belongs to this monitor."""
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match

    def _check_proc_fs(self, pid):
        """Return True if /proc shows a live process with this pid whose
        command line matches our results dir."""
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            # the process is gone (or /proc is unavailable)
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)

    def read_pidfile(self):
        """Parse the pidfile and return (pid, exit_status); either may be
        None if not (yet) written.  Raises PidfileException when the file
        contents aren't integers."""
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        # line 1 is the pid; an optional line 2 is the exit status
        assert 1 <= len(lines) <= 2
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError, exc:
            raise PidfileException('Corrupt pid file: ' +
                                   str(exc.args))

        return pid, exit_status

    def _find_autoserv_proc(self):
        """Scan all running autoserv processes for one whose command line
        matches our results dir.  Returns (pid, args) or (None, None)."""
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None

    def get_pidfile_info(self):
        """\
        Returns:
           None, None if autoserv has not yet run
           pid, None if autoserv is running
           pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            # we already gave up on this process; report cached values
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        if pid is None:
            return self._handle_no_pid()

        if exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs(pid)
            if proc_running:
                return pid, exit_status

            # pid but no process - maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print message
                email_manager.enqueue_notify_email(error,
                                                   message)
                self.on_lost_process(pid)
                return self.pid, self.exit_status

        return pid, exit_status

    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            # we've waited too long for the pidfile; kill the process if
            # one exists and record this run as lost
            email_manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return self.pid, self.exit_status

        return None, None

    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self.pid = pid
        self.exit_status = 1

    def exit_code(self):
        """Return the recorded exit code, or None while still running."""
        pid, exit_code = self.get_pidfile_info()
        return exit_code
mblighbb421852008-03-11 22:36:16 +00001043
1044
class Agent(object):
    """A sequential queue of AgentTasks driven by the dispatcher.

    One task runs at a time.  When a task fails, the remaining queue is
    replaced by that task's failure_tasks (e.g. repair after a failed
    verify).  num_processes tells the dispatcher how many autoserv
    processes this agent accounts for when throttling.
    """

    def __init__(self, tasks, queue_entry_ids=None, num_processes=1):
        # queue_entry_ids defaults to None instead of a mutable []
        # literal so instances never share the default list
        if queue_entry_ids is None:
            queue_entry_ids = []
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids
        self.num_processes = num_processes

        for task in tasks:
            self.add_task(task)

    def add_task(self, task):
        """Append a task to the queue and point it back at this agent."""
        self.queue.put_nowait(task)
        task.agent = self

    def tick(self):
        """Poll the currently running task, or advance to the next one
        when there is no unfinished active task."""
        print("agent tick")
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task()

    def _next_task(self):
        """Start the next queued task.  If the previous task failed, its
        failure_tasks replace the rest of the queue first."""
        print("agent picking task")
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()

    def on_task_failure(self):
        """Discard the remaining queue and enqueue the failed task's
        recovery tasks instead."""
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)

    def is_running(self):
        """True while a task is active (done or not)."""
        return self.active_task is not None

    def is_done(self):
        # `is None` rather than `== None`: identity test is correct and
        # immune to overloaded __eq__
        return self.active_task is None and self.queue.empty()

    def start(self):
        """Begin executing tasks; the dispatcher must be set first."""
        assert self.dispatcher

        self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001103
jadmanski0afbb632008-06-06 21:10:57 +00001104
class AgentTask(object):
    """Base class for one unit of work executed by an Agent.

    Subclasses customize prolog()/epilog() (run before the command starts
    and after it finishes) and may override run().  The default run()
    launches self.cmd under a RunMonitor; poll()/tick() translate the
    process exit code into success or failure.
    """

    def __init__(self, cmd, failure_tasks=None):
        """cmd: argv list to execute (a false value means no command).
        failure_tasks: tasks the owning Agent runs instead of the rest of
        its queue if this task fails."""
        # failure_tasks defaults to None instead of a mutable [] literal
        # so instances never share the default list
        if failure_tasks is None:
            failure_tasks = []
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None

    def poll(self):
        """Check on the running process; with no monitor, fail outright."""
        print("poll")
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)

    def tick(self, exit_code):
        """Record completion once an exit code is available; a None exit
        code means the process is still running."""
        if exit_code is None:
            return
        # exit code zero is the only success
        self.finished(exit_code == 0)

    def is_done(self):
        return self.done

    def finished(self, success):
        """Mark the task complete and run its epilog."""
        self.done = True
        self.success = success
        self.epilog()

    def prolog(self):
        """Hook run once before the command starts; default is a no-op."""
        pass

    def create_temp_resultsdir(self, suffix=''):
        """Create a private temporary results directory for this task."""
        self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)

    def cleanup(self):
        """Remove the temporary results directory, if one was created."""
        if (hasattr(self, 'temp_results_dir') and
            os.path.exists(self.temp_results_dir)):
            shutil.rmtree(self.temp_results_dir)

    def epilog(self):
        """Hook run once after completion; by default just cleans up."""
        self.cleanup()

    def start(self):
        """Run prolog and launch the command (only on the first call)."""
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True

    def abort(self):
        """Kill the underlying process (if any) and mark the task done
        without recording success."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()

    def run(self):
        """Launch self.cmd under a RunMonitor; tasks with a host
        attribute get a per-host log file."""
        if self.cmd:
            print("agent starting monitor")
            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level=AUTOSERV_NICE_LEVEL,
                log_file=log_file)
            self.monitor.run()
mbligh36768f02008-02-22 18:28:33 +00001193
1194
1195class RepairTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001196 def __init__(self, host, fail_queue_entry=None):
1197 """\
1198 fail_queue_entry: queue entry to mark failed if this repair
1199 fails.
1200 """
jadmanskifb7cfb12008-07-09 14:13:21 +00001201 protection = host_protections.Protection.get_string(host.protection)
jadmanski542537f2008-07-24 14:14:56 +00001202 # normalize the protection name
1203 protection = host_protections.Protection.get_attr_name(protection)
jadmanski0afbb632008-06-06 21:10:57 +00001204 self.create_temp_resultsdir('.repair')
1205 cmd = [_autoserv_path , '-R', '-m', host.hostname,
jadmanskifb7cfb12008-07-09 14:13:21 +00001206 '-r', self.temp_results_dir, '--host-protection', protection]
jadmanski0afbb632008-06-06 21:10:57 +00001207 self.host = host
1208 self.fail_queue_entry = fail_queue_entry
1209 super(RepairTask, self).__init__(cmd)
mblighe2586682008-02-29 22:45:46 +00001210
mbligh36768f02008-02-22 18:28:33 +00001211
jadmanski0afbb632008-06-06 21:10:57 +00001212 def prolog(self):
1213 print "repair_task starting"
1214 self.host.set_status('Repairing')
mbligh36768f02008-02-22 18:28:33 +00001215
1216
jadmanski0afbb632008-06-06 21:10:57 +00001217 def epilog(self):
1218 super(RepairTask, self).epilog()
1219 if self.success:
1220 self.host.set_status('Ready')
1221 else:
1222 self.host.set_status('Repair Failed')
1223 if self.fail_queue_entry:
1224 self.fail_queue_entry.handle_host_failure()
mbligh36768f02008-02-22 18:28:33 +00001225
1226
1227class VerifyTask(AgentTask):
showard3d9899a2008-07-31 02:11:58 +00001228 def __init__(self, queue_entry=None, host=None, run_verify=True):
jadmanski0afbb632008-06-06 21:10:57 +00001229 assert bool(queue_entry) != bool(host)
mbligh36768f02008-02-22 18:28:33 +00001230
jadmanski0afbb632008-06-06 21:10:57 +00001231 self.host = host or queue_entry.host
1232 self.queue_entry = queue_entry
mbligh36768f02008-02-22 18:28:33 +00001233
jadmanski0afbb632008-06-06 21:10:57 +00001234 self.create_temp_resultsdir('.verify')
showard3d9899a2008-07-31 02:11:58 +00001235
1236 # TODO:
1237 # While it is rediculous to instantiate a verify task object
1238 # that doesnt actually run the verify task, this is hopefully a
1239 # temporary hack and will have a cleaner way to skip this
1240 # step later. (while ensuring that the original semantics don't change)
1241 if not run_verify:
1242 cmd = ["true"]
1243 else:
1244 cmd = [_autoserv_path,'-v','-m',self.host.hostname,
1245 '-r', self.temp_results_dir]
mbligh36768f02008-02-22 18:28:33 +00001246
jadmanski0afbb632008-06-06 21:10:57 +00001247 fail_queue_entry = None
1248 if queue_entry and not queue_entry.meta_host:
1249 fail_queue_entry = queue_entry
1250 failure_tasks = [RepairTask(self.host, fail_queue_entry)]
mblighe2586682008-02-29 22:45:46 +00001251
jadmanski0afbb632008-06-06 21:10:57 +00001252 super(VerifyTask, self).__init__(cmd,
1253 failure_tasks=failure_tasks)
mblighe2586682008-02-29 22:45:46 +00001254
1255
jadmanski0afbb632008-06-06 21:10:57 +00001256 def prolog(self):
1257 print "starting verify on %s" % (self.host.hostname)
1258 if self.queue_entry:
1259 self.queue_entry.set_status('Verifying')
1260 self.queue_entry.clear_results_dir(
1261 self.queue_entry.verify_results_dir())
1262 self.host.set_status('Verifying')
mbligh36768f02008-02-22 18:28:33 +00001263
1264
jadmanski0afbb632008-06-06 21:10:57 +00001265 def cleanup(self):
1266 if not os.path.exists(self.temp_results_dir):
1267 return
1268 if self.queue_entry and (self.success or
1269 not self.queue_entry.meta_host):
1270 self.move_results()
1271 super(VerifyTask, self).cleanup()
mblighd64e5702008-04-04 21:39:28 +00001272
1273
jadmanski0afbb632008-06-06 21:10:57 +00001274 def epilog(self):
1275 super(VerifyTask, self).epilog()
mbligh36768f02008-02-22 18:28:33 +00001276
jadmanski0afbb632008-06-06 21:10:57 +00001277 if self.success:
1278 self.host.set_status('Ready')
1279 elif self.queue_entry:
1280 self.queue_entry.requeue()
mbligh36768f02008-02-22 18:28:33 +00001281
1282
jadmanski0afbb632008-06-06 21:10:57 +00001283 def move_results(self):
1284 assert self.queue_entry is not None
1285 target_dir = self.queue_entry.verify_results_dir()
1286 if not os.path.exists(target_dir):
1287 os.makedirs(target_dir)
1288 files = os.listdir(self.temp_results_dir)
1289 for filename in files:
1290 if filename == AUTOSERV_PID_FILE:
1291 continue
1292 self.force_move(os.path.join(self.temp_results_dir,
1293 filename),
1294 os.path.join(target_dir, filename))
mbligh36768f02008-02-22 18:28:33 +00001295
1296
jadmanski0afbb632008-06-06 21:10:57 +00001297 @staticmethod
1298 def force_move(source, dest):
1299 """\
1300 Replacement for shutil.move() that will delete the destination
1301 if it exists, even if it's a directory.
1302 """
1303 if os.path.exists(dest):
1304 print ('Warning: removing existing destination file ' +
1305 dest)
1306 remove_file_or_dir(dest)
1307 shutil.move(source, dest)
mblighe2586682008-02-29 22:45:46 +00001308
1309
class VerifySynchronousTask(VerifyTask):
    """Verify step for a queue entry belonging to a synchronous job."""

    def __init__(self, queue_entry, run_verify=True):
        super(VerifySynchronousTask, self).__init__(queue_entry=queue_entry,
                                                    run_verify=run_verify)

    def epilog(self):
        super(VerifySynchronousTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry.job.num_complete() > 0:
            # another entry in this job already failed verify, and this
            # entry has already been marked as stopped
            return

        pending_agent = self.queue_entry.on_pending()
        if pending_agent:
            self.agent.dispatcher.add_agent(pending_agent)
mblighe2586682008-02-29 22:45:46 +00001327
showardb2e2c322008-10-14 17:33:55 +00001328
class QueueTask(AgentTask):
    """Runs autoserv for a job's queue entries, records keyvals
    (timestamps, per-host labels, abort info) into the results directory,
    and kicks off results parsing when the job finishes."""

    def __init__(self, job, queue_entries, cmd):
        super(QueueTask, self).__init__(cmd)
        self.job = job
        self.queue_entries = queue_entries

    @staticmethod
    def _write_keyval(keyval_dir, field, value, keyval_filename='keyval'):
        # append a single 'field=value' line to the keyval file
        key_path = os.path.join(keyval_dir, keyval_filename)
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%s' % (field, str(value))
        keyval_file.close()

    def _host_keyval_dir(self):
        # per-host keyval files live under <results>/host_keyvals
        return os.path.join(self.results_dir(), 'host_keyvals')

    def _write_host_keyval(self, host):
        # record the host's labels in a keyval file named after the host
        labels = ','.join(host.labels())
        self._write_keyval(self._host_keyval_dir(), 'labels', labels,
                           keyval_filename=host.hostname)

    def _create_host_keyval_dir(self):
        directory = self._host_keyval_dir()
        if not os.path.exists(directory):
            os.makedirs(directory)

    def results_dir(self):
        # all entries handled by one queue task share a results dir root
        return self.queue_entries[0].results_dir()

    def run(self):
        """\
        Override AgentTask.run() so we can use a PidfileRunMonitor.
        """
        self.monitor = PidfileRunMonitor(self.results_dir(),
                                         cmd=self.cmd,
                                         nice_level=AUTOSERV_NICE_LEVEL)
        self.monitor.run()

    def prolog(self):
        # write some job timestamps into the job keyval file
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(self.results_dir(), "job_queued", int(queued))
        self._write_keyval(self.results_dir(), "job_started", int(started))
        self._create_host_keyval_dir()
        for queue_entry in self.queue_entries:
            self._write_host_keyval(queue_entry.host)
            print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
        # an asynchronous multi-machine job runs one queue task per
        # machine (hence the length assertion); record this machine in
        # the job's shared machines file
        if (not self.job.is_synchronous() and
            self.job.num_machines() > 1):
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])

    def _finish_task(self):
        # write out the finished time into the results keyval
        finished = time.time()
        self._write_keyval(self.results_dir(), "job_finished", int(finished))

        # parse the results of the job
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            parse_results(self.job.results_dir())
        else:
            # parse each entry's results separately; "-l 2" is passed
            # through to the parser (presumably the parse level --
            # confirm against parse_results/tko parser docs)
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(), flags="-l 2")

    def _log_abort(self):
        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            results_dir = self.results_dir()
            self._write_keyval(results_dir, "aborted_by", aborted_by.pop())
            self._write_keyval(results_dir, "aborted_on", max(aborted_on))

    def abort(self):
        super(QueueTask, self).abort()
        # record who/when aborted, then finalize as for a normal finish
        self._log_abort()
        self._finish_task()

    def epilog(self):
        super(QueueTask, self).epilog()
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        # finalize every entry and release its host
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')

        self._finish_task()

        print "queue_task finished with %s/%s" % (status, self.success)
mbligh36768f02008-02-22 18:28:33 +00001441
1442
class RecoveryQueueTask(QueueTask):
    """QueueTask variant that re-attaches to an autoserv process that is
    already running (e.g. after a scheduler restart), using a pre-built
    run monitor instead of launching a new command."""

    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(job, queue_entries,
                                                cmd=None)
        self.run_monitor = run_monitor

    def run(self):
        # nothing to launch -- just adopt the existing monitor
        self.monitor = self.run_monitor

    def prolog(self):
        # recovering an existing process - don't do prolog
        pass
mblighbb421852008-03-11 22:36:16 +00001457
1458
mbligh36768f02008-02-22 18:28:33 +00001459class RebootTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001460 def __init__(self, host):
1461 global _autoserv_path
1462
1463 # Current implementation of autoserv requires control file
1464 # to be passed on reboot action request. TODO: remove when no
1465 # longer appropriate.
1466 self.create_temp_resultsdir('.reboot')
1467 self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
1468 '-r', self.temp_results_dir, '/dev/null']
1469 self.host = host
1470 super(RebootTask, self).__init__(self.cmd,
1471 failure_tasks=[RepairTask(host)])
mbligh16c722d2008-03-05 00:58:44 +00001472
mblighd5c95802008-03-05 00:33:46 +00001473
jadmanski0afbb632008-06-06 21:10:57 +00001474 def prolog(self):
1475 print "starting reboot task for host: %s" % self.host.hostname
1476 self.host.set_status("Rebooting")
mblighd5c95802008-03-05 00:33:46 +00001477
mblighd5c95802008-03-05 00:33:46 +00001478
1479class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001480 def __init__(self, queue_entry, agents_to_abort):
1481 self.queue_entry = queue_entry
1482 self.agents_to_abort = agents_to_abort
jadmanski0afbb632008-06-06 21:10:57 +00001483 super(AbortTask, self).__init__('')
mbligh36768f02008-02-22 18:28:33 +00001484
1485
jadmanski0afbb632008-06-06 21:10:57 +00001486 def prolog(self):
1487 print "starting abort on host %s, job %s" % (
1488 self.queue_entry.host_id, self.queue_entry.job_id)
1489 self.queue_entry.set_status('Aborting')
mbligh36768f02008-02-22 18:28:33 +00001490
mblighd64e5702008-04-04 21:39:28 +00001491
jadmanski0afbb632008-06-06 21:10:57 +00001492 def epilog(self):
1493 super(AbortTask, self).epilog()
1494 self.queue_entry.set_status('Aborted')
1495 self.success = True
1496
1497
1498 def run(self):
1499 for agent in self.agents_to_abort:
1500 if (agent.active_task):
1501 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001502
1503
class DBObject(object):
    """Minimal ORM base class: maps one row of a database table onto
    attributes of a Python object.  Subclasses supply the table name and
    ordered column list via _get_table() and _fields()."""

    def __init__(self, id=None, row=None, new_record=False):
        """Load the object either by primary key (id) or from an
        already-fetched row tuple; exactly one of the two must be given.

        new_record: True when this object represents a row that does not
        exist in the database yet (see save()).
        """
        assert (bool(id) != bool(row))

        self.__table = self._get_table()
        fields = self._fields()

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                # raising a plain string is invalid in modern Python
                # (TypeError at raise time); raise a real exception
                raise ValueError("row not found (table=%s, id=%s)" %
                                 (self.__table, id))
            row = rows[0]

        assert len(row) == self.num_cols(), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), fields, self.num_cols()))

        # copy each column value onto an attribute of the same name,
        # tracking which attributes are legal to update later
        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        # id is the primary key and must never be rewritten
        del self.__valid_fields['id']

    @classmethod
    def _get_table(cls):
        raise NotImplementedError('Subclasses must override this')

    @classmethod
    def _fields(cls):
        raise NotImplementedError('Subclasses must override this')

    @classmethod
    def num_cols(cls):
        """Number of columns, derived from the subclass field list."""
        return len(cls._fields())

    def count(self, where, table=None):
        """Return the number of rows in `table` (default: this object's
        table) matching the given WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])

    def update_field(self, field, value, condition=''):
        """Set a single column to `value` (no-op if unchanged).  An
        optional extra SQL condition can guard the update."""
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        if condition:
            query += ' AND (%s)' % condition
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value

    def save(self):
        """Insert this object into the database, if it was constructed
        with new_record=True."""
        if self.__new_record:
            keys = self._fields()[1:]   # avoid id
            columns = ','.join([str(key) for key in keys])
            # bind the values as query parameters (as update_field and
            # fetch already do) instead of naively quoting them into the
            # SQL string, which broke on values containing quotes
            placeholders = ','.join(['%s'] * len(keys))
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                    (self.__table, columns, placeholders)
            values = [self.__dict__[key] for key in keys]
            _db.execute(query, values)

    def delete(self):
        """Delete this object's row from the database."""
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))

    @staticmethod
    def _prefix_with(string, prefix):
        """Return prefix+string when string is non-empty; otherwise
        return string unchanged."""
        if string:
            string = prefix + string
        return string

    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """Generator yielding one instance per row matching the given
        query fragments (all optional; `params` are bound to any %s
        placeholders in `where`)."""
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._get_table(),
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001611
mbligh36768f02008-02-22 18:28:33 +00001612
class IneligibleHostQueue(DBObject):
    """One (job, host) block: marks a host as ineligible for a given job."""

    @classmethod
    def _get_table(cls):
        """Backing table for job/host ineligibility records."""
        return 'ineligible_host_queues'


    @classmethod
    def _fields(cls):
        """Ordered column list matching the table schema."""
        return ['id', 'job_id', 'host_id']


    def __init__(self, id=None, row=None, new_record=None):
        super(IneligibleHostQueue, self).__init__(id=id, row=row,
                                                  new_record=new_record)
showard04c82c52008-05-29 19:38:12 +00001627
1628
class Label(DBObject):
    """ORM wrapper for the `labels` table (host labels and platforms)."""

    @classmethod
    def _get_table(cls):
        """Backing table for label records."""
        return 'labels'


    @classmethod
    def _fields(cls):
        """Ordered column list matching the table schema."""
        return ['id', 'name', 'kernel_config', 'platform', 'invalid',
                'only_if_needed']
1639
1640
class Host(DBObject):
    """ORM wrapper for one row of the `hosts` table."""

    def __init__(self, id=None, row=None):
        super(Host, self).__init__(id=id, row=row)


    @classmethod
    def _get_table(cls):
        """Backing table for host records."""
        return 'hosts'


    @classmethod
    def _fields(cls):
        """Ordered column list matching the table schema."""
        return ['id', 'hostname', 'locked', 'synch_id','status',
                'invalid', 'protection', 'locked_by_id', 'lock_time']


    def current_task(self):
        """Return this host's active, incomplete HostQueueEntry, or None."""
        rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if not rows:
            return None
        # A host should never have more than one active entry at a time.
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Give up this host's current work, requeueing its active entry if any."""
        print("%s yielding work" % self.hostname)
        if self.current_task():
            self.current_task().requeue()


    def set_status(self,status):
        """Update this host's status column (logged to stdout)."""
        print('%s -> %s' % (self.hostname, status))
        self.update_field('status',status)


    def labels(self):
        """
        Fetch a list of names of all non-platform labels associated with this
        host.
        """
        rows = _db.execute("""
            SELECT labels.name
            FROM labels
            INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
            WHERE NOT labels.platform AND hosts_labels.host_id = %s
            ORDER BY labels.name
            """, (self.id,))
        return [row[0] for row in rows]
1694
1695
class HostQueueEntry(DBObject):
    """
    ORM wrapper for one row of `host_queue_entries`: a single (job, host)
    pairing and its scheduling state.  Eagerly loads the parent Job, and
    the Host when one is assigned.
    """
    def __init__(self, id=None, row=None):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row)
        self.job = Job(self.job_id)

        # host_id is unset for not-yet-assigned metahost entries; only
        # build a Host wrapper once a concrete host has been chosen.
        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Per-entry scheduler log, stored alongside the job's results.
        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def _get_table(cls):
        """Backing table for queue-entry records."""
        return 'host_queue_entries'


    @classmethod
    def _fields(cls):
        """Ordered column list matching the table schema."""
        return ['id', 'job_id', 'host_id', 'priority', 'status',
                'meta_host', 'active', 'complete', 'deleted']


    def set_host(self, host):
        """
        Assign `host` to this entry (marking it active and blocking the host
        for this job), or release the current host when `host` is None.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the assigned Host object (None when unassigned)."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log (unbuffered)."""
        now = str(datetime.datetime.now())
        # buffering=0: write through immediately so the log survives crashes.
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(now + ' ' + log_line + '\n')
        queue_log.close()


    def block_host(self, host_id):
        """Record that this entry's job may not be scheduled on host_id again."""
        print "creating block %s/%s" % (self.job.id, host_id)
        # id 0 is a placeholder; DBObject.save() skips the id column on insert.
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any job/host block created by block_host()."""
        print "removing block %s/%s" % (self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def results_dir(self):
        """
        Results directory for this entry: the job directory itself for
        synchronous or single-machine jobs, otherwise a per-host subdir.
        """
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            return self.job.job_dir
        else:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)


    def verify_results_dir(self):
        """
        Directory for verify-stage results.  Note the condition differs from
        results_dir(): multi-machine jobs get per-host dirs here even when
        synchronous.
        """
        if self.job.is_synchronous() or self.job.num_machines() > 1:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)
        else:
            return self.job.job_dir


    def set_status(self, status):
        """
        Update this entry's status plus its derived active/complete flags.
        Non-abort statuses carry a SQL condition so they can never clobber
        an abort status written concurrently by another process.
        """
        abort_statuses = ['Abort', 'Aborting', 'Aborted']
        if status not in abort_statuses:
            condition = ' AND '.join(['status <> "%s"' % x
                                      for x in abort_statuses])
        else:
            condition = ''
        self.update_field('status', status, condition=condition)

        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)

        # Keep the denormalized active/complete flags in step with status.
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Abort', 'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)
            self._email_on_job_complete()


    def _email_on_job_complete(self):
        """
        Email the job's notification list once the whole job has finished.
        _base_url, _email_from and send_email are module-level, defined
        elsewhere in this file.
        """
        url = "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)

        if self.job.is_finished():
            subject = "Autotest: Job ID: %s \"%s\" Completed" % (
                self.job.id, self.job.name)
            body = "Job ID: %s\nJob Name: %s\n%s\n" % (
                self.job.id, self.job.name, url)
            send_email(_email_from, self.job.email_list, subject, body)


    def run(self,assigned_host=None):
        """
        Start this entry running.  Metahost entries must be given the host
        chosen for them; the host is assigned before handing off to the job.
        Returns whatever Job.run() returns (an Agent to execute).
        """
        if self.meta_host:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.job.create_results_dir()
            self.set_host(assigned_host)

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)

    def requeue(self):
        """Put this entry back in the queue; metahost entries drop their host."""
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.  Fails the entry; for synchronous jobs, stops all sibling
        entries as well.
        """
        assert not self.meta_host
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()


    def clear_results_dir(self, results_dir=None, dont_delete_files=False):
        """
        Empty this entry's results directory.  With dont_delete_files, the
        contents are preserved by moving them into a fresh temp directory
        instead of deleting them.
        """
        results_dir = results_dir or self.results_dir()
        if not os.path.exists(results_dir):
            return
        if dont_delete_files:
            temp_dir = tempfile.mkdtemp(suffix='.clear_results')
            print 'Moving results from %s to %s' % (results_dir,
                                                    temp_dir)
        for filename in os.listdir(results_dir):
            path = os.path.join(results_dir, filename)
            if dont_delete_files:
                shutil.move(path,
                            os.path.join(temp_dir, filename))
            else:
                # remove_file_or_dir is a module-level helper defined
                # elsewhere in this file.
                remove_file_or_dir(path)


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None (lazily loaded).
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None (lazily loaded).
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job.  Cached after first call. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
            SELECT users.login, aborted_host_queue_entries.aborted_on
            FROM aborted_host_queue_entries
            INNER JOIN users
            ON users.id = aborted_host_queue_entries.aborted_by_id
            WHERE aborted_host_queue_entries.queue_entry_id = %s
            """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify. If the
        job is ready to run, returns an agent to run the job. Returns None
        otherwise.
        """
        self.set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        return None
1907
1908
class Job(DBObject):
    """
    ORM wrapper for one row of the `jobs` table, plus the logic to turn a
    job into autoserv invocations (via Agent/Task objects defined elsewhere
    in this file).
    """
    def __init__(self, id=None, row=None):
        assert id or row
        super(Job, self).__init__(id=id, row=row)

        # Results live under RESULTS_DIR in a "<id>-<owner>" directory.
        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                            self.owner))


    @classmethod
    def _get_table(cls):
        """Backing table for job records."""
        return 'jobs'


    @classmethod
    def _fields(cls):
        """Ordered column list matching the table schema."""
        return ['id', 'owner', 'name', 'priority', 'control_file',
                'control_type', 'created_on', 'synch_type',
                'synch_count', 'synchronizing', 'timeout',
                'run_verify', 'email_list']


    def is_server_job(self):
        # control_type 2 evidently denotes a client-side control file —
        # TODO confirm against the frontend's control-type enum.
        return self.control_type != 2


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects belonging to this job."""
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id= %s
            """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        # Every job has at least one queue entry.
        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """Set job status; optionally propagate it to every queue entry."""
        self.update_field('status',status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        # synch_type 2 evidently denotes a synchronous job — TODO confirm
        # against the frontend's synch-type enum.
        return self.synch_type == 2


    def is_ready(self):
        """
        True when this job can start: always for asynchronous jobs; for
        synchronous jobs, only once every machine's entry is 'Pending'.
        """
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.num_machines())


    def results_dir(self):
        """Top-level results directory for this job."""
        return self.job_dir

    def num_machines(self, clause = None):
        """Count this job's queue entries, optionally filtered by `clause`."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        # Entries not yet complete (queued or running).
        return self.num_machines('not complete')


    def num_active(self):
        # Entries currently being worked on.
        return self.num_machines('active')


    def num_complete(self):
        # Entries that have finished (in any terminal state).
        return self.num_machines('complete')


    def is_finished(self):
        """True once no incomplete entries remain (logged to stdout)."""
        left = self.num_queued()
        print "%s: %s machines left" % (self.name, left)
        return left==0


    def stop_all_entries(self):
        """Mark every not-yet-complete entry of this job 'Stopped'."""
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to the job's .machines file."""
        hostname = queue_entry.get_host().hostname
        print "writing %s to job %s machines file" % (hostname, self.id)
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        mf.write("%s\n" % queue_entry.get_host().hostname)
        mf.close()


    def create_results_dir(self, queue_entry=None):
        """
        Ensure the job results directory exists; return the entry-specific
        results dir when a queue_entry is given, else the job dir.
        """
        print "create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete())

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def _write_control_file(self):
        'Writes control file out to disk, returns a filename'
        control_fd, control_filename = tempfile.mkstemp(suffix='.control_file')
        control_file = os.fdopen(control_fd, 'w')
        if self.control_file:
            control_file.write(self.control_file)
        control_file.close()
        return control_filename


    def _get_job_tag(self, queue_entries):
        """
        Tag identifying this run: "<id>-<owner>", with the first entry's
        hostname appended for asynchronous multi-machine jobs.
        """
        base_job_tag = "%s-%s" % (self.id, self.owner)
        if self.is_synchronous() or self.num_machines() == 1:
            return base_job_tag
        else:
            return base_job_tag + '/' + queue_entries[0].get_host().hostname


    def _get_autoserv_params(self, queue_entries):
        """
        Build the autoserv command line for these entries.  Flag meanings
        are defined by autoserv's own option parser — see its docs.
        """
        results_dir = self.create_results_dir(queue_entries[0])
        control_filename = self._write_control_file()
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])
        job_tag = self._get_job_tag(queue_entries)

        params = [_autoserv_path, '-P', job_tag, '-p', '-n',
                  '-r', os.path.abspath(results_dir),
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, control_filename]

        if not self.is_server_job():
            params.append('-c')

        return params


    def _run_synchronous(self, queue_entry):
        """
        Run path for synchronous jobs: verify first if not all entries are
        Pending yet; otherwise start the whole job across all entries.
        """
        if not self.is_ready():
            return Agent([VerifySynchronousTask(queue_entry=queue_entry,
                                                run_verify=self.run_verify)],
                         [queue_entry.id])

        queue_entry.set_status('Starting')

        return self._finish_run(self.get_host_queue_entries())


    def _run_asynchronous(self, queue_entry):
        """Run path for asynchronous jobs: verify then run this one entry."""
        # TODO(showard): this is of questionable necessity, but in the interest
        # of lowering risk, I'm leaving it in for now
        assert queue_entry

        initial_tasks = [VerifyTask(queue_entry, run_verify=self.run_verify)]
        return self._finish_run([queue_entry], initial_tasks)


    def _finish_run(self, queue_entries, initial_tasks=[]):
        """
        Wrap the autoserv invocation for these entries in an Agent.
        NOTE(review): the mutable default is safe here because initial_tasks
        is never mutated, but None would be more conventional.
        """
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params)
        tasks = initial_tasks + [queue_task]
        entry_ids = [entry.id for entry in queue_entries]

        return Agent(tasks, entry_ids, num_processes=len(queue_entries))


    def run(self, queue_entry):
        """Start this job for the given entry; returns an Agent to execute."""
        if self.is_synchronous():
            return self._run_synchronous(queue_entry)
        return self._run_asynchronous(queue_entry)
mbligh36768f02008-02-22 18:28:33 +00002092
2093
if __name__ == '__main__':
    # Script entry point; main() is defined elsewhere in this module.
    main()