blob: 28316feb0dd30f7162846273e6e5fefdde356ab0 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
9import optparse, signal, smtplib, socket
# Directory all job results are written under; overwritten by main() with
# the positional command-line argument.
RESULTS_DIR = '.'
# nice(1) increment applied to every autoserv child process.
AUTOSERV_NICE_LEVEL = 10

# Root of the autotest tree; defaults to the parent of this script but can
# be overridden with the AUTOTEST_DIR environment variable.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Global database handle/cursor, managed by connect()/disconnect().
_connection = None
_cursor = None
# Set by the SIGINT handler to request a clean shutdown of the main loop.
_shutdown = False
# Optional address that log_stacktrace() mails exceptions to.
_notify_email = None
27
28
def main():
    """Entry point: parse options, connect to the DB, run the dispatch loop.

    Usage: monitor_db [options] results_dir
    Ticks the Dispatcher every 20 seconds until SIGINT sets _shutdown.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--no-recover', help='Skip machine/job recovery ' +
                      'step [for multiple monitors/rolling upgrades]',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    # The single positional argument is the results directory.
    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    init(options.logfile)
    dispatcher = Dispatcher(do_recover = not options.no_recover)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
        dispatcher.shut_down()
    except:
        # Last-resort handler: log (and optionally mail) the traceback
        # before the process exits.
        log_stacktrace("Uncaught exception; terminating monitor_db")

    disconnect()
64
65
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to shut down cleanly."""
    global _shutdown
    _shutdown = True
    print "Shutdown request received."
70
71
def init(logfile):
    """One-time startup: logging, PATH setup, DB connection, signal handler."""
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    # Make autoserv (and other server-side tools) reachable for the
    # subprocesses this scheduler spawns.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    connect()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."
85
86
def enable_logging(logfile):
    """Redirect stdout/stderr to logfile and logfile.err, unbuffered.

    Both the Python file objects and the underlying OS descriptors are
    redirected (via dup2) so output of child processes is captured too.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd
99
100
def idle_hosts():
    """Return Host objects that are free to take on work.

    A host qualifies when it is unlocked, Ready (or has no status yet),
    has no active queue entry, and some pending queue entry could use it
    (either directly by host_id or through a meta-host label).
    """
    _cursor.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
        (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
        OR
        (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """)
    hosts = [Host(row=i) for i in _cursor.fetchall()]
    return hosts
113
114
def connect():
    """Open the global MySQL connection using on-disk preference files.

    Reads (relative to this script's directory):
      .database     - line 1: DB host, line 2: schema name
      .priv_login   - line 1: privileged user, line 2: password
      .unpriv_login - unprivileged fallback credentials
    Missing/unreadable files fall back to built-in defaults.

    Sets the module globals _connection and _cursor; autocommit is
    enabled so status updates hit the DB immediately.
    """
    path = os.path.dirname(os.path.abspath(sys.argv[0]))

    def _read_two_lines(filename):
        # Return (first, second) stripped lines, or None if unreadable.
        # Narrow except + explicit close fixes the original's leaked file
        # handles, bare excepts, and shadowing of the `file` builtin.
        try:
            pref_file = open(os.path.join(path, filename), 'r')
        except IOError:
            return None
        try:
            return (pref_file.readline().rstrip(),
                    pref_file.readline().rstrip())
        finally:
            pref_file.close()

    db_info = _read_two_lines('.database')
    if db_info:
        (DB_HOST, DB_SCHEMA) = db_info
    else:
        DB_HOST = 'localhost'
        DB_SCHEMA = 'autotest_web'

    login = _read_two_lines('.priv_login') or _read_two_lines('.unpriv_login')
    if login:
        (DB_USER, DB_PASS) = login
    else:
        DB_USER = 'autotest'
        DB_PASS = 'password'

    global _connection, _cursor
    _connection = MySQLdb.connect(
        host=DB_HOST,
        user=DB_USER,
        passwd=DB_PASS,
        db=DB_SCHEMA
    )
    _connection.autocommit(True)
    _cursor = _connection.cursor()
150
151
def disconnect():
    """Close the global DB connection and reset the module globals.

    Robustness fix: safe to call even if connect() never succeeded (the
    original raised AttributeError on a None connection), and the cursor
    is closed before the connection.
    """
    global _connection, _cursor
    if _cursor is not None:
        _cursor.close()
    if _connection is not None:
        _connection.close()
    _connection = None
    _cursor = None
157
158
def parse_results(results_dir, flags=""):
    """Launch the tko results parser on results_dir in the background.

    Parser output is appended to .parse.log inside the results dir.
    NOTE(review): paths are interpolated into a shell command unquoted --
    works for scheduler-generated paths but would break on spaces or
    shell metacharacters; confirm inputs are always sanitized upstream.
    """
    parse = os.path.join(AUTOTEST_TKO_DIR, 'parse')
    output = os.path.join(results_dir, '.parse.log')
    os.system("%s %s -r -o %s > %s 2>&1 &" % (parse, flags, results_dir, output))
163
164
def log_stacktrace(reason):
    """Write the current exception's traceback to stderr and, if
    _notify_email is configured, mail it via local SMTP.

    Fixes: the original shadowed the builtins `type` and `str` with
    locals, and left the SMTP connection open if sendmail() raised.
    """
    (exc_type, exc_value, exc_tb) = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
                                   time.strftime("%X %x"))
    message += ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    sys.stderr.write("\n%s\n" % message)

    if _notify_email:
        sender = "monitor_db"
        subject = "monitor_db exception"
        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
            sender, _notify_email, subject, message)
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(sender, _notify_email, msg)
        finally:
            mailer.quit()
182
183
class Dispatcher:
    """Top-level scheduler loop: recovers interrupted work, finds runnable
    queue entries for idle hosts, and ticks the resulting Agents."""
    def __init__(self, do_recover=True):
        self._agents = []
        self.shutting_down = False

        if do_recover:
            self._recover_lost()


    def shut_down(self):
        # Stop picking up new work but keep ticking until every running
        # agent has drained.
        print "Shutting down!"
        self.shutting_down = True
        while self._agents:
            self.tick()
            time.sleep(40)


    def tick(self):
        if not self.shutting_down:
            self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self


    def _recover_lost(self):
        # Re-queue entries that were mid-flight when a previous scheduler
        # instance died; synchronous jobs are requeued as a whole group.
        _cursor.execute("""SELECT * FROM host_queue_entries WHERE active AND NOT complete""")
        if _cursor.rowcount:
            queue_entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]
            for queue_entry in queue_entries:
                job = queue_entry.job
                if job.is_synchronous():
                    for child_entry in job.get_host_queue_entries():
                        child_entry.requeue()
                else:
                    queue_entry.requeue()
                queue_entry.clear_results_dir()

        # Any unlocked host not in 'Ready' is in an unknown state --
        # schedule a verify for it.
        _cursor.execute("""SELECT * FROM hosts
                           WHERE status != 'Ready' AND NOT locked""")
        if _cursor.rowcount:
            hosts = [Host(row=i) for i in _cursor.fetchall()]
            for host in hosts:
                verify_task = VerifyTask(host = host)
                self.add_agent(Agent(tasks = [verify_task]))


    def _find_more_work(self):
        print "finding work"

        # Cap agents started per tick at 100 to bound tick latency.
        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if tasks:
                for next in tasks:
                    try:
                        agent = next.run(assigned_host=host)
                        if agent:
                            self.add_agent(agent)

                            num_started += 1
                            if num_started>=100:
                                return
                            # one entry per host per tick
                            break
                    except:
                        next.set_status('Failed')

#                       if next.host:
#                               next.host.set_status('Ready')

                        log_stacktrace("task_id = %d" % next.id)


    def _handle_agents(self):
        # Tick every agent; keep only those still running.
        still_running = []
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running
269
270
class RunMonitor(object):
    """Launches one subprocess (optionally under nice) and tracks its exit.

    stdout/stderr are appended to log_file behind a timestamped banner,
    or discarded to /dev/null if no log file was given or it cannot be
    opened.
    """
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        if self.nice_level:
            nice_cmd = ['nice','-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*'*80))
            except:
                # Best-effort logging; fall through to /dev/null below.
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        # The child process holds its own references to these descriptors,
        # so our copies can be closed immediately.
        out_file.close()
        in_devnull.close()
        return proc


    def kill(self):
        # NOTE(review): Popen.kill() exists only on Python 2.6+ -- confirm
        # the deployment interpreter version.
        self.proc.kill()


    def exit_code(self):
        # None while the process is still running.
        return self.proc.poll()
313
314
class Agent(object):
    """Executes an ordered queue of AgentTasks, one at a time.

    If a task fails and carries failure_tasks, those replace whatever
    remained in the queue (e.g. verify failure -> repair).
    """
    def __init__(self, tasks):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None

        for initial_task in tasks:
            self.add_task(initial_task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        print("agent tick")
        running = self.active_task
        if running and not running.is_done():
            running.poll()
            return
        self._next_task()


    def _next_task(self):
        print("agent picking task")
        finished = self.active_task
        if finished:
            assert finished.is_done()

            # A failed task's failure_tasks replace the remaining queue.
            if finished.failure_tasks and not finished.success:
                self.queue = Queue.Queue(0)
                for fallback in finished.failure_tasks:
                    self.add_task(fallback)

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher
        self._next_task()
363
364
class AgentTask(object):
    """Base class for one unit of work executed by an Agent.

    A task optionally wraps an external command (launched through
    RunMonitor) and reports completion via finished()/epilog().
    failure_tasks, if supplied, replace the agent's remaining queue when
    this task fails.

    Fixes vs. original: `exit_code == None` -> `is None`; stray idioms
    tidied.  Single-argument print statements are parenthesized (same
    output on Python 2).
    """
    def __init__(self, cmd, failure_tasks = None):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.agent = None          # back-reference set by Agent.add_task()


    def poll(self):
        # Check on the external process; a task that never created a
        # monitor is immediately marked as failed.
        print("poll")
        if hasattr(self, 'monitor'):
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        # exit_code is None while the process is still running.
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        # Record the outcome and let subclasses react via epilog().
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        # Hook: runs once before the command is launched.
        pass


    def epilog(self):
        # Hook: runs once after the task finishes.
        pass


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        # NOTE(review): assumes run() already created self.monitor.
        self.monitor.kill()


    def run(self):
        if self.cmd:
            print("agent starting monitor")

            log_file = None
            # Per-host tasks log under RESULTS_DIR/hosts/<hostname>.
            if hasattr(self, 'host'):
                log_file = os.path.join(os.path.join(RESULTS_DIR, 'hosts'),
                                        self.host.hostname)

            self.monitor = RunMonitor(self.cmd,
                                      nice_level = AUTOSERV_NICE_LEVEL,
                                      log_file = log_file)
435
436
class RepairTask(AgentTask):
    """Runs 'autoserv -R' to repair a host and records the outcome."""
    def __init__(self, host):
        self.host = host
        AgentTask.__init__(self,
                           ['autoserv', '-n', '-R', '-m', host.hostname])

    def prolog(self):
        print("repair_task starting")
        self.host.set_status('Repairing')


    def epilog(self):
        # A successful repair puts the host back into the Ready pool.
        status = 'Repair Failed'
        if self.success:
            status = 'Ready'
        self.host.set_status(status)
455
456
class VerifyTask(AgentTask):
    """Runs 'autoserv -v' against a host, into a temporary results dir.

    Constructed with either a queue_entry (verify before running its job)
    or a bare host (standalone verify).  On failure a RepairTask is
    scheduled via failure_tasks.
    """
    def __init__(self, queue_entry=None, host=None):
        # Exactly one of queue_entry / host must be supplied.
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
        cmd = ['autoserv', '-n', '-v', '-m', self.host.hostname,
               '-r', self.temp_results_dir]

        AgentTask.__init__(self, cmd, failure_tasks = [RepairTask(self.host)])


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        # Keep the verify logs only when they belong with job results; a
        # failed verify of a meta-host assignment discards them and the
        # host is released back to the pool.
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        else:
            shutil.rmtree(self.temp_results_dir)

        if self.success:
            status = 'Ready'
        else:
            status = 'Failed Verify'
            if self.queue_entry:
                if self.queue_entry.meta_host:
                    self.host.yield_work()
                else:
                    self.queue_entry.set_status('Failed')

        self.host.set_status(status)


    def move_results(self):
        """Move verify output into the queue entry's results directory."""
        assert self.queue_entry is not None
        target_dir = self.queue_entry.results_dir()
        if self.queue_entry.job.is_synchronous():
            # Synchronous jobs keep per-host subdirectories.
            target_dir = os.path.join(target_dir,
                                      self.queue_entry.host.hostname)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            shutil.move(os.path.join(self.temp_results_dir,
                                     filename),
                        os.path.join(target_dir, filename))
511
512
class VerifySynchronousTask(VerifyTask):
    """Verify step for one machine of a synchronous job.

    On success the entry becomes Pending, and once the whole synch group
    is Pending the job itself is launched.  On failure the sibling
    entries are stopped (or, for a meta-host, the entry is re-queued).
    """
    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry = queue_entry)


    def epilog(self):
        VerifyTask.epilog(self)
        print "verify_synchronous finished: %s/%s" % (self.queue_entry.host.hostname, self.success)
        if self.success:
            if self.queue_entry.job.num_complete()==0:
                self.queue_entry.set_status('Pending')

                job = self.queue_entry.job
                if job.is_ready():
                    # Every machine verified -- launch the actual job.
                    self.agent.dispatcher.add_agent(job.run(self.queue_entry))
            else:
                self.queue_entry.set_status('Stopped') # some other entry failed verify
        else:
            if self.queue_entry.meta_host:
                self.queue_entry.set_status('Queued')
            else:
                job = self.queue_entry.job
                for child_entry in job.get_host_queue_entries():
                    if child_entry.active:
                        child_entry.set_status('Stopped')
538
539
class QueueTask(AgentTask):
    """Runs the actual autoserv job command for one or more queue entries
    and records timing keyvals plus final statuses."""
    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(queue_entry, field, value):
        # Append field=value to the keyval file in the entry's results dir.
        key_path = os.path.join(queue_entry.results_dir(), 'keyval')
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%d' % (field, value)
        keyval_file.close()


    def prolog(self):
        for queue_entry in self.queue_entries:
            print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            # write some job timestamps into the job keyval file
            queued = time.mktime(self.job.created_on.timetuple())
            started = time.time()
            self._write_keyval(queue_entry, "job_queued", queued)
            self._write_keyval(queue_entry, "job_started", started)


    def epilog(self):
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')
            # write another timestamp into the job keyval file
            finished = time.time()
            self._write_keyval(queue_entry, "job_finished",
                               finished)

        # Synchronous / single-machine jobs are parsed once at the job
        # level when everything is done; otherwise parse per machine.
        if self.job.is_synchronous() or self.job.num_machines()==1:
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(), flags='-l 2')

        print "queue_task finished with %s/%s" % (status, self.success)
589
590
class RebootTask(AgentTask):
    """Reboots a host via 'autoserv -b'.  tick()/run() are still stubs.

    BUG FIX: the original __init__ took no arguments yet referenced a
    nonexistent global `host`, so the class could never be instantiated;
    it also raised string "exceptions", which are not valid exception
    objects.
    """
    def __init__(self, host):
        self.host = host
        AgentTask.__init__(self,
                           "autoserv -n -b -m %s /dev/null" % host)


    def tick(self, exit_code):
        raise NotImplementedError("RebootTask.tick is not implemented")


    def run(self):
        raise NotImplementedError("RebootTask.run is not implemented")
604
605
606
class DBObject(object):
    """Minimal row wrapper: loads one DB row and exposes columns as
    attributes.

    Construct with either id (issues a SELECT) or an already-fetched row
    tuple -- exactly one of the two.  new_record=True marks the object
    for INSERT via save().

    Fixes vs. original: the "row not found" string raise (invalid
    exception type) now raises ValueError, and save() uses parameterized
    placeholders instead of inlining quoted values into the SQL (which
    broke on embedded quotes and invited SQL injection).
    """
    def __init__(self, table, fields, id=None, row=None, new_record=False):
        assert (bool(id) != bool(row)) and table and fields

        self.__table = table
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            _cursor.execute(sql, (id,))
            if not _cursor.rowcount:
                raise ValueError("row not found (table=%s, id=%s)" %
                                 (self.__table, id))
            row = _cursor.fetchone()

        assert len(row) == len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" %
            (table, row, len(row), fields, len(fields)))

        # Bind each column as an attribute, and remember which attributes
        # map to updatable columns (id is never updatable).
        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        del self.__valid_fields['id']

    def count(self, where, table = None):
        """Return COUNT(*) of rows matching the pre-built WHERE clause."""
        if not table:
            table = self.__table

        _cursor.execute("""
            SELECT count(*) FROM %s
            WHERE %s
        """ % (table, where))
        count = _cursor.fetchall()

        return int(count[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        """Persist a single column change (no-op if the value is unchanged)."""
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                (self.__table, field)
        _cursor.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        """INSERT this object if it was created with new_record=True."""
        if self.__new_record:
            keys = self.__fields[1:]        # skip the auto-increment id
            columns = ','.join([str(key) for key in keys])
            placeholders = ','.join(['%s'] * len(keys))
            query = 'INSERT INTO %s (%s) VALUES (%s)' % \
                    (self.__table, columns, placeholders)
            _cursor.execute(query, [self.__dict__[key] for key in keys])


    def delete(self):
        _cursor.execute("""DELETE FROM %s WHERE id = %%s""" %
                        self.__table, (self.id,))
677
678
class IneligibleHostQueue(DBObject):
    """Row wrapper for ineligible_host_queues: blocks a (job, host) pair
    from being scheduled together again."""
    def __init__(self, id=None, row=None, new_record=None):
        DBObject.__init__(self, 'ineligible_host_queues',
                          ['id', 'job_id', 'host_id'],
                          id=id, row=row, new_record=new_record)
684
685
class Host(DBObject):
    """Row wrapper for the hosts table plus per-host scheduling queries."""
    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id','status']
        DBObject.__init__(self, 'hosts',fields, id=id, row=row)



    def current_task(self):
        """Return this host's single active HostQueueEntry, or None."""
        _cursor.execute("""
            SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if not _cursor.rowcount:
            return None
        else:
            # A host may only ever be running one entry at a time.
            assert _cursor.rowcount == 1
            results = _cursor.fetchone();
#           print "current = %s" % results
            return HostQueueEntry(row=results)


    def next_queue_entries(self):
        """Return the best pending queue entry for this host as a
        one-element list, or None if the host is locked or nothing is
        eligible.

        Considers direct entries (host_id) and meta-host entries whose
        label matches this host and whose job has not already blocked it
        via ineligible_host_queues; highest priority wins.
        """
        if self.locked:
            print "%s locked, not queuing" % self.hostname
            return None
#       print "%s/%s looking for work" % (self.hostname, self.platform_id)
        _cursor.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (self.id,self.id, self.id))

        if not _cursor.rowcount:
            return None
        else:
            return [HostQueueEntry(row=i) for i in _cursor.fetchall()]

    def yield_work(self):
        # Release this host's active entry back to the queue (used when a
        # meta-host verify fails).
        print "%s yielding work" % self.hostname
        if self.current_task():
            self.current_task().requeue()

    def set_status(self,status):
        self.update_field('status',status)
740
741
class HostQueueEntry(DBObject):
    """Row wrapper for host_queue_entries: one (job, host) scheduling
    unit, including status transitions and results-directory layout."""
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        DBObject.__init__(self, 'host_queue_entries', fields, id=id,
                          row=row)

        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Per-entry log of host assignments/releases.
        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    def set_host(self, host):
        """Assign (and activate) a host, or release it when host is None."""
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
        else:
            self.queue_log_record('Releasing host')
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        # Lazily re-fetch the Host object if only the id is known.
        if not self.host:
            if self.host_id:
                self.host = Host(self.host_id)
        if self.host:
            return self.host
        else:
            return None


    def queue_log_record(self, log_line):
        # Unbuffered append so the record survives a scheduler crash.
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(log_line + '\n')
        queue_log.close()


    def results_dir(self):
        """Job-level dir for synchronous/single-host jobs; otherwise a
        per-host subdirectory of the job dir."""
        if self.job.num_machines()==1 or self.job.is_synchronous():
            results_dir = self.job.job_dir
        else:
            assert self.host
            results_dir = '%s/%s' % (self.job.job_dir,
                                     self.host.hostname)

        return results_dir


    def set_status(self, status):
        # The status value implies the active/complete flags; keep all
        # three columns consistent.
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self,assigned_host=None):
        """Bind a host (meta-host entries only) and hand off to Job.run."""
        if self.meta_host:
            assert assigned_host
            self.job.create_results_dir()
            self.set_host(assigned_host)
            print "creating block %s/%s" % (self.job.id,
                                            self.host.id)

            # Block this host from being picked again for this job.
            row = [0, self.job.id, self.host.id]
            block = IneligibleHostQueue(row=row, new_record=True)
            block.save()

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                                                    self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)

    def requeue(self):
        self.set_status('Queued')

        # Meta-host entries drop their host so a fresh one can be chosen.
        if self.meta_host:
            self.set_host(None)


    def clear_results_dir(self):
        if os.path.exists(self.results_dir()):
            shutil.rmtree(self.results_dir())
847
848
849class Job(DBObject):
850 def __init__(self, id=None, row=None):
851 assert id or row
852 DBObject.__init__(self,'jobs',['id','owner','name','priority','control_file',
853 'control_type','created_on', 'synch_type', 'synch_count',
854 'synchronizing' ], id=id, row=row)
855
856 self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id, self.owner))
857
858
859 def is_server_job(self):
860 return self.control_type != 2
861
862
863 def get_host_queue_entries(self):
864 _cursor.execute("""
865 SELECT * FROM host_queue_entries
866 WHERE job_id= %s
867 """, (self.id,))
868 entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]
869
870 assert len(entries)>0
871
872 return entries
873
874
875 def set_status(self, status, update_queues=False):
876 self.update_field('status',status)
877
878 if update_queues:
879 for queue_entry in self.get_host_queue_entries():
880 queue_entry.set_status(status)
881
882
883 def is_synchronous(self):
884 return self.synch_type == 2
885
886
887 def is_ready(self):
888 if not self.is_synchronous():
889 return True
890 sql = "job_id=%s AND status='Pending'" % self.id
891 count = self.count(sql, table='host_queue_entries')
892 return (count == self.synch_count)
893
894
895 def ready_to_synchronize(self):
896 # heuristic
897 queue_entries = self.get_host_queue_entries()
898 count = 0
899 for queue_entry in queue_entries:
900 if queue_entry.status == 'Pending':
901 count += 1
902
903 return (count/self.synch_count >= 0.5)
904
905
906 def start_synchronizing(self):
907 self.update_field('synchronizing', True)
908
909
910 def results_dir(self):
911 return self.job_dir
912
913 def num_machines(self, clause = None):
914 sql = "job_id=%s" % self.id
915 if clause:
916 sql += " AND (%s)" % clause
917 return self.count(sql, table='host_queue_entries')
918
919
920 def num_queued(self):
921 return self.num_machines('not complete')
922
923
924 def num_active(self):
925 return self.num_machines('active')
926
927
928 def num_complete(self):
929 return self.num_machines('complete')
930
931
932 def is_finished(self):
933 left = self.num_queued()
934 print "%s: %s machines left" % (self.name, left)
935 return left==0
936
937 def stop_synchronizing(self):
938 self.update_field('synchronizing', False)
939 self.set_status('Queued', update_queues = False)
940
941
942 def write_machines_file(self):
943 if self.num_machines()>1:
944 print "writing machines file"
945 mf = open("%s/.machines" % self.job_dir, 'w')
946 for queue_entry in self.get_host_queue_entries():
947 if queue_entry.get_host():
948 mf.write("%s\n" % \
949 queue_entry.get_host().hostname)
950 mf.close()
951
952
953 def create_results_dir(self, queue_entry=None):
954 print "create: active: %s complete %s" % (self.num_active(),
955 self.num_complete())
956
957 if not os.path.exists(self.job_dir):
958 os.makedirs(self.job_dir)
959
960 if queue_entry:
961 return queue_entry.results_dir()
962 return self.job_dir
963
964
965 def run(self, queue_entry):
966 results_dir = self.create_results_dir(queue_entry)
967
968 if self.is_synchronous():
969 if not self.is_ready():
970 return Agent([VerifySynchronousTask(queue_entry = queue_entry)])
971
972 queue_entry.set_status('Starting')
973
974 ctrl = open(os.tmpnam(), 'w')
975 if self.control_file:
976 ctrl.write(self.control_file)
977 else:
978 ctrl.write("")
979 ctrl.flush()
980
981 if self.is_synchronous():
982 if self.num_machines() > 1:
983 self.write_machines_file()
984 hosts = self.get_host_queue_entries()
985 hostnames = ','.join([i.host.hostname for i in hosts])
986 queue_entries = self.get_host_queue_entries()
987 else:
988 assert queue_entry
989 hostnames = queue_entry.host.hostname
990 queue_entries = [queue_entry]
991
992 params = ['autoserv', '-n', '-r', results_dir,
993 '-b', '-u', self.owner, '-l', self.name,
994 '-m', hostnames, ctrl.name]
995
996 if not self.is_server_job():
997 params.append('-c')
998
999 tasks = []
1000 if not self.is_synchronous():
1001 tasks.append(VerifyTask(queue_entry))
1002
1003 tasks.append(QueueTask(job = self, queue_entries = queue_entries, cmd = params))
1004
1005 agent = Agent(tasks)
1006
1007 return agent
1008
1009
# Script entry point.
if __name__ == '__main__':
    main()