blob: 475f8498ad23135f3b9f71f585a6e795454db607 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
9import optparse, signal, smtplib, socket
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
# Default results location; overridden by the results_dir command-line arg.
RESULTS_DIR = '.'
# Nice level applied to every autoserv child we spawn.
AUTOSERV_NICE_LEVEL = 10

# Autotest tree root: sibling of this file, unless AUTOTEST_DIR overrides it.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# was: os.environ.has_key(...) -- has_key is deprecated; use 'in'.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Module-wide mutable state, set up by connect()/main().
_connection = None      # MySQLdb connection
_cursor = None          # shared cursor on _connection
_shutdown = False       # set by SIGINT handler to stop the main loop
_notify_email = None    # address for exception mail, from --notify
_autoserv_path = 'autoserv'
_testing_mode = False   # set by --test: dummy autoserv, no parsing
mbligh36768f02008-02-22 18:28:33 +000031
32
def main():
    """Entry point: parse options, configure globals, then run the
    dispatch loop until shutdown is requested."""
    parser = optparse.OptionParser('usage: %prog [options] results_dir')
    parser.add_option('--no-recover',
                      action='store_true',
                      help='Skip machine/job recovery step '
                           '[for multiple monitors/rolling upgrades]')
    parser.add_option('--logfile',
                      help='Set a log file that all stdout should be '
                           'redirected to. Stderr will go to this '
                           'file + ".err"')
    parser.add_option('--notify',
                      help='Set an email address to be notified of exceptions')
    parser.add_option('--test',
                      action='store_true',
                      help='Indicate that scheduler is under test and '
                           'should use dummy autoserv and no parsing')
    options, args = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    if options.test:
        global _autoserv_path, _testing_mode
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher(do_recover=not options.no_recover)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
        dispatcher.shut_down()
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    disconnect()
77
78
79def handle_sigint(signum, frame):
80 global _shutdown
81 _shutdown = True
82 print "Shutdown request received."
83
84
def init(logfile):
    """One-time startup: optional stdout/stderr redirection, PATH
    setup so autoserv is runnable by name, DB connection, and SIGINT
    handler installation."""
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    # Put the autotest server dir first so 'autoserv' resolves to ours.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    connect()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."
98
99
def enable_logging(logfile):
    """Redirect stdout to *logfile* and stderr to *logfile*.err,
    unbuffered, at both the OS-descriptor and Python-object level."""
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    # dup2 repoints the real file descriptors so spawned children
    # (autoserv etc.) inherit the redirection too.
    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    # Also swap the Python-level objects for in-process prints.
    sys.stdout = out_fd
    sys.stderr = err_fd
112
113
def idle_hosts():
    """Return Host objects that are unlocked, Ready (or status NULL),
    not running anything, and have work waiting -- either a queue entry
    assigned directly to the host or a meta-host entry matching one of
    the host's labels."""
    _cursor.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """)
    hosts = [Host(row=i) for i in _cursor.fetchall()]
    return hosts
126
127
def connect():
    """Open the global MySQL connection/cursor using the AUTOTEST_WEB
    section of the global config (autocommit enabled)."""
    # NOTE(review): 'path' is computed but never used.
    path = os.path.dirname(os.path.abspath(sys.argv[0]))
    # get global config and parse for info
    c = global_config.global_config
    dbase = "AUTOTEST_WEB"
    DB_HOST = c.get_config_value(dbase, "host", "localhost")
    DB_SCHEMA = c.get_config_value(dbase, "database", "autotest_web")
    if _testing_mode:
        # Point test runs at a separate schema so they cannot touch
        # production data.
        DB_SCHEMA = 'stresstest_autotest_web'

    DB_USER = c.get_config_value(dbase, "user", "autotest")
    DB_PASS = c.get_config_value(dbase, "password")

    global _connection, _cursor
    _connection = MySQLdb.connect(
        host=DB_HOST,
        user=DB_USER,
        passwd=DB_PASS,
        db=DB_SCHEMA
    )
    _connection.autocommit(True)
    _cursor = _connection.cursor()
150
151
def disconnect():
    """Close the global DB connection and clear both module globals."""
    global _connection, _cursor
    _connection.close()
    _cursor = None
    _connection = None
157
158
def parse_results(results_dir, flags=""):
    """Kick off the tko parser over *results_dir* in the background,
    logging its output to <results_dir>/.parse.log; no-op when the
    scheduler is in testing mode."""
    global _testing_mode
    if _testing_mode:
        return
    parse = os.path.join(AUTOTEST_TKO_DIR, 'parse')
    output = os.path.join(results_dir, '.parse.log')
    # NOTE(review): shell string interpolation -- safe only while
    # results_dir comes from our own job naming; trailing '&'
    # backgrounds the parser so the scheduler doesn't block.
    os.system("%s %s -r -o %s > %s 2>&1 &" % (parse, flags, results_dir, output))
166
167
def log_stacktrace(reason):
    """Write the current exception (tagged with *reason*, hostname, pid
    and time) to stderr, and mail it to _notify_email if configured.

    Fix: the original shadowed the builtins 'type' and 'str' with
    locals; they are renamed here (behavior unchanged).
    """
    (exc_type, exc_value, exc_tb) = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
                                   time.strftime("%X %x"))
    message += ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    sys.stderr.write("\n%s\n" % message)

    if _notify_email:
        sender = "monitor_db"
        subject = "monitor_db exception"
        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
            sender, _notify_email, subject, message)
        mailer = smtplib.SMTP('localhost')
        mailer.sendmail(sender, _notify_email, msg)
        mailer.quit()
185
186
class Dispatcher:
    """Main scheduler: recovers state left over from a dead scheduler,
    then repeatedly matches idle hosts to queued work and drives the
    resulting Agents to completion."""

    def __init__(self, do_recover=True):
        # Agents currently executing work.
        self._agents = []
        self.shutting_down = False

        if do_recover:
            self._recover_lost()


    def shut_down(self):
        """Stop scheduling new work and keep ticking until every
        in-flight agent has finished."""
        print "Shutting down!"
        self.shutting_down = True
        while self._agents:
            self.tick()
            time.sleep(40)


    def tick(self):
        # One scheduler iteration: queue new work (unless shutting
        # down), then advance all running agents.
        if not self.shutting_down:
            self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self


    def _recover_lost(self):
        """Requeue queue entries left active by a previous scheduler
        instance, and re-verify every unlocked host not in 'Ready'."""
        _cursor.execute("""SELECT * FROM host_queue_entries WHERE active AND NOT complete""")
        if _cursor.rowcount:
            queue_entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]
            for queue_entry in queue_entries:
                job = queue_entry.job
                if job.is_synchronous():
                    # Synchronous jobs must restart as a unit.
                    for child_entry in job.get_host_queue_entries():
                        child_entry.requeue()
                else:
                    queue_entry.requeue()
                queue_entry.clear_results_dir()

        # Hosts in any other state are unknown; verify them first.
        _cursor.execute("""SELECT * FROM hosts
                           WHERE status != 'Ready' AND NOT locked""")
        if _cursor.rowcount:
            hosts = [Host(row=i) for i in _cursor.fetchall()]
            for host in hosts:
                verify_task = VerifyTask(host = host)
                self.add_agent(Agent(tasks = [verify_task]))


    def _find_more_work(self):
        """Give each idle host its best pending queue entry, capping
        new launches at 100 per tick."""
        print "finding work"

        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if tasks:
                for next in tasks:
                    try:
                        agent = next.run(assigned_host=host)
                        if agent:
                            self.add_agent(agent)

                            num_started += 1
                            if num_started>=100:
                                return
                        # Only the first runnable entry per host.
                        break
                    except:
                        next.set_status('Failed')

#                       if next.host:
#                               next.host.set_status('Ready')

                        log_stacktrace("task_id = %d" % next.id)


    def _handle_agents(self):
        # Tick every agent; keep only the ones still running.
        still_running = []
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running
272
273
274class RunMonitor(object):
275 def __init__(self, cmd, nice_level = None, log_file = None):
276 self.nice_level = nice_level
277 self.log_file = log_file
278 self.proc = self.run(cmd)
279
280 def run(self, cmd):
281 if self.nice_level:
282 nice_cmd = ['nice','-n', str(self.nice_level)]
283 nice_cmd.extend(cmd)
284 cmd = nice_cmd
285
286 out_file = None
287 if self.log_file:
288 try:
289 out_file = open(self.log_file, 'a')
290 out_file.write("\n%s\n" % ('*'*80))
291 out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
292 out_file.write("%s\n" % ('*'*80))
293 except:
294 pass
295
296 if not out_file:
297 out_file = open('/dev/null', 'w')
298
299 in_devnull = open('/dev/null', 'r')
300 print "cmd = %s" % cmd
301 print "path = %s" % os.getcwd()
302
303 proc = subprocess.Popen(cmd, stdout=out_file,
304 stderr=subprocess.STDOUT, stdin=in_devnull)
305 out_file.close()
306 in_devnull.close()
307 return proc
308
309
310 def kill(self):
311 self.proc.kill()
312
313
314 def exit_code(self):
315 return self.proc.poll()
316
317
318class Agent(object):
319 def __init__(self, tasks):
320 self.active_task = None
321 self.queue = Queue.Queue(0)
322 self.dispatcher = None
323
324 for task in tasks:
325 self.add_task(task)
326
327
328 def add_task(self, task):
329 self.queue.put_nowait(task)
330 task.agent = self
331
332
333 def tick(self):
334 print "agent tick"
335 if self.active_task and not self.active_task.is_done():
336 self.active_task.poll()
337 else:
338 self._next_task();
339
340
341 def _next_task(self):
342 print "agent picking task"
343 if self.active_task:
344 assert self.active_task.is_done()
345
346 if not self.active_task.success and self.active_task.failure_tasks:
347 self.queue = Queue.Queue(0)
348 for task in self.active_task.failure_tasks:
349 self.add_task(task)
350
351 self.active_task = None
352 if not self.is_done():
353 self.active_task = self.queue.get_nowait()
354 if self.active_task:
355 self.active_task.start()
356
357
358 def is_done(self):
359 return self.active_task == None and self.queue.empty()
360
361
362 def start(self):
363 assert self.dispatcher
364
365 self._next_task()
366
367
368class AgentTask(object):
369 def __init__(self, cmd, failure_tasks = None):
370 self.done = False
371 self.failure_tasks = failure_tasks
372 self.started = False
373 self.cmd = cmd
374 self.agent = None
375
376
377 def poll(self):
378 print "poll"
379 if hasattr(self, 'monitor'):
380 self.tick(self.monitor.exit_code())
381 else:
382 self.finished(False)
383
384
385 def tick(self, exit_code):
386 if exit_code==None:
387 return
388# print "exit_code was %d" % exit_code
389 if exit_code == 0:
390 success = True
391 else:
392 success = False
393
394 self.finished(success)
395
396
397 def is_done(self):
398 return self.done
399
400
401 def finished(self, success):
402 self.done = True
403 self.success = success
404 self.epilog()
405
406
407 def prolog(self):
408 pass
409
410
411 def epilog(self):
412 pass
413
414
415 def start(self):
416 assert self.agent
417
418 if not self.started:
419 self.prolog()
420 self.run()
421
422 self.started = True
423
424
425 def abort(self):
426 self.monitor.kill()
427
428
429 def run(self):
430 if self.cmd:
431 print "agent starting monitor"
432
433 log_file = None
434 if hasattr(self, 'host'):
435 log_file = os.path.join(os.path.join(RESULTS_DIR, 'hosts'), self.host.hostname)
436
437 self.monitor = RunMonitor(self.cmd, nice_level = AUTOSERV_NICE_LEVEL, log_file = log_file)
438
439
440class RepairTask(AgentTask):
441 def __init__(self, host):
mbligh4314a712008-02-29 22:44:30 +0000442 global _autoserv_path
443 cmd = [_autoserv_path ,'-n', '-R', '-m', host.hostname]
mbligh36768f02008-02-22 18:28:33 +0000444 self.host = host
445 AgentTask.__init__(self, cmd)
446
447 def prolog(self):
448 print "repair_task starting"
449 self.host.set_status('Repairing')
450
451
452 def epilog(self):
453 if self.success:
454 status = 'Ready'
455 else:
456 status = 'Repair Failed'
457
458 self.host.set_status(status)
459
460
class VerifyTask(AgentTask):
    """Run 'autoserv -v' against one host, standalone or on behalf of a
    queue entry; on failure the Agent falls through to the RepairTask
    supplied as failure_tasks."""

    def __init__(self, queue_entry=None, host=None):
        # Exactly one of queue_entry / host must be supplied.
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        # Verify output lands in a temp dir; epilog() either moves it
        # into the job's results or discards it.
        self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
        global _autoserv_path
        cmd = [_autoserv_path,'-n', '-v','-m',self.host.hostname,
               '-r', self.temp_results_dir]

        AgentTask.__init__(self, cmd, failure_tasks = [RepairTask(self.host)])


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        # Keep verify logs when there is an entry to attach them to:
        # always on success, on failure only for non-meta entries
        # (meta-host entries will be rescheduled elsewhere).
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        else:
            shutil.rmtree(self.temp_results_dir)

        if self.success:
            status = 'Ready'
        else:
            status = 'Failed Verify'
            if self.queue_entry:
                if self.queue_entry.meta_host:
                    # Return the entry to the pool for another host.
                    self.host.yield_work()
                else:
                    self.queue_entry.set_status('Failed')

        self.host.set_status(status)


    def move_results(self):
        """Move the temp verify results into the entry's results dir
        (per-host subdir for synchronous jobs)."""
        assert self.queue_entry is not None
        target_dir = self.queue_entry.results_dir()
        if self.queue_entry.job.is_synchronous():
            target_dir = os.path.join(target_dir,
                                      self.queue_entry.host.hostname)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            shutil.move(os.path.join(self.temp_results_dir,
                                     filename),
                        os.path.join(target_dir, filename))
516
517
class VerifySynchronousTask(VerifyTask):
    """VerifyTask variant for synchronous jobs: a success may make the
    whole job ready to launch; a failure stops (or requeues) the
    sibling queue entries."""

    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry = queue_entry)


    def epilog(self):
        VerifyTask.epilog(self)
        print "verify_synchronous finished: %s/%s" % (self.queue_entry.host.hostname, self.success)
        if self.success:
            if self.queue_entry.job.num_complete()==0:
                self.queue_entry.set_status('Pending')

                job = self.queue_entry.job
                if job.is_ready():
                    # Every synch host is Pending; start the job proper.
                    self.agent.dispatcher.add_agent(job.run(self.queue_entry))
            else:
                self.queue_entry.set_status('Stopped') # some other entry failed verify
        else:
            if self.queue_entry.meta_host:
                # Meta-host entry: go back to Queued for a new host.
                self.queue_entry.set_status('Queued')
            else:
                # VerifyTask.epilog() set this queue entry to
                # failed, so it won't be set to stopped
                self.stop_all_entries()


    def stop_all_entries(self):
        """Halt every unfinished sibling entry of this job."""
        job = self.queue_entry.job
        for child_entry in job.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')
mbligh36768f02008-02-22 18:28:33 +0000549
550
class QueueTask(AgentTask):
    """Run the actual autoserv job for one or more queue entries,
    recording status and job timestamps as it progresses."""

    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(queue_entry, field, value):
        # Append 'field=value' to the entry's keyval file; '%d'
        # truncates the float timestamps to whole seconds.
        key_path = os.path.join(queue_entry.results_dir(), 'keyval')
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%d' % (field, value)
        keyval_file.close()


    def prolog(self):
        for queue_entry in self.queue_entries:
            print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            # write some job timestamps into the job keyval file
            queued = time.mktime(self.job.created_on.timetuple())
            started = time.time()
            self._write_keyval(queue_entry, "job_queued", queued)
            self._write_keyval(queue_entry, "job_started", started)


    def epilog(self):
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')
            # write another timestamp into the job keyval file
            finished = time.time()
            self._write_keyval(queue_entry, "job_finished",
                               finished)

        # Synchronous/single-machine jobs are parsed once at job level
        # when everything finishes; async multi-machine jobs are parsed
        # per queue entry.
        if self.job.is_synchronous() or self.job.num_machines()==1:
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(), flags='-l 2')

        print "queue_task finished with %s/%s" % (status, self.success)
600
601
class RebootTask(AgentTask):
    """Reboot a host via 'autoserv -b'. Still unimplemented: tick() and
    run() raise.

    Fixes: the original __init__ took no 'host' parameter yet
    referenced one (a guaranteed NameError on instantiation) and passed
    'host' to AgentTask.__init__ as the command; it now takes the host
    explicitly, matching RepairTask/VerifyTask. The string exceptions
    (invalid on python >= 2.6) are replaced by NotImplementedError.
    """

    def __init__(self, host):
        global _autoserv_path
        self.host = host
        # '-b' requests a reboot; /dev/null stands in for a control file.
        self.cmd = "%s -n -b -m %s /dev/null" % (_autoserv_path, host)
        AgentTask.__init__(self, self.cmd)


    def tick(self, exit_code):
        raise NotImplementedError


    def run(self):
        raise NotImplementedError
616
617
618
class DBObject(object):
    """Minimal ORM row wrapper: loads one row of *table* (by id, or
    from an already-fetched row tuple) and exposes the columns named in
    *fields* as attributes."""

    def __init__(self, table, fields, id=None, row=None, new_record=False):
        # Exactly one of id / row must be supplied.
        assert (bool(id) != bool(row)) and table and fields

        self.__table = table
        self.__fields = fields

        # True when this object represents a row not yet INSERTed.
        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            _cursor.execute(sql, (id,))
            if not _cursor.rowcount:
                # was: raise "<string>" -- string exceptions are a
                # TypeError on python >= 2.6
                raise ValueError("row not found (table=%s, id=%s)" %
                                 (self.__table, id))
            row = _cursor.fetchone()

        assert len(row)==len(fields), "table = %s, row = %s/%d, fields = %s/%d" % (table, row, len(row), fields, len(fields))

        # Mirror each column into an instance attribute.
        self.__valid_fields = {}
        for i,value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        # 'id' must never be rewritten via update_field().
        del self.__valid_fields['id']


    def count(self, where, table = None):
        """Return COUNT(*) of rows matching *where* (defaults to this
        object's table)."""
        if not table:
            table = self.__table

        _cursor.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))
        count = _cursor.fetchall()

        return int(count[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        """UPDATE one column (and the cached attribute), skipping the
        write when the value is unchanged."""
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                (self.__table, field)
        _cursor.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        """INSERT this object's fields as a new row (only meaningful
        when constructed with new_record=True)."""
        if self.__new_record:
            keys = self.__fields[1:]        # skip the auto-increment id
            columns = ','.join([str(key) for key in keys])
            # Fix: parameterized values instead of hand-quoted '"%s"'
            # interpolation, which broke on any value containing a
            # double quote (and was injection-prone).
            placeholders = ','.join(['%s'] * len(keys))
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                    (self.__table, columns, placeholders)
            _cursor.execute(query, [self.__dict__[key] for key in keys])


    def delete(self):
        _cursor.execute("""DELETE FROM %s WHERE id = %%s""" % \
                        self.__table, (self.id,))
689
690
class IneligibleHostQueue(DBObject):
    """Row in ineligible_host_queues: blocks a (job, host) pairing from
    being scheduled again (used after a meta-host assignment)."""

    def __init__(self, id=None, row=None, new_record=None):
        DBObject.__init__(self, 'ineligible_host_queues',
                          ['id', 'job_id', 'host_id'],
                          id=id, row=row, new_record=new_record)
696
697
class Host(DBObject):
    """Row wrapper for the hosts table."""

    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id','status']
        DBObject.__init__(self, 'hosts',fields, id=id, row=row)



    def current_task(self):
        """Return the single active, incomplete HostQueueEntry running
        on this host, or None."""
        _cursor.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if not _cursor.rowcount:
            return None
        else:
            # The scheduler only ever runs one entry per host.
            assert _cursor.rowcount == 1
            results = _cursor.fetchone();
#           print "current = %s" % results
            return HostQueueEntry(row=results)


    def next_queue_entries(self):
        """Return the best pending queue entry for this host -- matched
        directly or via an eligible meta-host label -- or None when the
        host is locked or nothing is queued."""
        if self.locked:
            print "%s locked, not queuing" % self.hostname
            return None
#       print "%s/%s looking for work" % (self.hostname, self.platform_id)
        _cursor.execute("""
                SELECT * FROM host_queue_entries
                WHERE ((host_id=%s) OR (meta_host IS NOT null AND
                (meta_host IN (
                        SELECT label_id FROM hosts_labels WHERE host_id=%s
                        )
                )
                AND job_id NOT IN (
                        SELECT job_id FROM ineligible_host_queues
                        WHERE host_id=%s
                )))
                AND NOT complete AND NOT active
                ORDER BY priority DESC, meta_host, id
                LIMIT 1
                """, (self.id,self.id, self.id))

        if not _cursor.rowcount:
            return None
        else:
            return [HostQueueEntry(row=i) for i in _cursor.fetchall()]

    def yield_work(self):
        """Requeue whatever this host is currently running."""
        print "%s yielding work" % self.hostname
        if self.current_task():
            self.current_task().requeue()

    def set_status(self,status):
        self.update_field('status',status)
752
753
class HostQueueEntry(DBObject):
    """Row wrapper for host_queue_entries: one (job, host) scheduling
    unit, possibly a meta-host entry awaiting host assignment."""

    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        DBObject.__init__(self, 'host_queue_entries', fields, id=id,
                          row=row)

        self.job = Job(self.job_id)

        # host_id is NULL for an unassigned meta-host entry.
        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Per-entry scheduling log, kept next to the job results.
        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    def set_host(self, host):
        """Assign (and activate) or release this entry's host."""
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
        else:
            self.queue_log_record('Releasing host')
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        # Lazily (re)load the Host object from host_id.
        if not self.host:
            if self.host_id:
                self.host = Host(self.host_id)
        if self.host:
            return self.host
        else:
            return None


    def queue_log_record(self, log_line):
        # Unbuffered append so the log survives a scheduler crash.
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(log_line + '\n')
        queue_log.close()


    def results_dir(self):
        """Synchronous and single-machine jobs share the job dir; other
        entries get a per-host subdirectory."""
        if self.job.num_machines()==1 or self.job.is_synchronous():
            results_dir = self.job.job_dir
        else:
            assert self.host
            results_dir = '%s/%s' % (self.job.job_dir,
                                     self.host.hostname)

        return results_dir


    def set_status(self, status):
        """Set status and keep the derived active/complete flags in
        sync with it."""
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self,assigned_host=None):
        """Bind a meta-host entry to *assigned_host* (recording an
        ineligibility block so it never lands there again), then hand
        off to Job.run()."""
        if self.meta_host:
            assert assigned_host
            self.job.create_results_dir()
            self.set_host(assigned_host)
            print "creating block %s/%s" % (self.job.id,
                                            self.host.id)

            row = [0, self.job.id, self.host.id]
            block = IneligibleHostQueue(row=row, new_record=True)
            block.save()

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                        self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)

    def requeue(self):
        self.set_status('Queued')

        if self.meta_host:
            # Release the host so the entry can match a fresh one.
            self.set_host(None)


    def clear_results_dir(self):
        if os.path.exists(self.results_dir()):
            shutil.rmtree(self.results_dir())
859
860
class Job(DBObject):
    """Row wrapper for the jobs table: owns the job's results directory
    and the logic that turns a ready job into runnable Agent tasks."""

    def __init__(self, id=None, row=None):
        assert id or row
        DBObject.__init__(self,'jobs',['id','owner','name','priority','control_file',
                          'control_type','created_on', 'synch_type', 'synch_count',
                          'synchronizing' ], id=id, row=row)

        # All results for this job live under RESULTS_DIR/<id>-<owner>.
        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id, self.owner))


    def is_server_job(self):
        # control_type 2 == client-side control file.
        return self.control_type != 2


    def get_host_queue_entries(self):
        """Return every HostQueueEntry belonging to this job."""
        _cursor.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]

        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        self.update_field('status',status)

        if update_queues:
            # Optionally propagate to every queue entry as well.
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        # synch_type 2 == synchronous: all hosts start together.
        return self.synch_type == 2


    def is_ready(self):
        """Async jobs are always ready; a synchronous one is ready once
        synch_count entries have reached Pending."""
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        # heuristic
        # NOTE(review): count/self.synch_count is integer division in
        # python 2, so this is only true once count >= synch_count --
        # not at the apparently intended 50% threshold.
        queue_entries = self.get_host_queue_entries()
        count = 0
        for queue_entry in queue_entries:
            if queue_entry.status == 'Pending':
                count += 1

        return (count/self.synch_count >= 0.5)


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir

    def num_machines(self, clause = None):
        """Count this job's queue entries, optionally filtered by an
        extra SQL *clause*."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        left = self.num_queued()
        print "%s: %s machines left" % (self.name, left)
        return left==0

    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def write_machines_file(self):
        # .machines lists every assigned hostname (multi-machine only).
        if self.num_machines()>1:
            print "writing machines file"
            mf = open("%s/.machines" % self.job_dir, 'w')
            for queue_entry in self.get_host_queue_entries():
                if queue_entry.get_host():
                    mf.write("%s\n" % \
                             queue_entry.get_host().hostname)
            mf.close()


    def create_results_dir(self, queue_entry=None):
        """Ensure the job dir exists; return the entry-specific results
        dir when *queue_entry* is given, else the job dir."""
        print "create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete())

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        """Build the Agent that executes this job for *queue_entry*:
        verify-then-run for async jobs; for synchronous jobs, verify
        hosts one by one until all are Pending, then start together."""
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                # Not everyone has verified yet -- verify this entry
                # and wait for the rest.
                return Agent([VerifySynchronousTask(queue_entry = queue_entry)])

        queue_entry.set_status('Starting')

        # NOTE(review): os.tmpnam is race-prone (tempfile.mkstemp would
        # be safer) and this control file is never deleted afterwards.
        ctrl = open(os.tmpnam(), 'w')
        if self.control_file:
            ctrl.write(self.control_file)
        else:
            ctrl.write("")
        ctrl.flush()

        if self.is_synchronous():
            if self.num_machines() > 1:
                self.write_machines_file()
            hosts = self.get_host_queue_entries()
            hostnames = ','.join([i.host.hostname for i in hosts])
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            hostnames = queue_entry.host.hostname
            queue_entries = [queue_entry]

        global _autoserv_path
        params = [_autoserv_path, '-n', '-r', results_dir,
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl.name]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            # Async entries verify immediately before running.
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self, queue_entries = queue_entries, cmd = params))

        agent = Agent(tasks)

        return agent
1021
1022
# Run the scheduler when executed directly.
if __name__ == '__main__':
    main()