blob: f89c8462d8450941218cc725d8b3161029d8ef92 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
import optparse
import os
import Queue
import shlex
import shutil
import signal
import smtplib
import socket
import subprocess
import sys
import tempfile
import time
import traceback

import MySQLdb

from common import global_config
11
# Directory under which job results are written; replaced by the
# results_dir command-line argument in main().
RESULTS_DIR = '.'
# Niceness handed to `nice -n` for every command AgentTask.run() starts.
AUTOSERV_NICE_LEVEL = 10

# Root of the autotest tree: the parent of this script's directory unless
# the AUTOTEST_DIR environment variable overrides it.  (os.environ.get
# replaces the deprecated, Python-3-incompatible os.environ.has_key.)
AUTOTEST_PATH = os.environ.get('AUTOTEST_DIR',
                               os.path.join(os.path.dirname(__file__), '..'))
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make modules in the autotest server directory importable.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Module-level state, filled in by main() / init():
_db = None                   # DatabaseConn used for all database access
_shutdown = False            # set by the SIGINT handler to stop the main loop
_notify_email = None         # --notify address for exception emails
_autoserv_path = 'autoserv'  # 'autoserv_dummy' under --test
_testing_mode = False        # --test: dummy autoserv, test schema, no parsing


def main():
    """Command-line entry point for the scheduler.

    Parses options, runs init(), then ticks a Dispatcher every 20 seconds
    until the SIGINT handler sets _shutdown, after which running agents are
    drained via Dispatcher.shut_down().  Any uncaught exception is passed to
    log_stacktrace() and ends the loop; the DB connection is closed either way.
    """
    global RESULTS_DIR, _notify_email, _autoserv_path, _testing_mode

    parser = optparse.OptionParser('usage: %prog [options] results_dir')
    parser.add_option('--no-recover', action='store_true',
                      help='Skip machine/job recovery step '
                           '[for multiple monitors/rolling upgrades]')
    parser.add_option('--logfile',
                      help='Set a log file that all stdout '
                           'should be redirected to. Stderr will go to this '
                           'file + ".err"')
    parser.add_option('--notify',
                      help='Set an email address to be notified of exceptions')
    parser.add_option('--test', action='store_true',
                      help='Indicate that scheduler is under test and should '
                           'use dummy autoserv and no parsing')

    options, args = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    RESULTS_DIR = args[0]
    _notify_email = options.notify
    if options.test:
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher(do_recover=not options.no_recover)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
        dispatcher.shut_down()
    except:
        # Deliberate top-level catch-all: record whatever killed the loop
        # before disconnecting from the database.
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()


def handle_sigint(signum, frame):
    """SIGINT handler installed by init(): request a graceful shutdown.

    Only sets the module-level _shutdown flag; main()'s loop checks it
    between ticks.
    """
    global _shutdown
    _shutdown = True
    print("Shutdown request received.")


def init(logfile):
    """One-time process setup, run by main() before the dispatcher starts.

    Optionally redirects stdout/stderr to *logfile*, prepends the autotest
    server directory to PATH, opens the global DatabaseConn and installs
    the SIGINT handler.
    """
    global _db

    if logfile:
        enable_logging(logfile)
    print("%s> dispatcher starting" % time.strftime("%X %x"))
    print("My PID is %d" % os.getpid())

    os.environ['PATH'] = ':'.join((AUTOTEST_SERVER_DIR, os.environ['PATH']))
    _db = DatabaseConn()

    print("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    print("Connected! Running...")


def enable_logging(logfile):
    """Send this process's stdout to *logfile* and stderr to *logfile*.err.

    Both files are opened for unbuffered append.  The descriptors are
    dup2()'d onto fds 1 and 2, so output written straight to those fds (not
    just through sys.stdout/sys.stderr) also lands in the log files.
    """
    err_path = "%s.err" % logfile
    print("Enabling logging to %s (%s)" % (logfile, err_path))
    new_stdout = open(logfile, "a", buffering=0)
    new_stderr = open(err_path, "a", buffering=0)

    for replacement, current in ((new_stdout, sys.stdout),
                                 (new_stderr, sys.stderr)):
        os.dup2(replacement.fileno(), current.fileno())

    sys.stdout, sys.stderr = new_stdout, new_stderr


def idle_hosts():
    """Return Host objects that are free to start new work.

    A host qualifies when it is unlocked, its status is NULL or 'Ready',
    no active queue entry is assigned to it, and at least one pending (not
    active, not complete) queue entry either names it directly or targets
    one of its labels through meta_host.
    """
    query = """
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """
    return [Host(row=host_row) for host_row in _db.execute(query)]


class DatabaseConn:
    """MySQL connection wrapper that keeps retrying until it succeeds.

    Settings come from the AUTOTEST_WEB section of the global config; in
    testing mode the 'stresstest_autotest_web' schema is used instead.
    Autocommit is enabled.  execute() reconnects and retries whenever
    MySQLdb raises OperationalError.
    """
    def __init__(self):
        self.reconnect_wait = 20  # seconds to sleep between retries
        self.conn = None
        self.cur = None

        self.connect()


    def _connection_args(self):
        """Read host/user/password/schema from the global config and return
        them as keyword arguments for MySQLdb.connect()."""
        config = global_config.global_config
        section = "AUTOTEST_WEB"
        host = config.get_config_value(section, "host", "localhost")
        schema = config.get_config_value(section, "database", "autotest_web")
        if _testing_mode:
            schema = 'stresstest_autotest_web'
        user = config.get_config_value(section, "user", "autotest")
        password = config.get_config_value(section, "password", "google")
        return {'host': host, 'user': user, 'passwd': password, 'db': schema}


    def connect(self):
        """Drop any current connection, then block until a new one is open."""
        self.disconnect()
        connect_args = self._connection_args()

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(**connect_args)
                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                print("Can't connect to MYSQL; reconnecting")
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        """Close the connection (if any) and forget the cursor."""
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        """Run a query with cursor.execute(*args, **dargs) and return
        fetchall(), reconnecting and retrying on OperationalError."""
        while True:
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                print("MYSQL connection died; reconnecting")
                time.sleep(self.reconnect_wait)
                self.connect()


def parse_results(results_dir, flags=""):
    """Start the TKO parser on *results_dir* in the background.

    Runs <AUTOTEST_TKO_DIR>/parse <flags> -r -o <results_dir>, with stdout
    and stderr written to <results_dir>/.parse.log.  *flags* is a string of
    extra parser options (e.g. '-l 2'), split shell-style.  Does nothing in
    testing mode.

    The parser is started with an argument list instead of a shell command
    line.  results_dir embeds the job owner's name, which comes from the
    database, so spaces or shell metacharacters there can neither break the
    command nor inject new ones.

    Starting the parser is best-effort, as it was with the original
    backgrounded shell command.  If the log file cannot be opened or the
    parser cannot be executed, a message goes to stderr and None is
    returned instead of raising into the caller.

    Returns:
        The subprocess.Popen for the parser, or None when nothing was
        started.  Callers do not need to wait for it.
    """
    if _testing_mode:
        return None
    parse = os.path.join(AUTOTEST_TKO_DIR, 'parse')
    output = os.path.join(results_dir, '.parse.log')
    argv = [parse] + shlex.split(flags) + ['-r', '-o', results_dir]

    try:
        log = open(output, 'w')
        try:
            devnull = open(os.devnull, 'r')
            try:
                return subprocess.Popen(argv, stdout=log,
                                        stderr=subprocess.STDOUT,
                                        stdin=devnull)
            finally:
                devnull.close()
        finally:
            # The child holds its own copy of the descriptor.
            log.close()
    except (IOError, OSError) as e:
        sys.stderr.write("failed to start parser on %s: %s\n"
                         % (results_dir, e))
        return None


def log_stacktrace(reason):
    """Report the exception currently being handled.

    Must be called from inside an ``except`` block, because the traceback
    comes from sys.exc_info().  The report (reason, hostname, PID, time and
    the formatted traceback) is written to stderr.  When a --notify address
    is set, the report is also emailed through the SMTP server on localhost.

    Email delivery is best-effort.  Callers invoke this from their own
    exception handlers (main loop, Dispatcher), so an SMTP failure must not
    escape and crash the scheduler; it is reported on stderr instead.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    report = "EXCEPTION: %s\n" % reason
    report += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
                                  time.strftime("%X %x"))
    report += ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
    del exc_tb  # don't keep the traceback's frames alive via this frame

    sys.stderr.write("\n%s\n" % report)

    if not _notify_email:
        return

    sender = "monitor_db"
    subject = "monitor_db exception"
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, report)
    try:
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(sender, _notify_email, msg)
        finally:
            mailer.quit()
    except Exception as e:
        sys.stderr.write("Failed to send exception email to %s: %s\n"
                         % (_notify_email, e))


class Dispatcher:
    """Drives the scheduler: finds new work and advances running Agents.

    Each tick() starts queued work on idle hosts (unless shutting down) and
    then ticks every Agent, dropping those that report is_done().
    """
    def __init__(self, do_recover=True):
        self._agents = []
        self.shutting_down = False

        if do_recover:
            self._recover_lost()


    def shut_down(self):
        """Stop starting new work and block until every agent has finished."""
        print("Shutting down!")
        self.shutting_down = True
        while self._agents:
            self.tick()
            time.sleep(40)


    def tick(self):
        if not self.shutting_down:
            self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self


    def _recover_lost(self):
        """Clean up after a previous scheduler instance.

        Queue entries left active but incomplete are requeued and their
        results dirs cleared.  Unlocked hosts whose status is not 'Ready'
        get a VerifyTask.
        """
        rows = _db.execute("""SELECT * FROM host_queue_entries WHERE active AND NOT complete""")
        queue_entries = [HostQueueEntry(row=row) for row in rows]
        for queue_entry in queue_entries:
            job = queue_entry.job
            if job.is_synchronous():
                # Job.run() starts a synchronous job as one autoserv run
                # over all of its hosts, so every entry has to be requeued.
                for child_entry in job.get_host_queue_entries():
                    child_entry.requeue()
            else:
                queue_entry.requeue()
            queue_entry.clear_results_dir()

        rows = _db.execute("""SELECT * FROM hosts
                WHERE status != 'Ready' AND NOT locked""")
        hosts = [Host(row=row) for row in rows]
        for host in hosts:
            self.add_agent(Agent(tasks=[VerifyTask(host=host)]))


    def _find_more_work(self):
        """Start pending queue entries on idle hosts, at most 100 per call.

        At most one entry is started per host (the first that runs without
        raising).  An entry whose run() raises is logged and marked Failed;
        the scan continues with the next entry or host.
        """
        print("finding work")

        num_started = 0
        for host in idle_hosts():
            entries = host.next_queue_entries()
            if not entries:
                continue
            for entry in entries:
                try:
                    agent = entry.run(assigned_host=host)
                    if agent:
                        self.add_agent(agent)

                        num_started += 1
                        if num_started >= 100:
                            return
                    break
                except Exception:
                    # Log first, so the real failure is recorded even if
                    # marking the entry Failed also raises.  The host's own
                    # status is left unchanged here.
                    log_stacktrace("task_id = %d" % entry.id)
                    try:
                        entry.set_status('Failed')
                    except Exception:
                        log_stacktrace("could not mark task_id = %d Failed"
                                       % entry.id)


    def _handle_agents(self):
        """Tick every agent and keep only the ones that are not done."""
        still_running = []
        for agent in self._agents:
            agent.tick()
            if agent.is_done():
                print("agent finished")
            else:
                still_running.append(agent)
        self._agents = still_running


class RunMonitor(object):
    """Start a command in the background and report its exit status.

    The command's stdout and stderr are appended to *log_file*, after a
    banner naming the command.  Output is discarded (/dev/null) when no log
    file is given or it cannot be opened.  stdin is /dev/null.
    """
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)


    def _open_log(self, cmd):
        """Open self.log_file for append and write a banner for *cmd*.

        Returns the open file, or None when there is no log file or it
        cannot be opened/written; logging is best-effort.
        """
        if not self.log_file:
            return None
        rule = '*' * 80
        banner = "\n%s\n%s> %s\n%s\n" % (rule, time.strftime("%X %x"), cmd, rule)
        try:
            log = open(self.log_file, 'a')
        except (IOError, OSError):
            return None
        try:
            log.write(banner)
            # Flush now: the child writes straight to the file descriptor,
            # so a banner still in our buffer would land after its output.
            log.flush()
        except (IOError, OSError):
            try:
                log.close()
            except (IOError, OSError):
                pass
            return None
        return log


    def run(self, cmd):
        """Start *cmd* (an argv list), niced if nice_level is set, and return
        the subprocess.Popen.  Our copies of the child's stdio files are
        closed even if Popen raises (e.g. OSError for a missing binary)."""
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = self._open_log(cmd) or open(os.devnull, 'w')
        try:
            in_devnull = open(os.devnull, 'r')
            try:
                print("cmd = %s" % cmd)
                print("path = %s" % os.getcwd())
                return subprocess.Popen(cmd, stdout=out_file,
                                        stderr=subprocess.STDOUT,
                                        stdin=in_devnull)
            finally:
                in_devnull.close()
        finally:
            out_file.close()


    def kill(self):
        self.proc.kill()


    def exit_code(self):
        """Return the exit status, or None while the process is running."""
        return self.proc.poll()


class Agent(object):
    """Runs a queue of AgentTasks one after another.

    When a finished task was unsuccessful and has failure_tasks, everything
    still queued is discarded and replaced by those failure tasks.
    """
    def __init__(self, tasks):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """Poll the running task, or move on once it has finished."""
        print("agent tick")
        current = self.active_task
        if current and not current.is_done():
            current.poll()
            return
        self._next_task()


    def _next_task(self):
        print("agent picking task")
        finished = self.active_task
        if finished:
            assert finished.is_done()
            if not finished.success and finished.failure_tasks:
                # Abandon the rest of the plan; run the recovery tasks.
                self.queue = Queue.Queue(0)
                for task in finished.failure_tasks:
                    self.add_task(task)

        self.active_task = None
        if self.is_done():
            return
        self.active_task = self.queue.get_nowait()
        if self.active_task:
            self.active_task.start()


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        """Start the first task; requires add_agent() to have set dispatcher."""
        assert self.dispatcher
        self._next_task()


class AgentTask(object):
    """One step of an Agent, optionally backed by a RunMonitor'd command.

    start() runs prolog() and run() once.  The Agent then calls poll()
    until is_done(); finished() records success and calls epilog().
    Subclasses override prolog()/epilog().
    """
    def __init__(self, cmd, failure_tasks = None):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.agent = None


    def poll(self):
        print("poll")
        if not hasattr(self, 'monitor'):
            # No command was launched, so there is nothing to wait for.
            self.finished(False)
            return
        self.tick(self.monitor.exit_code())


    def tick(self, exit_code):
        """Finish the task once *exit_code* is known (None = still running);
        exit code 0 counts as success."""
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def epilog(self):
        pass


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        self.monitor.kill()


    def run(self):
        """Launch self.cmd (if any) at AUTOSERV_NICE_LEVEL.  Tasks with a
        host log to RESULTS_DIR/hosts/<hostname>."""
        if not self.cmd:
            return
        print("agent starting monitor")

        log_file = None
        if hasattr(self, 'host'):
            log_file = os.path.join(RESULTS_DIR, 'hosts', self.host.hostname)

        self.monitor = RunMonitor(self.cmd, nice_level=AUTOSERV_NICE_LEVEL,
                                  log_file=log_file)


class RepairTask(AgentTask):
    """Run `autoserv -n -R` on a host.

    The host is set to 'Repairing' while the task runs, then to 'Ready' or
    'Repair Failed'.
    """
    def __init__(self, host):
        self.host = host
        AgentTask.__init__(self, [_autoserv_path, '-n', '-R', '-m',
                                  host.hostname])

    def prolog(self):
        print("repair_task starting")
        self.host.set_status('Repairing')


    def epilog(self):
        self.host.set_status('Ready' if self.success else 'Repair Failed')


class VerifyTask(AgentTask):
    """Run `autoserv -n -v` on a host, optionally for a queue entry.

    Exactly one of *queue_entry* / *host* must be given.  On failure the
    owning Agent switches to a RepairTask for the host (failure_tasks).
    Verify output goes to a private temp dir.  epilog() moves it into the
    queue entry's results dir when it should be kept, and always removes the
    temp dir afterwards.
    """
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
        cmd = [_autoserv_path, '-n', '-v', '-m', self.host.hostname,
               '-r', self.temp_results_dir]

        AgentTask.__init__(self, cmd, failure_tasks=[RepairTask(self.host)])


    def prolog(self):
        print("starting verify on %s" % (self.host.hostname))
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """Keep or discard the verify output, then set host/entry statuses.

        A failed verify of a meta-host entry yields the work back; a failed
        verify of a directly assigned entry marks the entry Failed.
        """
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        # move_results() empties the temp dir but previously left it behind,
        # leaking one directory per verify; remove it in every case.
        shutil.rmtree(self.temp_results_dir)

        if self.success:
            status = 'Ready'
        else:
            status = 'Failed Verify'
            if self.queue_entry:
                if self.queue_entry.meta_host:
                    self.host.yield_work()
                else:
                    self.queue_entry.set_status('Failed')

        self.host.set_status(status)


    def move_results(self):
        """Move every file from the temp dir into the entry's results dir
        (a per-host subdirectory for synchronous jobs), creating it if
        needed."""
        assert self.queue_entry is not None
        target_dir = self.queue_entry.results_dir()
        if self.queue_entry.job.is_synchronous():
            target_dir = os.path.join(target_dir,
                                      self.queue_entry.host.hostname)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        for filename in os.listdir(self.temp_results_dir):
            shutil.move(os.path.join(self.temp_results_dir, filename),
                        os.path.join(target_dir, filename))


class VerifySynchronousTask(VerifyTask):
    """Verify one host of a synchronous job.

    After a successful verify the entry becomes Pending, and once the job
    reports is_ready() the job itself is started on a new Agent.  After a
    failed verify, a meta-host entry is requeued; otherwise the whole job's
    remaining entries are stopped.
    """
    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry=queue_entry)


    def epilog(self):
        VerifyTask.epilog(self)
        entry = self.queue_entry
        print("verify_synchronous finished: %s/%s" % (entry.host.hostname,
                                                      self.success))
        if not self.success:
            if entry.meta_host:
                entry.set_status('Queued')
            else:
                # VerifyTask.epilog() already marked this entry Failed
                # (complete), so stop_all_entries() leaves it alone.
                self.stop_all_entries()
            return

        job = entry.job
        if job.num_complete() != 0:
            entry.set_status('Stopped')  # some other entry failed verify
            return

        entry.set_status('Pending')
        if job.is_ready():
            self.agent.dispatcher.add_agent(job.run(entry))


    def stop_all_entries(self):
        """Mark every not-yet-complete queue entry of the job Stopped."""
        for child_entry in self.queue_entry.job.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


class QueueTask(AgentTask):
    """Run a job's autoserv command for one or more queue entries.

    prolog() marks the entries and their hosts Running and records
    job_queued/job_started in each entry's keyval file.  epilog() records
    job_finished, sets final statuses and starts result parsing.
    """
    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(queue_entry, field, value):
        """Append 'field=value' to <results_dir>/keyval.

        *value* is formatted with %d, so float timestamps are truncated to
        whole seconds.  `with` closes the file even if the write fails.
        """
        key_path = os.path.join(queue_entry.results_dir(), 'keyval')
        with open(key_path, 'a') as keyval_file:
            keyval_file.write('%s=%d\n' % (field, value))


    def prolog(self):
        for queue_entry in self.queue_entries:
            print("starting queue_task on %s/%s" % (queue_entry.host.hostname,
                                                    queue_entry.id))
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            # write some job timestamps into the job keyval file
            queued = time.mktime(self.job.created_on.timetuple())
            started = time.time()
            self._write_keyval(queue_entry, "job_queued", queued)
            self._write_keyval(queue_entry, "job_started", started)


    def epilog(self):
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')
            # write another timestamp into the job keyval file
            self._write_keyval(queue_entry, "job_finished", time.time())

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            # Entries share the job's results dir here; parse it once, when
            # no entry of the job is left incomplete.
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(), flags='-l 2')

        print("queue_task finished with %s/%s" % (status, self.success))


class RebootTask(AgentTask):
    """Reboot a host with `autoserv -n -b` (not implemented yet).

    The original __init__ took no arguments but used an undefined `host`,
    so it always raised NameError.  It now takes the host explicitly and,
    like the other tasks, builds an argv list that RunMonitor can execute.
    NOTE(review): assumes *host* is a Host object (uses host.hostname), as
    for RepairTask/VerifyTask — confirm once a caller exists.
    """
    def __init__(self, host):
        self.host = host
        cmd = [_autoserv_path, '-n', '-b', '-m', host.hostname, '/dev/null']
        AgentTask.__init__(self, cmd)


    def tick(self, exit_code):
        # String exceptions are a TypeError on Python 2.6+; raise a real one.
        raise NotImplementedError("not implemented")


    def run(self):
        raise NotImplementedError("not implemented")



class DBObject(object):
    """Base class that maps one row of *table* onto instance attributes.

    *fields* lists the column names in table order; the first one must be
    'id' (save() skips it and update_field() refuses it).  Pass either *id*
    (the row is fetched) or *row* (a sequence of values in field order).
    With new_record=True, save() INSERTs the row.
    """
    def __init__(self, table, fields, id=None, row=None, new_record=False):
        assert (bool(id) != bool(row)) and table and fields

        self.__table = table
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                # String exceptions are a TypeError on Python 2.6+.
                raise LookupError("row not found (table=%s, id=%s)" %
                                  (self.__table, id))
            row = rows[0]

        assert len(row) == len(fields), \
            "table = %s, row = %s/%d, fields = %s/%d" % (
                table, row, len(row), fields, len(fields))

        self.__valid_fields = {}
        for field, value in zip(fields, row):
            self.__dict__[field] = value
            self.__valid_fields[field] = True

        del self.__valid_fields['id']


    def count(self, where, table = None):
        """Return SELECT count(*) FROM <table> WHERE <where>.

        *where* is raw SQL interpolated as-is, so it must never contain
        untrusted input.  *table* defaults to this object's table.
        """
        if not table:
            table = self.__table

        rows = _db.execute("SELECT count(*) FROM %s WHERE %s" % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        """Write *value* to column *field* of this row (no-op if unchanged)."""
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        """INSERT this object as a new row if created with new_record=True.

        The id column is not included in the INSERT, and self.id is not
        updated afterwards.  Values are passed as query parameters, so
        quotes in data and None (stored as NULL) are handled by the driver
        instead of being pasted into the SQL text.
        """
        if not self.__new_record:
            return
        keys = self.__fields[1:]  # avoid id
        query = "INSERT INTO %s (%s) VALUES (%s)" % (
            self.__table, ','.join(keys), ','.join(['%s'] * len(keys)))
        _db.execute(query, tuple([self.__dict__[key] for key in keys]))



class IneligibleHostQueue(DBObject):
    """Row of ineligible_host_queues (id, job_id, host_id).

    Host.next_queue_entries() skips meta-host entries of jobs that have a
    row here for the host; HostQueueEntry.run() inserts one when it assigns
    a meta-host entry to a host.
    """
    def __init__(self, id=None, row=None, new_record=None):
        DBObject.__init__(self, 'ineligible_host_queues',
                          ['id', 'job_id', 'host_id'],
                          id=id, row=row, new_record=new_record)


class Host(DBObject):
    """Row of the hosts table (id, hostname, locked, synch_id, status)."""
    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id', 'status']
        DBObject.__init__(self, 'hosts', fields, id=id, row=row)


    def current_task(self):
        """Return the host's active, incomplete HostQueueEntry, or None.

        Asserts that at most one such entry exists.
        """
        rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if len(rows) == 0:
            return None
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def next_queue_entries(self):
        """Return a one-element list holding the best pending entry this host
        may run, or None if the host is locked or nothing is pending.

        Candidates are entries assigned to this host, plus meta-host entries
        for one of the host's labels whose job is not listed in
        ineligible_host_queues for this host.  Ordered by priority
        (descending), then meta_host, then id.
        """
        if self.locked:
            print("%s locked, not queuing" % self.hostname)
            return None
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (self.id, self.id, self.id))

        if len(rows) == 0:
            return None
        return [HostQueueEntry(row=i) for i in rows]


    def yield_work(self):
        """Requeue this host's active queue entry, if it has one."""
        print("%s yielding work" % self.hostname)
        # Query once: the original called current_task() twice, costing a
        # second query and risking None (AttributeError) if the entry
        # changed state in between.
        task = self.current_task()
        if task:
            task.requeue()


    def set_status(self, status):
        self.update_field('status', status)


class HostQueueEntry(DBObject):
    """Row of host_queue_entries, with its Job (and Host, if assigned)
    loaded.  Status changes also update the active/complete flags."""
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        DBObject.__init__(self, 'host_queue_entries', fields, id=id,
                          row=row)

        self.job = Job(self.job_id)
        self.host = Host(self.host_id) if self.host_id else None

        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    def set_host(self, host):
        """Assign *host* (marking the entry active) or release it (None)."""
        if not host:
            self.queue_log_record('Releasing host')
            self.update_field('host_id', None)
        else:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)

        self.host = host


    def get_host(self):
        """Return the assigned Host (loading it from host_id if needed) or None."""
        if not self.host and self.host_id:
            self.host = Host(self.host_id)
        return self.host or None


    def queue_log_record(self, log_line):
        """Append one line to this entry's queue.log.<id> in the job dir."""
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(log_line + '\n')
        queue_log.close()


    def results_dir(self):
        """The job dir for single-machine or synchronous jobs, otherwise a
        per-host subdirectory of it."""
        if self.job.num_machines() == 1 or self.job.is_synchronous():
            return self.job.job_dir
        assert self.host
        return '%s/%s' % (self.job.job_dir, self.host.hostname)


    def set_status(self, status):
        """Set status and the matching complete/active flags."""
        self.update_field('status', status)
        hostname = self.host.hostname if self.host else 'no host'
        print("%s/%d status -> %s" % (hostname, self.id, self.status))

        if status == 'Queued':
            complete, active = False, False
        elif status in ('Pending', 'Running', 'Verifying', 'Starting'):
            complete, active = False, True
        elif status in ('Failed', 'Completed', 'Stopped'):
            complete, active = True, False
        else:
            return
        self.update_field('complete', complete)
        self.update_field('active', active)


    def run(self, assigned_host=None):
        """Start this entry via Job.run() and return the resulting Agent.

        A meta-host entry is first bound to *assigned_host*, and an
        IneligibleHostQueue row is saved for that (job, host) pair.
        """
        if self.meta_host:
            assert assigned_host
            self.job.create_results_dir()
            self.set_host(assigned_host)
            print("creating block %s/%s" % (self.job.id, self.host.id))

            IneligibleHostQueue(row=[0, self.job.id, self.host.id],
                                new_record=True).save()

        print("%s/%s scheduled on %s, status=%s" % (
            self.job.name, self.meta_host, self.host.hostname, self.status))

        return self.job.run(queue_entry=self)

    def requeue(self):
        """Put the entry back to Queued, releasing the host for meta-host entries."""
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def clear_results_dir(self):
        target = self.results_dir()
        if os.path.exists(target):
            shutil.rmtree(target)


class Job(DBObject):
    """Row of the jobs table plus helpers for its queue entries and results.

    Results live under job_dir = RESULTS_DIR/<id>-<owner>.
    """
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'owner', 'name', 'priority', 'control_file',
                  'control_type', 'created_on', 'synch_type', 'synch_count',
                  'synchronizing']
        DBObject.__init__(self, 'jobs', fields, id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id, self.owner))


    def is_server_job(self):
        # control_type 2 is the non-server case; run() passes '-c' to
        # autoserv for it.
        return self.control_type != 2


    def get_host_queue_entries(self):
        """Return HostQueueEntry objects for every entry of this job."""
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id= %s
            """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        # NOTE(review): 'status' is not among the fields loaded in __init__,
        # so update_field('status') fails its valid-field lookup (KeyError).
        # Confirm whether the jobs table has a status column before relying
        # on this (stop_synchronizing() calls it).
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        return self.synch_type == 2


    def is_ready(self):
        """True for asynchronous jobs; for synchronous jobs, True once
        exactly synch_count entries are Pending."""
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        """Heuristic: True once at least half of synch_count entries are
        Pending.

        The original count/synch_count used Python 2 integer division, which
        is 0 until every entry is Pending, so the "half" threshold never
        applied.  Comparing 2*count avoids both that and division by zero.
        """
        pending = len([entry for entry in self.get_host_queue_entries()
                       if entry.status == 'Pending'])
        return 2 * pending >= self.synch_count


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir

    def num_machines(self, clause = None):
        """Count this job's queue entries, optionally filtered by an SQL
        *clause* (interpolated as-is)."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        left = self.num_queued()
        print("%s: %s machines left" % (self.name, left))
        return left == 0

    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def write_machines_file(self):
        """For multi-machine jobs, write the assigned hostnames (one per
        line) to <job_dir>/.machines."""
        if self.num_machines() > 1:
            print("writing machines file")
            with open("%s/.machines" % self.job_dir, 'w') as mf:
                for queue_entry in self.get_host_queue_entries():
                    host = queue_entry.get_host()
                    if host:
                        mf.write("%s\n" % host.hostname)


    def create_results_dir(self, queue_entry=None):
        """Create job_dir if needed; return *queue_entry*'s results dir,
        or job_dir when no entry is given."""
        print("create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete()))

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def _write_control_file(self):
        """Write the control file to a new private temp file and return its
        path.

        Replaces os.tmpnam(), which is race-prone (a predictable name
        opened separately) and removed in Python 3.  The original also never
        closed the file, leaking a descriptor per job run.  The file is
        left on disk for autoserv to read.
        """
        fd, path = tempfile.mkstemp()
        ctrl = os.fdopen(fd, 'w')
        try:
            ctrl.write(self.control_file or "")
        finally:
            ctrl.close()
        return path


    def run(self, queue_entry):
        """Return an Agent that runs this job for *queue_entry*.

        A synchronous job that is not ready yet only gets a
        VerifySynchronousTask.  Otherwise the Agent runs autoserv over the
        relevant hosts, preceded by a VerifyTask for asynchronous jobs.
        """
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(queue_entry = queue_entry)])

        queue_entry.set_status('Starting')

        ctrl_path = self._write_control_file()

        if self.is_synchronous():
            if self.num_machines() > 1:
                self.write_machines_file()
            queue_entries = self.get_host_queue_entries()
            hostnames = ','.join([entry.host.hostname
                                  for entry in queue_entries])
        else:
            assert queue_entry
            hostnames = queue_entry.host.hostname
            queue_entries = [queue_entry]

        params = [_autoserv_path, '-n', '-r', results_dir,
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl_path]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self, queue_entries = queue_entries,
                               cmd = params))

        return Agent(tasks)


if __name__ == '__main__':
    # Start the scheduler only when executed as a script, not on import.
    main()