blob: ff6d5d916c629c64e01e97ad3eadb9ba472d820a [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
9import optparse, signal, smtplib, socket
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
# Default results location; overridden by the results_dir command-line arg.
RESULTS_DIR = '.'
# Niceness applied to every autoserv child process we spawn.
AUTOSERV_NICE_LEVEL = 10

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# An explicit $AUTOTEST_DIR in the environment overrides the path derived
# from this file's location.  ('in' replaces the deprecated dict.has_key.)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Module-wide mutable state: DB handle/cursor (see connect/disconnect),
# shutdown flag (set by SIGINT handler) and exception-notification address.
_connection = None
_cursor = None
_shutdown = False
_notify_email = None
30
def main():
    """Parse command-line options, then run the dispatcher loop until a
    shutdown is requested (SIGINT) or an exception escapes."""
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--no-recover', help='Skip machine/job recovery ' +
                      'step [for multiple monitors/rolling upgrades]',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    options, args = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    global _notify_email
    RESULTS_DIR = args[0]
    _notify_email = options.notify

    init(options.logfile)
    dispatcher = Dispatcher(do_recover=not options.no_recover)

    # Top-level boundary: log any uncaught exception instead of crashing
    # silently, then fall through to the DB disconnect.
    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
        dispatcher.shut_down()
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    disconnect()
66
67
68def handle_sigint(signum, frame):
69 global _shutdown
70 _shutdown = True
71 print "Shutdown request received."
72
73
74def init(logfile):
75 if logfile:
76 enable_logging(logfile)
77 print "%s> dispatcher starting" % time.strftime("%X %x")
78 print "My PID is %d" % os.getpid()
79
80 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
81 connect()
82
83 print "Setting signal handler"
84 signal.signal(signal.SIGINT, handle_sigint)
85
86 print "Connected! Running..."
87
88
89def enable_logging(logfile):
90 out_file = logfile
91 err_file = "%s.err" % logfile
92 print "Enabling logging to %s (%s)" % (out_file, err_file)
93 out_fd = open(out_file, "a", buffering=0)
94 err_fd = open(err_file, "a", buffering=0)
95
96 os.dup2(out_fd.fileno(), sys.stdout.fileno())
97 os.dup2(err_fd.fileno(), sys.stderr.fileno())
98
99 sys.stdout = out_fd
100 sys.stderr = err_fd
101
102
def idle_hosts():
    """Return Host objects for every unlocked, Ready (or status-less) host
    that is not currently active and has queued work it could run (either
    assigned directly or eligible via a meta-host label)."""
    _cursor.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """)
    return [Host(row=i) for i in _cursor.fetchall()]
115
116
def connect():
    """Open the global MySQL connection/cursor used by the whole module.

    Connection parameters come from the AUTOTEST_WEB section of the global
    config; autocommit is enabled so every statement is applied immediately.
    """
    # BUG FIX: removed the unused local 'path' (computed from sys.argv but
    # never referenced).
    c = global_config.global_config
    dbase = "AUTOTEST_WEB"
    DB_HOST = c.get_config_value(dbase, "host", "localhost")
    DB_SCHEMA = c.get_config_value(dbase, "database", "autotest_web")
    DB_USER = c.get_config_value(dbase, "user", "autotest")
    DB_PASS = c.get_config_value(dbase, "password")

    global _connection, _cursor
    _connection = MySQLdb.connect(
        host=DB_HOST,
        user=DB_USER,
        passwd=DB_PASS,
        db=DB_SCHEMA
    )
    _connection.autocommit(True)
    _cursor = _connection.cursor()
137
138
def disconnect():
    """Close the global DB connection and drop both module-level handles."""
    global _connection, _cursor
    _connection.close()
    _connection, _cursor = None, None
144
145
def parse_results(results_dir, flags=""):
    """Launch the TKO results parser on results_dir as a detached background
    process, sending its output to <results_dir>/.parse.log.

    flags: extra whitespace-separated parser options (e.g. '-l 2').
    """
    parse = os.path.join(AUTOTEST_TKO_DIR, 'parse')
    output = os.path.join(results_dir, '.parse.log')
    # BUG FIX: build an argv list instead of interpolating into a shell
    # command line -- results_dir embeds the user-supplied job owner/name,
    # so shell metacharacters or spaces in it would break (or inject into)
    # the os.system() string.  Popen without wait() keeps the old
    # fire-and-forget '&' behaviour.
    log = open(output, 'w')
    subprocess.Popen([parse] + flags.split() + ['-r', '-o', results_dir],
                     stdout=log, stderr=subprocess.STDOUT)
    log.close()
150
151
def log_stacktrace(reason):
    """Log the current exception (with traceback) to stderr and, if a
    --notify address was configured, email it as well.

    Must be called from inside an 'except' block so sys.exc_info() is set.
    """
    # BUG FIX: renamed locals 'type' and 'str', which shadowed builtins.
    exc_type, exc_value, exc_tb = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
                                   time.strftime("%X %x"))
    message += ''.join(traceback.format_exception(exc_type, exc_value,
                                                  exc_tb))

    sys.stderr.write("\n%s\n" % message)

    if _notify_email:
        sender = "monitor_db"
        subject = "monitor_db exception"
        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
            sender, _notify_email, subject, message)
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(sender, _notify_email, msg)
        finally:
            # BUG FIX: always close the SMTP session, even if sendmail fails.
            mailer.quit()
169
170
171class Dispatcher:
172 def __init__(self, do_recover=True):
173 self._agents = []
174 self.shutting_down = False
175
176 if do_recover:
177 self._recover_lost()
178
179
180 def shut_down(self):
181 print "Shutting down!"
182 self.shutting_down = True
183 while self._agents:
184 self.tick()
185 time.sleep(40)
186
187
188 def tick(self):
189 if not self.shutting_down:
190 self._find_more_work()
191 self._handle_agents()
192
193
194 def add_agent(self, agent):
195 self._agents.append(agent)
196 agent.dispatcher = self
197
198
199 def _recover_lost(self):
200 _cursor.execute("""SELECT * FROM host_queue_entries WHERE active AND NOT complete""")
201 if _cursor.rowcount:
202 queue_entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]
203 for queue_entry in queue_entries:
204 job = queue_entry.job
205 if job.is_synchronous():
206 for child_entry in job.get_host_queue_entries():
207 child_entry.requeue()
208 else:
209 queue_entry.requeue()
210 queue_entry.clear_results_dir()
211
212 _cursor.execute("""SELECT * FROM hosts
213 WHERE status != 'Ready' AND NOT locked""")
214 if _cursor.rowcount:
215 hosts = [Host(row=i) for i in _cursor.fetchall()]
216 for host in hosts:
217 verify_task = VerifyTask(host = host)
218 self.add_agent(Agent(tasks = [verify_task]))
219
220
221 def _find_more_work(self):
222 print "finding work"
223
224 num_started = 0
225 for host in idle_hosts():
226 tasks = host.next_queue_entries()
227 if tasks:
228 for next in tasks:
229 try:
230 agent = next.run(assigned_host=host)
231 if agent:
232 self.add_agent(agent)
233
234 num_started += 1
235 if num_started>=100:
236 return
237 break
238 except:
239 next.set_status('Failed')
240
241# if next.host:
242# next.host.set_status('Ready')
243
244 log_stacktrace("task_id = %d" % next.id)
245
246
247 def _handle_agents(self):
248 still_running = []
249 for agent in self._agents:
250 agent.tick()
251 if not agent.is_done():
252 still_running.append(agent)
253 else:
254 print "agent finished"
255 self._agents = still_running
256
257
258class RunMonitor(object):
259 def __init__(self, cmd, nice_level = None, log_file = None):
260 self.nice_level = nice_level
261 self.log_file = log_file
262 self.proc = self.run(cmd)
263
264 def run(self, cmd):
265 if self.nice_level:
266 nice_cmd = ['nice','-n', str(self.nice_level)]
267 nice_cmd.extend(cmd)
268 cmd = nice_cmd
269
270 out_file = None
271 if self.log_file:
272 try:
273 out_file = open(self.log_file, 'a')
274 out_file.write("\n%s\n" % ('*'*80))
275 out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
276 out_file.write("%s\n" % ('*'*80))
277 except:
278 pass
279
280 if not out_file:
281 out_file = open('/dev/null', 'w')
282
283 in_devnull = open('/dev/null', 'r')
284 print "cmd = %s" % cmd
285 print "path = %s" % os.getcwd()
286
287 proc = subprocess.Popen(cmd, stdout=out_file,
288 stderr=subprocess.STDOUT, stdin=in_devnull)
289 out_file.close()
290 in_devnull.close()
291 return proc
292
293
294 def kill(self):
295 self.proc.kill()
296
297
298 def exit_code(self):
299 return self.proc.poll()
300
301
302class Agent(object):
303 def __init__(self, tasks):
304 self.active_task = None
305 self.queue = Queue.Queue(0)
306 self.dispatcher = None
307
308 for task in tasks:
309 self.add_task(task)
310
311
312 def add_task(self, task):
313 self.queue.put_nowait(task)
314 task.agent = self
315
316
317 def tick(self):
318 print "agent tick"
319 if self.active_task and not self.active_task.is_done():
320 self.active_task.poll()
321 else:
322 self._next_task();
323
324
325 def _next_task(self):
326 print "agent picking task"
327 if self.active_task:
328 assert self.active_task.is_done()
329
330 if not self.active_task.success and self.active_task.failure_tasks:
331 self.queue = Queue.Queue(0)
332 for task in self.active_task.failure_tasks:
333 self.add_task(task)
334
335 self.active_task = None
336 if not self.is_done():
337 self.active_task = self.queue.get_nowait()
338 if self.active_task:
339 self.active_task.start()
340
341
342 def is_done(self):
343 return self.active_task == None and self.queue.empty()
344
345
346 def start(self):
347 assert self.dispatcher
348
349 self._next_task()
350
351
352class AgentTask(object):
353 def __init__(self, cmd, failure_tasks = None):
354 self.done = False
355 self.failure_tasks = failure_tasks
356 self.started = False
357 self.cmd = cmd
358 self.agent = None
359
360
361 def poll(self):
362 print "poll"
363 if hasattr(self, 'monitor'):
364 self.tick(self.monitor.exit_code())
365 else:
366 self.finished(False)
367
368
369 def tick(self, exit_code):
370 if exit_code==None:
371 return
372# print "exit_code was %d" % exit_code
373 if exit_code == 0:
374 success = True
375 else:
376 success = False
377
378 self.finished(success)
379
380
381 def is_done(self):
382 return self.done
383
384
385 def finished(self, success):
386 self.done = True
387 self.success = success
388 self.epilog()
389
390
391 def prolog(self):
392 pass
393
394
395 def epilog(self):
396 pass
397
398
399 def start(self):
400 assert self.agent
401
402 if not self.started:
403 self.prolog()
404 self.run()
405
406 self.started = True
407
408
409 def abort(self):
410 self.monitor.kill()
411
412
413 def run(self):
414 if self.cmd:
415 print "agent starting monitor"
416
417 log_file = None
418 if hasattr(self, 'host'):
419 log_file = os.path.join(os.path.join(RESULTS_DIR, 'hosts'), self.host.hostname)
420
421 self.monitor = RunMonitor(self.cmd, nice_level = AUTOSERV_NICE_LEVEL, log_file = log_file)
422
423
424class RepairTask(AgentTask):
425 def __init__(self, host):
426 cmd = ['autoserv', '-n', '-R', '-m', host.hostname]
427 self.host = host
428 AgentTask.__init__(self, cmd)
429
430 def prolog(self):
431 print "repair_task starting"
432 self.host.set_status('Repairing')
433
434
435 def epilog(self):
436 if self.success:
437 status = 'Ready'
438 else:
439 status = 'Repair Failed'
440
441 self.host.set_status(status)
442
443
444class VerifyTask(AgentTask):
445 def __init__(self, queue_entry=None, host=None):
446 assert bool(queue_entry) != bool(host)
447
448 self.host = host or queue_entry.host
449 self.queue_entry = queue_entry
450
451 self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
452 cmd = ['autoserv', '-n', '-v', '-m', self.host.hostname,
453 '-r', self.temp_results_dir]
454
455 AgentTask.__init__(self, cmd, failure_tasks = [RepairTask(self.host)])
456
457
458 def prolog(self):
459 print "starting verify on %s" % (self.host.hostname)
460 if self.queue_entry:
461 self.queue_entry.set_status('Verifying')
462 self.host.set_status('Verifying')
463
464
465 def epilog(self):
466 if self.queue_entry and (self.success or
467 not self.queue_entry.meta_host):
468 self.move_results()
469 else:
470 shutil.rmtree(self.temp_results_dir)
471
472 if self.success:
473 status = 'Ready'
474 else:
475 status = 'Failed Verify'
476 if self.queue_entry:
477 if self.queue_entry.meta_host:
478 self.host.yield_work()
479 else:
480 self.queue_entry.set_status('Failed')
481
482 self.host.set_status(status)
483
484
485 def move_results(self):
486 assert self.queue_entry is not None
487 target_dir = self.queue_entry.results_dir()
488 if self.queue_entry.job.is_synchronous():
489 target_dir = os.path.join(target_dir,
490 self.queue_entry.host.hostname)
491 if not os.path.exists(target_dir):
492 os.makedirs(target_dir)
493 files = os.listdir(self.temp_results_dir)
494 for filename in files:
495 shutil.move(os.path.join(self.temp_results_dir,
496 filename),
497 os.path.join(target_dir, filename))
498
499
500class VerifySynchronousTask(VerifyTask):
501 def __init__(self, queue_entry):
502 VerifyTask.__init__(self, queue_entry = queue_entry)
503
504
505 def epilog(self):
506 VerifyTask.epilog(self)
507 print "verify_synchronous finished: %s/%s" % (self.queue_entry.host.hostname, self.success)
508 if self.success:
509 if self.queue_entry.job.num_complete()==0:
510 self.queue_entry.set_status('Pending')
511
512 job = self.queue_entry.job
513 if job.is_ready():
514 self.agent.dispatcher.add_agent(job.run(self.queue_entry))
515 else:
516 self.queue_entry.set_status('Stopped') # some other entry failed verify
517 else:
518 if self.queue_entry.meta_host:
519 self.queue_entry.set_status('Queued')
520 else:
521 job = self.queue_entry.job
522 for child_entry in job.get_host_queue_entries():
523 if child_entry.active:
524 child_entry.set_status('Stopped')
525
526
527class QueueTask(AgentTask):
528 def __init__(self, job, queue_entries, cmd):
529 AgentTask.__init__(self, cmd)
530 self.job = job
531 self.queue_entries = queue_entries
532
533
534 @staticmethod
535 def _write_keyval(queue_entry, field, value):
536 key_path = os.path.join(queue_entry.results_dir(), 'keyval')
537 keyval_file = open(key_path, 'a')
538 print >> keyval_file, '%s=%d' % (field, value)
539 keyval_file.close()
540
541
542 def prolog(self):
543 for queue_entry in self.queue_entries:
544 print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
545 queue_entry.set_status('Running')
546 queue_entry.host.set_status('Running')
547 # write some job timestamps into the job keyval file
548 queued = time.mktime(self.job.created_on.timetuple())
549 started = time.time()
550 self._write_keyval(queue_entry, "job_queued", queued)
551 self._write_keyval(queue_entry, "job_started", started)
552
553
554 def epilog(self):
555 if self.success:
556 status = 'Completed'
557 else:
558 status = 'Failed'
559
560 for queue_entry in self.queue_entries:
561 queue_entry.set_status(status)
562 queue_entry.host.set_status('Ready')
563 # write another timestamp into the job keyval file
564 finished = time.time()
565 self._write_keyval(queue_entry, "job_finished",
566 finished)
567
568 if self.job.is_synchronous() or self.job.num_machines()==1:
569 if self.job.is_finished():
570 parse_results(self.job.results_dir())
571 else:
572 for queue_entry in self.queue_entries:
573 parse_results(queue_entry.results_dir(), flags='-l 2')
574
575 print "queue_task finished with %s/%s" % (status, self.success)
576
577
class RebootTask(AgentTask):
    """Reboots a host via 'autoserv -b'.

    Not yet wired into the scheduler: tick() and run() deliberately raise
    until reboot handling is implemented.
    """

    def __init__(self, host):
        # BUG FIX: the original __init__ took no 'host' parameter (NameError
        # on use), passed the host object -- not a command -- to
        # AgentTask.__init__, and then clobbered self.cmd with a shell
        # string interpolating the Host object instead of its hostname.
        self.host = host
        AgentTask.__init__(self, ['autoserv', '-n', '-b', '-m',
                                  host.hostname, '/dev/null'])


    def tick(self, exit_code):
        # BUG FIX: string exceptions ('raise "not implemented"') are
        # invalid; raise a real exception type.
        raise NotImplementedError("RebootTask.tick not implemented")


    def run(self):
        raise NotImplementedError("RebootTask.run not implemented")
591
592
593
class DBObject(object):
    """Minimal row-to-object mapper.

    Maps one row of `table` onto instance attributes named by `fields`
    (which must match the table's column order exactly).  Subclasses supply
    the table name and field list.
    """

    def __init__(self, table, fields, id=None, row=None, new_record=False):
        # Exactly one of id (load from DB) / row (already fetched) allowed.
        assert (bool(id) != bool(row)) and table and fields

        self.__table = table
        self.__fields = fields
        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            _cursor.execute(sql, (id,))
            if not _cursor.rowcount:
                # BUG FIX: raising a string ('raise "row not found..."') is
                # invalid in modern Python; raise a real exception instead.
                raise RuntimeError("row not found (table=%s, id=%s)" %
                                   (self.__table, id))
            row = _cursor.fetchone()

        assert len(row) == len(fields), \
            "table = %s, row = %s/%d, fields = %s/%d" % \
            (table, row, len(row), fields, len(fields))

        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        # 'id' is immutable; never allow update_field on it.
        del self.__valid_fields['id']

    def count(self, where, table=None):
        """Return COUNT(*) of rows matching the raw SQL condition `where`
        in `table` (defaults to this object's table)."""
        if not table:
            table = self.__table

        _cursor.execute("""
            SELECT count(*) FROM %s
            WHERE %s
        """ % (table, where))
        count = _cursor.fetchall()

        return int(count[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        """Set one column (and the matching attribute), skipping the DB
        round trip when the value is unchanged."""
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                (self.__table, field)
        _cursor.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        """INSERT this object (all fields except id) if it was constructed
        with new_record=True; otherwise a no-op."""
        if self.__new_record:
            keys = self.__fields[1:]        # avoid id
            columns = ','.join([str(key) for key in keys])
            values = ','.join(['"%s"' % self.__dict__[key] for key in keys])
            # NOTE(review): values are string-interpolated, not
            # parameterized -- safe only while every caller passes
            # internally generated values; worth parameterizing.
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                    (self.__table, columns, values)
            _cursor.execute(query)


    def delete(self):
        _cursor.execute("""DELETE FROM %s WHERE id = %%s""" %
                        self.__table, (self.id,))
664
665
class IneligibleHostQueue(DBObject):
    """Row of ineligible_host_queues: records a (job, host) pairing that
    must not be rescheduled (used to keep meta-host entries off machines
    they have already been assigned to)."""

    def __init__(self, id=None, row=None, new_record=None):
        DBObject.__init__(self, 'ineligible_host_queues',
                          ['id', 'job_id', 'host_id'],
                          id=id, row=row, new_record=new_record)
671
672
673class Host(DBObject):
674 def __init__(self, id=None, row=None):
675 fields = ['id', 'hostname', 'locked', 'synch_id','status']
676 DBObject.__init__(self, 'hosts',fields, id=id, row=row)
677
678
679
680 def current_task(self):
681 _cursor.execute("""
682 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
683 """, (self.id,))
684
685 if not _cursor.rowcount:
686 return None
687 else:
688 assert _cursor.rowcount == 1
689 results = _cursor.fetchone();
690# print "current = %s" % results
691 return HostQueueEntry(row=results)
692
693
694 def next_queue_entries(self):
695 if self.locked:
696 print "%s locked, not queuing" % self.hostname
697 return None
698# print "%s/%s looking for work" % (self.hostname, self.platform_id)
699 _cursor.execute("""
700 SELECT * FROM host_queue_entries
701 WHERE ((host_id=%s) OR (meta_host IS NOT null AND
702 (meta_host IN (
703 SELECT label_id FROM hosts_labels WHERE host_id=%s
704 )
705 )
706 AND job_id NOT IN (
707 SELECT job_id FROM ineligible_host_queues
708 WHERE host_id=%s
709 )))
710 AND NOT complete AND NOT active
711 ORDER BY priority DESC, meta_host, id
712 LIMIT 1
713 """, (self.id,self.id, self.id))
714
715 if not _cursor.rowcount:
716 return None
717 else:
718 return [HostQueueEntry(row=i) for i in _cursor.fetchall()]
719
720 def yield_work(self):
721 print "%s yielding work" % self.hostname
722 if self.current_task():
723 self.current_task().requeue()
724
725 def set_status(self,status):
726 self.update_field('status',status)
727
728
729class HostQueueEntry(DBObject):
730 def __init__(self, id=None, row=None):
731 assert id or row
732 fields = ['id', 'job_id', 'host_id', 'priority', 'status',
733 'meta_host', 'active', 'complete']
734 DBObject.__init__(self, 'host_queue_entries', fields, id=id,
735 row=row)
736
737 self.job = Job(self.job_id)
738
739 if self.host_id:
740 self.host = Host(self.host_id)
741 else:
742 self.host = None
743
744 self.queue_log_path = os.path.join(self.job.results_dir(),
745 'queue.log.' + str(self.id))
746
747
748 def set_host(self, host):
749 if host:
750 self.queue_log_record('Assigning host ' + host.hostname)
751 self.update_field('host_id', host.id)
752 self.update_field('active', True)
753 else:
754 self.queue_log_record('Releasing host')
755 self.update_field('host_id', None)
756
757 self.host = host
758
759
760 def get_host(self):
761 if not self.host:
762 if self.host_id:
763 self.host = Host(self.host_id)
764 if self.host:
765 return self.host
766 else:
767 return None
768
769
770 def queue_log_record(self, log_line):
771 queue_log = open(self.queue_log_path, 'a', 0)
772 queue_log.write(log_line + '\n')
773 queue_log.close()
774
775
776 def results_dir(self):
777 if self.job.num_machines()==1 or self.job.is_synchronous():
778 results_dir = self.job.job_dir
779 else:
780 assert self.host
781 results_dir = '%s/%s' % (self.job.job_dir,
782 self.host.hostname)
783
784 return results_dir
785
786
787 def set_status(self, status):
788 self.update_field('status', status)
789 if self.host:
790 hostname = self.host.hostname
791 else:
792 hostname = 'no host'
793 print "%s/%d status -> %s" % (hostname, self.id, self.status)
794 if status in ['Queued']:
795 self.update_field('complete', False)
796 self.update_field('active', False)
797
798 if status in ['Pending', 'Running', 'Verifying', 'Starting']:
799 self.update_field('complete', False)
800 self.update_field('active', True)
801
802 if status in ['Failed', 'Completed', 'Stopped']:
803 self.update_field('complete', True)
804 self.update_field('active', False)
805
806
807 def run(self,assigned_host=None):
808 if self.meta_host:
809 assert assigned_host
810 self.job.create_results_dir()
811 self.set_host(assigned_host)
812 print "creating block %s/%s" % (self.job.id,
813 self.host.id)
814
815 row = [0, self.job.id, self.host.id]
816 block = IneligibleHostQueue(row=row, new_record=True)
817 block.save()
818
819 print "%s/%s scheduled on %s, status=%s" % (self.job.name,
820 self.meta_host, self.host.hostname, self.status)
821
822 return self.job.run(queue_entry=self)
823
824 def requeue(self):
825 self.set_status('Queued')
826
827 if self.meta_host:
828 self.set_host(None)
829
830
831 def clear_results_dir(self):
832 if os.path.exists(self.results_dir()):
833 shutil.rmtree(self.results_dir())
834
835
836class Job(DBObject):
837 def __init__(self, id=None, row=None):
838 assert id or row
839 DBObject.__init__(self,'jobs',['id','owner','name','priority','control_file',
840 'control_type','created_on', 'synch_type', 'synch_count',
841 'synchronizing' ], id=id, row=row)
842
843 self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id, self.owner))
844
845
846 def is_server_job(self):
847 return self.control_type != 2
848
849
850 def get_host_queue_entries(self):
851 _cursor.execute("""
852 SELECT * FROM host_queue_entries
853 WHERE job_id= %s
854 """, (self.id,))
855 entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]
856
857 assert len(entries)>0
858
859 return entries
860
861
862 def set_status(self, status, update_queues=False):
863 self.update_field('status',status)
864
865 if update_queues:
866 for queue_entry in self.get_host_queue_entries():
867 queue_entry.set_status(status)
868
869
870 def is_synchronous(self):
871 return self.synch_type == 2
872
873
874 def is_ready(self):
875 if not self.is_synchronous():
876 return True
877 sql = "job_id=%s AND status='Pending'" % self.id
878 count = self.count(sql, table='host_queue_entries')
879 return (count == self.synch_count)
880
881
882 def ready_to_synchronize(self):
883 # heuristic
884 queue_entries = self.get_host_queue_entries()
885 count = 0
886 for queue_entry in queue_entries:
887 if queue_entry.status == 'Pending':
888 count += 1
889
890 return (count/self.synch_count >= 0.5)
891
892
893 def start_synchronizing(self):
894 self.update_field('synchronizing', True)
895
896
897 def results_dir(self):
898 return self.job_dir
899
900 def num_machines(self, clause = None):
901 sql = "job_id=%s" % self.id
902 if clause:
903 sql += " AND (%s)" % clause
904 return self.count(sql, table='host_queue_entries')
905
906
907 def num_queued(self):
908 return self.num_machines('not complete')
909
910
911 def num_active(self):
912 return self.num_machines('active')
913
914
915 def num_complete(self):
916 return self.num_machines('complete')
917
918
919 def is_finished(self):
920 left = self.num_queued()
921 print "%s: %s machines left" % (self.name, left)
922 return left==0
923
924 def stop_synchronizing(self):
925 self.update_field('synchronizing', False)
926 self.set_status('Queued', update_queues = False)
927
928
929 def write_machines_file(self):
930 if self.num_machines()>1:
931 print "writing machines file"
932 mf = open("%s/.machines" % self.job_dir, 'w')
933 for queue_entry in self.get_host_queue_entries():
934 if queue_entry.get_host():
935 mf.write("%s\n" % \
936 queue_entry.get_host().hostname)
937 mf.close()
938
939
940 def create_results_dir(self, queue_entry=None):
941 print "create: active: %s complete %s" % (self.num_active(),
942 self.num_complete())
943
944 if not os.path.exists(self.job_dir):
945 os.makedirs(self.job_dir)
946
947 if queue_entry:
948 return queue_entry.results_dir()
949 return self.job_dir
950
951
952 def run(self, queue_entry):
953 results_dir = self.create_results_dir(queue_entry)
954
955 if self.is_synchronous():
956 if not self.is_ready():
957 return Agent([VerifySynchronousTask(queue_entry = queue_entry)])
958
959 queue_entry.set_status('Starting')
960
961 ctrl = open(os.tmpnam(), 'w')
962 if self.control_file:
963 ctrl.write(self.control_file)
964 else:
965 ctrl.write("")
966 ctrl.flush()
967
968 if self.is_synchronous():
969 if self.num_machines() > 1:
970 self.write_machines_file()
971 hosts = self.get_host_queue_entries()
972 hostnames = ','.join([i.host.hostname for i in hosts])
973 queue_entries = self.get_host_queue_entries()
974 else:
975 assert queue_entry
976 hostnames = queue_entry.host.hostname
977 queue_entries = [queue_entry]
978
979 params = ['autoserv', '-n', '-r', results_dir,
980 '-b', '-u', self.owner, '-l', self.name,
981 '-m', hostnames, ctrl.name]
982
983 if not self.is_server_job():
984 params.append('-c')
985
986 tasks = []
987 if not self.is_synchronous():
988 tasks.append(VerifyTask(queue_entry))
989
990 tasks.append(QueueTask(job = self, queue_entries = queue_entries, cmd = params))
991
992 agent = Agent(tasks)
993
994 return agent
995
996
# Script entry point: run the scheduler loop (see main()).
if __name__ == '__main__':
    main()