blob: 0a81ddfc4885da3fb7bd414379d190b3d180c76b [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
9import optparse, signal, smtplib, socket
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
# Directory job results are written under; overwritten in main() by the
# required positional command-line argument.
RESULTS_DIR = '.'
# Niceness applied to spawned autoserv child processes (see RunMonitor).
AUTOSERV_NICE_LEVEL = 10

# Root of the autotest tree: defaults to the parent of this file's
# directory, overridable via the AUTOTEST_DIR environment variable.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Module-wide mutable state, initialised in main()/connect().
_connection = None              # shared MySQLdb connection
_cursor = None                  # cursor on _connection, used by all helpers
_shutdown = False               # set by the SIGINT handler; stops the main loop
_notify_email = None            # exception-mail recipient, from --notify
_autoserv_path = 'autoserv'     # binary to launch; a dummy under --test
_testing_mode = False           # --test: dummy autoserv, no parsing, stress DB
mbligh36768f02008-02-22 18:28:33 +000031
32
def main():
    """Parse command-line options, initialise, and run the dispatcher
    loop until a shutdown is requested or an exception escapes."""
    parser = optparse.OptionParser(usage='usage: %prog [options] results_dir')
    parser.add_option('--no-recover',
                      action='store_true',
                      help='Skip machine/job recovery step '
                           '[for multiple monitors/rolling upgrades]')
    parser.add_option('--logfile',
                      help='Set a log file that all stdout should be '
                           'redirected to. Stderr will go to this '
                           'file + ".err"')
    parser.add_option('--notify',
                      help='Set an email address to be notified of '
                           'exceptions')
    parser.add_option('--test',
                      action='store_true',
                      help='Indicate that scheduler is under test and '
                           'should use dummy autoserv and no parsing')
    options, args = parser.parse_args()

    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR, _notify_email
    RESULTS_DIR = args[0]
    _notify_email = options.notify

    if options.test:
        # Swap in the dummy autoserv and the stress-test schema.
        global _autoserv_path, _testing_mode
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher(do_recover=not options.no_recover)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
        dispatcher.shut_down()
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    disconnect()
77
78
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after its current tick."""
    global _shutdown
    _shutdown = True
    print("Shutdown request received.")
83
84
def init(logfile):
    """One-time start-up: optional log redirection, PATH extension for
    the server tools, DB connection, and SIGINT handler installation."""
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    # Make autoserv and friends runnable by bare name.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    connect()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."
98
99
def enable_logging(logfile):
    """Redirect stdout to *logfile* and stderr to *logfile*.err,
    unbuffered, at both the fd level and the Python stream level."""
    out_path = logfile
    err_path = "%s.err" % logfile
    print("Enabling logging to %s (%s)" % (out_path, err_path))
    out_stream = open(out_path, "a", buffering=0)
    err_stream = open(err_path, "a", buffering=0)

    # fd-level dup so child processes inherit the redirection too.
    os.dup2(out_stream.fileno(), sys.stdout.fileno())
    os.dup2(err_stream.fileno(), sys.stderr.fileno())

    sys.stdout, sys.stderr = out_stream, err_stream
112
113
def idle_hosts():
    """Return Host objects that are unlocked, Ready (or status-less),
    running nothing, and have pending work available to them -- either
    directly assigned or reachable through a meta-host label."""
    _cursor.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id
                    WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """)
    hosts = [Host(row=i) for i in _cursor.fetchall()]
    return hosts
126
127
def connect():
    """Open the module-wide MySQL connection and cursor.

    Connection parameters come from the AUTOTEST_WEB section of the
    global config; in testing mode the stress-test schema is used
    instead of the configured database.  Autocommit is enabled so each
    statement is persisted immediately.
    """
    # Fix: dropped an unused local ('path', derived from sys.argv[0])
    # that was computed and never read.
    # get global config and parse for info
    c = global_config.global_config
    dbase = "AUTOTEST_WEB"
    DB_HOST = c.get_config_value(dbase, "host", "localhost")
    DB_SCHEMA = c.get_config_value(dbase, "database", "autotest_web")
    if _testing_mode:
        DB_SCHEMA = 'stresstest_autotest_web'

    DB_USER = c.get_config_value(dbase, "user", "autotest")
    DB_PASS = c.get_config_value(dbase, "password")

    global _connection, _cursor
    _connection = MySQLdb.connect(
        host=DB_HOST,
        user=DB_USER,
        passwd=DB_PASS,
        db=DB_SCHEMA
    )
    _connection.autocommit(True)
    _cursor = _connection.cursor()
150
151
def disconnect():
    """Close the shared DB connection and clear the module handles."""
    global _connection, _cursor
    _connection.close()
    _cursor = None
    _connection = None
157
158
def parse_results(results_dir, flags=""):
    """Kick off the TKO parser on *results_dir* in the background.

    No-op in testing mode.  Parser output is captured in .parse.log
    inside the results directory.
    """
    global _testing_mode
    if _testing_mode:
        return
    parse = os.path.join(AUTOTEST_TKO_DIR, 'parse')
    output = os.path.join(results_dir, '.parse.log')
    # NOTE(review): shell-interpolated command line; paths/flags are
    # internally generated here, but quoting would be safer -- confirm
    # no caller passes user-controlled values.
    os.system("%s %s -r -o %s > %s 2>&1 &" % (parse, flags, results_dir, output))
166
167
def log_stacktrace(reason):
    """Log the current exception to stderr with *reason*, host, pid and
    timestamp; if --notify was given, also email it via local SMTP.

    Fix: the locals previously shadowed the builtins 'type' and 'str';
    they are renamed, with no behavior change.
    """
    (exc_type, exc_value, exc_tb) = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
                                   time.strftime("%X %x"))
    message += ''.join(traceback.format_exception(exc_type, exc_value,
                                                  exc_tb))

    sys.stderr.write("\n%s\n" % message)

    if _notify_email:
        sender = "monitor_db"
        subject = "monitor_db exception"
        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
            sender, _notify_email, subject, message)
        mailer = smtplib.SMTP('localhost')
        mailer.sendmail(sender, _notify_email, msg)
        mailer.quit()
185
186
class Dispatcher:
    """Top-level scheduler state.

    Owns the list of active Agents, recovers work left in flight by a
    previous scheduler instance, and on every tick finds new work for
    idle hosts and advances all agents.
    """
    def __init__(self, do_recover=True):
        self._agents = []
        self.shutting_down = False

        if do_recover:
            self._recover_lost()


    def shut_down(self):
        # Stop picking up new work but keep ticking until every running
        # agent has drained.
        print "Shutting down!"
        self.shutting_down = True
        while self._agents:
            self.tick()
            time.sleep(40)


    def tick(self):
        # One scheduler iteration: find new work (unless shutting down),
        # then advance every agent.
        if not self.shutting_down:
            self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        # Register the agent and give it a back-pointer so its tasks can
        # schedule follow-up agents (e.g. a synchronous job start).
        self._agents.append(agent)
        agent.dispatcher = self


    def _recover_lost(self):
        # Queue entries left active-but-incomplete by a dead scheduler
        # are requeued; synchronous jobs requeue every sibling entry.
        # Stale results directories are removed.
        _cursor.execute("""SELECT * FROM host_queue_entries
                           WHERE active AND NOT complete""")
        if _cursor.rowcount:
            queue_entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]
            for queue_entry in queue_entries:
                job = queue_entry.job
                if job.is_synchronous():
                    for child_entry in job.get_host_queue_entries():
                        child_entry.requeue()
                else:
                    queue_entry.requeue()
                queue_entry.clear_results_dir()

        # Any unlocked host not 'Ready' is in an unknown state: reverify.
        _cursor.execute("""SELECT * FROM hosts
                           WHERE status != 'Ready' AND NOT locked""")
        if _cursor.rowcount:
            hosts = [Host(row=i) for i in _cursor.fetchall()]
            for host in hosts:
                verify_task = VerifyTask(host = host)
                self.add_agent(Agent(tasks = [verify_task]))


    def _find_more_work(self):
        # Scan idle hosts and start at most 100 new queue entries per
        # tick.  A failure to start marks the entry Failed and logs the
        # stacktrace rather than killing the scheduler.
        print "finding work"

        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if tasks:
                for next in tasks:
                    try:
                        agent = next.run(assigned_host=host)
                        if agent:
                            self.add_agent(agent)

                            num_started += 1
                            if num_started>=100:
                                return
                        break
                    except:
                        next.set_status('Failed')

#                       if next.host:
#                           next.host.set_status('Ready')

                        log_stacktrace("task_id = %d" % next.id)


    def _handle_agents(self):
        # Tick every agent and drop the finished ones.
        still_running = []
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running
272
273
class RunMonitor(object):
    """Spawn *cmd* as a child process (optionally niced) and expose its
    running status.

    Output goes to *log_file* (appended, with a timestamped banner) when
    given and writable, else to /dev/null; stdin is /dev/null.
    """
    def __init__(self, cmd, nice_level=None, log_file=None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        """Launch the child and return its subprocess.Popen handle."""
        if self.nice_level:
            cmd = ['nice', '-n', str(self.nice_level)] + list(cmd)

        out_file = None
        if self.log_file:
            try:
                out_file = open(self.log_file, 'a')
                banner = '*' * 80
                out_file.write("\n%s\n" % banner)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % banner)
            except:
                pass  # logging is best-effort; fall through

        if out_file is None:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print("cmd = %s" % cmd)
        print("path = %s" % os.getcwd())

        child = subprocess.Popen(cmd, stdout=out_file,
                                 stderr=subprocess.STDOUT,
                                 stdin=in_devnull)
        # The child holds its own references; close our copies.
        out_file.close()
        in_devnull.close()
        return child

    def kill(self):
        self.proc.kill()

    def exit_code(self):
        """Child's exit status, or None while it is still running."""
        return self.proc.poll()
316
317
class Agent(object):
    """Runs an ordered queue of AgentTasks, one at a time.

    When a task fails and carries failure_tasks, the remaining queue is
    replaced by those recovery tasks (e.g. failed verify -> repair).
    """
    def __init__(self, tasks):
        self.active_task = None
        self.queue = Queue.Queue(0)     # pending AgentTasks, FIFO
        self.dispatcher = None          # set by Dispatcher.add_agent()

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        # Enqueue the task and give it a back-pointer to this agent.
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        # Poll the running task, or advance to the next queued one.
        print "agent tick"
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task();


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            # A failed task replaces the rest of the queue with its
            # recovery tasks.
            if not self.active_task.success and self.active_task.failure_tasks:
                self.queue = Queue.Queue(0)
                for task in self.active_task.failure_tasks:
                    self.add_task(task)

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def is_done(self):
        # Done when nothing is running and nothing is queued.
        return self.active_task == None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()
366
367
class AgentTask(object):
    """Base class for one unit of Agent work: wraps an autoserv command
    run through RunMonitor, with prolog/epilog hooks for subclasses.

    Fix: identity comparison ('is None') instead of '== None', and the
    success flag is computed directly from the exit code.
    """
    def __init__(self, cmd, failure_tasks=None):
        self.done = False                   # set once finished() runs
        self.failure_tasks = failure_tasks  # recovery tasks on failure
        self.started = False
        self.cmd = cmd                      # argv list, or falsy for no-op
        self.agent = None                   # set by Agent.add_task()


    def poll(self):
        """Advance the task: check the spawned process if one exists,
        otherwise the task never launched and counts as failed."""
        print("poll")
        if hasattr(self, 'monitor'):
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        # exit_code is None while the child is still running.
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        # Record the outcome, then let the subclass react via epilog().
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the command is started."""
        pass


    def epilog(self):
        """Hook run once after the task finishes."""
        pass


    def start(self):
        assert self.agent

        # prolog()/run() happen only on the first start() call.
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        # Only valid after run() has created the monitor.
        self.monitor.kill()


    def run(self):
        if self.cmd:
            print("agent starting monitor")

            # Per-host tasks log under RESULTS_DIR/hosts/<hostname>.
            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(os.path.join(RESULTS_DIR, 'hosts'),
                                        self.host.hostname)

            self.monitor = RunMonitor(self.cmd,
                                      nice_level=AUTOSERV_NICE_LEVEL,
                                      log_file=log_file)
438
439
class RepairTask(AgentTask):
    """Run 'autoserv -R' against a host and record the outcome in the
    host's status ('Ready' or 'Repair Failed')."""
    def __init__(self, host):
        self.host = host
        global _autoserv_path
        repair_cmd = [_autoserv_path, '-n', '-R', '-m', host.hostname]
        AgentTask.__init__(self, repair_cmd)

    def prolog(self):
        print("repair_task starting")
        self.host.set_status('Repairing')


    def epilog(self):
        if not self.success:
            status = 'Repair Failed'
        else:
            status = 'Ready'

        self.host.set_status(status)
459
460
class VerifyTask(AgentTask):
    """Run 'autoserv -v' against a host; on failure, fall back to a
    RepairTask.

    Verify results go to a temporary directory and are moved into the
    queue entry's results dir only when worth keeping.
    """
    def __init__(self, queue_entry=None, host=None):
        # Exactly one of queue_entry / host must be provided.
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
        global _autoserv_path
        cmd = [_autoserv_path,'-n', '-v','-m',self.host.hostname,
               '-r', self.temp_results_dir]

        AgentTask.__init__(self, cmd, failure_tasks = [RepairTask(self.host)])


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        # Keep the verify logs when tied to a concrete (non-meta) queue
        # entry, or when verify succeeded; otherwise discard them.
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        else:
            shutil.rmtree(self.temp_results_dir)

        if self.success:
            status = 'Ready'
        else:
            status = 'Failed Verify'
            if self.queue_entry:
                if self.queue_entry.meta_host:
                    # Meta-host entry: return the work to the pool
                    # instead of failing the entry outright.
                    self.host.yield_work()
                else:
                    self.queue_entry.set_status('Failed')

        self.host.set_status(status)


    def move_results(self):
        # Move temp verify results under the entry's results dir
        # (per-host subdirectory for synchronous jobs).
        assert self.queue_entry is not None
        target_dir = self.queue_entry.results_dir()
        if self.queue_entry.job.is_synchronous():
            target_dir = os.path.join(target_dir,
                                      self.queue_entry.host.hostname)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            shutil.move(os.path.join(self.temp_results_dir,
                                     filename),
                        os.path.join(target_dir, filename))
516
517
class VerifySynchronousTask(VerifyTask):
    """VerifyTask variant for synchronous jobs: coordinates sibling
    queue entries so the job only starts once enough hosts are
    Pending."""
    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry = queue_entry)


    def epilog(self):
        VerifyTask.epilog(self)
        print "verify_synchronous finished: %s/%s" % (self.queue_entry.host.hostname, self.success)
        if self.success:
            if self.queue_entry.job.num_complete()==0:
                self.queue_entry.set_status('Pending')

                job = self.queue_entry.job
                if job.is_ready():
                    # Last required host became Pending: launch the job.
                    self.agent.dispatcher.add_agent(job.run(self.queue_entry))
            else:
                self.queue_entry.set_status('Stopped') # some other entry failed verify
        else:
            if self.queue_entry.meta_host:
                # Meta-host entry: requeue for a different host.
                self.queue_entry.set_status('Queued')
            else:
                # A fixed host failed verify: stop all active siblings.
                job = self.queue_entry.job
                for child_entry in job.get_host_queue_entries():
                    if child_entry.active:
                        child_entry.set_status('Stopped')
543
544
class QueueTask(AgentTask):
    """Runs the job's autoserv command over a set of queue entries and
    records timing keyvals plus the final entry/host statuses."""
    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(queue_entry, field, value):
        # Append "field=value" to the entry's keyval file.  Note the %d
        # truncates float timestamps to whole seconds.
        key_path = os.path.join(queue_entry.results_dir(), 'keyval')
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%d' % (field, value)
        keyval_file.close()


    def prolog(self):
        for queue_entry in self.queue_entries:
            print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            # write some job timestamps into the job keyval file
            queued = time.mktime(self.job.created_on.timetuple())
            started = time.time()
            self._write_keyval(queue_entry, "job_queued", queued)
            self._write_keyval(queue_entry, "job_started", started)


    def epilog(self):
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')
            # write another timestamp into the job keyval file
            finished = time.time()
            self._write_keyval(queue_entry, "job_finished",
                               finished)

        # Whole-job parse for synchronous/single-machine jobs (once the
        # whole job is finished); per-entry partial parse otherwise.
        if self.job.is_synchronous() or self.job.num_machines()==1:
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(), flags='-l 2')

        print "queue_task finished with %s/%s" % (status, self.success)
594
595
class RebootTask(AgentTask):
    """Reboot a host via 'autoserv -b'.

    poll()/run() are deliberately unimplemented; driving this task
    raises NotImplementedError.

    Fixes: the constructor previously took no 'host' parameter (every
    use of 'host' was a NameError) and passed the host object to
    AgentTask.__init__ as the command; the stubs raised string
    exceptions, which are illegal on any non-ancient Python.
    """
    def __init__(self, host):
        global _autoserv_path
        self.cmd = "%s -n -b -m %s /dev/null" % (_autoserv_path, host)
        self.host = host
        AgentTask.__init__(self, self.cmd)


    def tick(self, exit_code):
        raise NotImplementedError("RebootTask.tick is not implemented")


    def run(self):
        raise NotImplementedError("RebootTask.run is not implemented")
610
611
612
class DBObject(object):
    """Minimal ORM base: maps one table row onto instance attributes.

    Subclasses supply the table name and the ordered field list; each
    column value becomes an attribute of the same name.  Exactly one of
    id / row selects the record; new_record=True marks an object to be
    INSERTed by save().
    """
    def __init__(self, table, fields, id=None, row=None, new_record=False):
        assert (bool(id) != bool(row)) and table and fields

        self.__table = table
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            # Fetch the row by primary key.
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            _cursor.execute(sql, (id,))
            if not _cursor.rowcount:
                # NOTE(review): string exception -- only legal on very
                # old Python 2; would be TypeError on 2.6+.
                raise "row not found (table=%s, id=%s)" % \
                      (self.__table, id)
            row = _cursor.fetchone()

        assert len(row)==len(fields), "table = %s, row = %s/%d, fields = %s/%d" % (table, row, len(row), fields, len(fields))

        # Map each column onto an attribute; track which ones may be
        # written back via update_field.
        self.__valid_fields = {}
        for i,value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        # The primary key must never be updated.
        del self.__valid_fields['id']

    def count(self, where, table = None):
        # Row count of `table` (default: own table) matching `where`.
        # NOTE(review): `where` is string-interpolated, not
        # parameterised; callers only pass internally built clauses.
        if not table:
            table = self.__table

        _cursor.execute("""
            SELECT count(*) FROM %s
            WHERE %s
        """ % (table, where))
        count = _cursor.fetchall()

        return int(count[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        """Persist a single column change, skipping no-op writes and
        keeping the attribute in sync."""
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                (self.__table, field)
        _cursor.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        # INSERT this object if created with new_record=True.
        # NOTE(review): values are quoted by interpolation, not bound
        # parameters -- confirm inputs are always internal.
        if self.__new_record:
            keys = self.__fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values = ','.join(values)
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                    (self.__table, columns, values)
            _cursor.execute(query)


    def delete(self):
        _cursor.execute("""DELETE FROM %s WHERE id = %%s""" % \
                        self.__table, (self.id,))
683
684
class IneligibleHostQueue(DBObject):
    """Row wrapper for ineligible_host_queues: records a (job, host)
    pairing that must not be scheduled together again."""
    def __init__(self, id=None, row=None, new_record=None):
        DBObject.__init__(self, 'ineligible_host_queues',
                          ['id', 'job_id', 'host_id'],
                          id=id, row=row, new_record=new_record)
690
691
class Host(DBObject):
    """Row wrapper for the hosts table, plus the scheduling queries a
    dispatcher needs (current work, next candidate work)."""
    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id','status']
        DBObject.__init__(self, 'hosts',fields, id=id, row=row)



    def current_task(self):
        # The single active, incomplete queue entry on this host, or
        # None when idle.
        _cursor.execute("""
            SELECT * FROM host_queue_entries
            WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if not _cursor.rowcount:
            return None
        else:
            assert _cursor.rowcount == 1
            results = _cursor.fetchone();
#           print "current = %s" % results
            return HostQueueEntry(row=results)


    def next_queue_entries(self):
        # Highest-priority pending entry runnable on this host: either
        # directly assigned, or reachable via a meta-host label and not
        # blocked by an ineligible_host_queues row.
        if self.locked:
            print "%s locked, not queuing" % self.hostname
            return None
#       print "%s/%s looking for work" % (self.hostname, self.platform_id)
        _cursor.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (self.id,self.id, self.id))

        if not _cursor.rowcount:
            return None
        else:
            return [HostQueueEntry(row=i) for i in _cursor.fetchall()]

    def yield_work(self):
        # Requeue whatever this host is currently running.
        print "%s yielding work" % self.hostname
        if self.current_task():
            self.current_task().requeue()

    def set_status(self,status):
        self.update_field('status',status)
746
747
class HostQueueEntry(DBObject):
    """Row wrapper for host_queue_entries: one (job, host) pairing and
    its scheduling state, with the job/host objects attached."""
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        DBObject.__init__(self, 'host_queue_entries', fields, id=id,
                          row=row)

        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Per-entry log recording host assignments/releases.
        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    def set_host(self, host):
        # Assign (and activate) or release this entry's host, recording
        # the change in the queue log.
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
        else:
            self.queue_log_record('Releasing host')
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        # Lazily (re)load the Host object; None while unassigned.
        if not self.host:
            if self.host_id:
                self.host = Host(self.host_id)
        if self.host:
            return self.host
        else:
            return None


    def queue_log_record(self, log_line):
        # Unbuffered append so the log survives scheduler crashes.
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(log_line + '\n')
        queue_log.close()


    def results_dir(self):
        # Synchronous and single-machine jobs share the job directory;
        # otherwise each host gets its own subdirectory.
        if self.job.num_machines()==1 or self.job.is_synchronous():
            results_dir = self.job.job_dir
        else:
            assert self.host
            results_dir = '%s/%s' % (self.job.job_dir,
                                     self.host.hostname)

        return results_dir


    def set_status(self, status):
        # Update status and keep the derived active/complete flags
        # consistent with it.
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self,assigned_host=None):
        # Meta-host entries bind the assigned host now and block this
        # (job, host) pair from being rescheduled together.
        if self.meta_host:
            assert assigned_host
            self.job.create_results_dir()
            self.set_host(assigned_host)
            print "creating block %s/%s" % (self.job.id,
                                            self.host.id)

            row = [0, self.job.id, self.host.id]
            block = IneligibleHostQueue(row=row, new_record=True)
            block.save()

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)

    def requeue(self):
        self.set_status('Queued')

        # Meta-host entries drop their host so another can be chosen.
        if self.meta_host:
            self.set_host(None)


    def clear_results_dir(self):
        if os.path.exists(self.results_dir()):
            shutil.rmtree(self.results_dir())
853
854
class Job(DBObject):
    """Row wrapper for the jobs table plus job-level scheduling logic:
    readiness checks, results-directory layout, and building the Agent
    that runs autoserv."""
    def __init__(self, id=None, row=None):
        assert id or row
        DBObject.__init__(self,'jobs',['id','owner','name','priority','control_file',
                          'control_type','created_on', 'synch_type', 'synch_count',
                          'synchronizing' ], id=id, row=row)

        # All results for this job live under RESULTS_DIR/<id>-<owner>.
        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id, self.owner))


    def is_server_job(self):
        # control_type 2 denotes a client-side control file; anything
        # else is treated as a server job.
        return self.control_type != 2


    def get_host_queue_entries(self):
        # Every queue entry belonging to this job (at least one must
        # exist).
        _cursor.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]

        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        self.update_field('status',status)

        # Optionally propagate the status to every queue entry.
        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        # synch_type == 2 marks a synchronous job (all hosts start
        # together).
        return self.synch_type == 2


    def is_ready(self):
        # Asynchronous jobs are always ready; synchronous jobs are
        # ready once synch_count entries are Pending.
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        # heuristic
        # NOTE(review): on Python 2 count/self.synch_count is integer
        # division, so ">= 0.5" is effectively "count >= synch_count",
        # not the intended 50% threshold -- confirm.
        queue_entries = self.get_host_queue_entries()
        count = 0
        for queue_entry in queue_entries:
            if queue_entry.status == 'Pending':
                count += 1

        return (count/self.synch_count >= 0.5)


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir

    def num_machines(self, clause = None):
        # Number of queue entries for this job, optionally filtered by
        # an extra SQL clause.
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        left = self.num_queued()
        print "%s: %s machines left" % (self.name, left)
        return left==0

    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def write_machines_file(self):
        # .machines lists each assigned hostname, one per line; only
        # written for multi-machine jobs.
        if self.num_machines()>1:
            print "writing machines file"
            mf = open("%s/.machines" % self.job_dir, 'w')
            for queue_entry in self.get_host_queue_entries():
                if queue_entry.get_host():
                    mf.write("%s\n" % \
                             queue_entry.get_host().hostname)
            mf.close()


    def create_results_dir(self, queue_entry=None):
        """Ensure the job directory exists; return the entry's results
        dir when an entry is given, else the job dir."""
        print "create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete())

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        """Build and return the Agent that runs this job for
        *queue_entry* -- or just a verify agent when a synchronous job
        is not yet ready to start."""
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                # Not all hosts Pending yet: verify this one first.
                return Agent([VerifySynchronousTask(queue_entry = queue_entry)])

        queue_entry.set_status('Starting')

        # Write the control file to a temp path for autoserv.
        # NOTE(review): os.tmpnam is race-prone and this file handle is
        # never closed or removed -- confirm before relying on it.
        ctrl = open(os.tmpnam(), 'w')
        if self.control_file:
            ctrl.write(self.control_file)
        else:
            ctrl.write("")
        ctrl.flush()

        if self.is_synchronous():
            # Synchronous: one autoserv invocation over all hosts.
            if self.num_machines() > 1:
                self.write_machines_file()
            hosts = self.get_host_queue_entries()
            hostnames = ','.join([i.host.hostname for i in hosts])
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            hostnames = queue_entry.host.hostname
            queue_entries = [queue_entry]

        global _autoserv_path
        params = [_autoserv_path, '-n', '-r', results_dir,
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl.name]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            # Asynchronous entries get verified immediately before the
            # queue task, inside the same agent.
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self, queue_entries = queue_entries, cmd = params))

        agent = Agent(tasks)

        return agent
1015
1016
# Run the scheduler when invoked directly.
if __name__ == '__main__':
    main()