blob: e7eaedf166b75c465f29f3e4a034ee7fe44c940d [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard542e8402008-09-19 20:16:18 +00008import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
9import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard170873e2009-01-07 00:22:26 +000010import itertools, logging
mbligh70feeee2008-06-11 16:20:49 +000011import common
showard21baa452008-10-21 00:08:39 +000012from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000013from autotest_lib.client.common_lib import global_config
showard2bab8f42008-11-12 18:15:22 +000014from autotest_lib.client.common_lib import host_protections, utils, debug
showardb1e51872008-10-07 11:08:18 +000015from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000017from autotest_lib.scheduler import drone_manager, drones, email_manager
showardd1ee1dd2009-01-07 21:33:08 +000018from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000019
mblighb090f142008-02-27 21:33:46 +000020
# Default results directory; main() replaces it with the command-line argument.
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Root of the autotest tree: the parent of this file's directory, unless the
# AUTOTEST_DIR environment variable overrides it.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# make the server directory importable, ahead of anything already on the path
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000037
# Module-wide scheduler state.
# database connection; assigned and connected in init()
_db = None
# set to True by handle_sigint() to make the main loop exit
_shutdown = False
# autoserv executable; main() swaps in a dummy when --test is given
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
# switched on by main() when --test is given
_testing_mode = False
# base URL of the web frontend; filled in by main() from the config file
_base_url = None
# lower-cased status names from the notify_email_statuses config value
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000046
47
def main():
    """
    Scheduler entry point.

    Parses the command line (exactly one positional argument, the results
    directory, is required; otherwise usage is printed and the function
    returns), loads the notification and base-URL settings from the global
    config, starts the status server and then runs the dispatcher loop until
    a shutdown is requested.  Any exception escaping the loop is logged through
    the email manager, after which the usual shutdown sequence runs.
    """
    global RESULTS_DIR
    global _notify_email_statuses
    global _autoserv_path
    global _testing_mode
    global _base_url

    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    RESULTS_DIR = args[0]

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    # the list may be separated by whitespace, commas, semicolons or colons
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            print('Error: [SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        try:
            init(options.logfile)
            dispatcher = Dispatcher()
            dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

            while not _shutdown:
                dispatcher.tick()
                time.sleep(scheduler_config.config.tick_pause_sec)
        except:
            # deliberate catch-all at the process boundary: report whatever
            # killed the scheduler, then fall through to the shutdown sequence
            email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")
    finally:
        # always run the shutdown sequence, even if reporting itself failed
        email_manager.manager.send_queued_emails()
        server.shutdown()
        _drone_manager.shutdown()
        # init() may have failed before the connection object was created
        if _db is not None:
            _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000118
119
def handle_sigint(signum, frame):
    """
    SIGINT handler: flag the scheduler for shutdown.

    The main loop checks the flag before each tick, so the scheduler stops
    once the current iteration finishes.  Both arguments are ignored.
    """
    global _shutdown
    _shutdown = True
    print("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000124
125
def init(logfile):
    """
    Prepare the process for running the dispatcher.

    Optionally redirects output to `logfile`, connects to the database (the
    stress-test database when running in testing mode), puts Django in
    autocommit, configures logging, installs the SIGINT handler and
    initializes the drone manager from the scheduler config section.

    @param logfile: path to redirect stdout to, or a false value to leave
            stdout/stderr alone.
    """
    global _db

    if logfile:
        enable_logging(logfile)
    print("%s> dispatcher starting" % time.strftime("%X %x"))
    print("My PID is %d" % os.getpid())

    config = global_config.global_config
    if _testing_mode:
        config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    debug.configure('scheduler', format_string='%(message)s')
    debug.get_logger().setLevel(logging.INFO)

    print("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # named drone_hostnames rather than "drones" so the imported drones module
    # is not shadowed inside this function
    drone_hostnames = config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_hostnames.split(',')]
    results_host = config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    print("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000158
159
def enable_logging(logfile):
    """
    Send stdout to `logfile` and stderr to `logfile` + ".err".

    Both files are opened unbuffered in append mode.  The redirection is done
    at the file-descriptor level with dup2 and also by rebinding sys.stdout
    and sys.stderr.

    @param logfile: path of the stdout log file.
    """
    err_path = "%s.err" % logfile
    print("Enabling logging to %s (%s)" % (logfile, err_path))
    out_stream = open(logfile, "a", buffering=0)
    err_stream = open(err_path, "a", buffering=0)

    # redirect the existing descriptors first ...
    os.dup2(out_stream.fileno(), sys.stdout.fileno())
    os.dup2(err_stream.fileno(), sys.stderr.fileno())

    # ... then make the Python-level streams write to the new files directly
    sys.stdout = out_stream
    sys.stderr = err_stream
mbligh36768f02008-02-22 18:28:33 +0000172
173
def queue_entries_to_abort():
    """
    Return a list of HostQueueEntry objects, one per host_queue_entries row
    whose status is 'Abort'.
    """
    abort_rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)

    return [HostQueueEntry(row=abort_row) for abort_row in abort_rows]
mbligh36768f02008-02-22 18:28:33 +0000181
showard7cf9a9b2008-05-15 21:15:52 +0000182
class HostScheduler(object):
    """
    Chooses hosts for pending queue entries.

    refresh() loads the currently ready hosts together with the ACL,
    dependency and label relationships relevant to the given queue entries.
    find_eligible_host() then hands out hosts from that snapshot; each host
    returned is removed from the snapshot so it is not assigned twice before
    the next refresh().
    """
    def _get_ready_hosts(self):
        """Return a dict of host id -> Host for every schedulable host."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                      'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Return the ids joined by commas, ready for an SQL IN (...) list."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Run `query` with its single %s placeholder filled with the ids and
        return the two-column result as a dict of id -> set of related ids.

        An empty id list yields an empty dict without touching the database.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """
        Group two-column rows into a dict mapping the first column to the set
        of second-column values (or the reverse when `flip` is true).
        """
        result = {}
        for row in rows:
            left_id, right_id = long(row[0]), long(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return job id -> set of ACL group ids the job's owner belongs to."""
        query = """
        SELECT jobs.id, acl_groups_users.acl_group_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return job id -> set of host ids listed as ineligible for it."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return job id -> set of label ids the job depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return host id -> set of ACL group ids containing the host."""
        query = """
        SELECT host_id, acl_group_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """
        Return a pair (label id -> set of host ids, host id -> set of label
        ids) for the given hosts; two empty dicts when there are no hosts.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return label id -> Label for every label."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """
        Reload the scheduling snapshot for the given pending queue entries.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the host shares at least one ACL group with the job."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if every label the job depends on is present on the host."""
        return job_dependencies.issubset(host_labels)


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        True unless the host carries an only_if_needed label that the entry
        neither requested as its metahost nor lists as a dependency.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine the ACL, dependency and only_if_needed checks."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        acl = self._is_acl_accessible(host_id, queue_entry)
        deps = self._check_job_dependencies(job_dependencies, host_labels)
        only_if = self._check_only_if_needed_labels(job_dependencies,
                                                    host_labels, queue_entry)
        return acl and deps and only_if


    def _schedule_non_metahost(self, queue_entry):
        """
        Claim the entry's own host if it is eligible and still available;
        return None otherwise.
        """
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still unassigned this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """
        Claim the first usable, eligible host carrying the entry's metahost
        label; return None when no such host is left.
        """
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                # drop it so later entries for this label skip it immediately
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """
        Return a host for the queue entry, or None if none can be assigned.
        Entries without a metahost are matched against their own host.
        """
        if not queue_entry.meta_host:
            return self._schedule_non_metahost(queue_entry)
        return self._schedule_metahost(queue_entry)
382
383
showard170873e2009-01-07 00:22:26 +0000384class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000385 def __init__(self):
386 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000387 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000388 self._host_scheduler = HostScheduler()
showard170873e2009-01-07 00:22:26 +0000389 self._host_agents = {}
390 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000391
mbligh36768f02008-02-22 18:28:33 +0000392
jadmanski0afbb632008-06-06 21:10:57 +0000393 def do_initial_recovery(self, recover_hosts=True):
394 # always recover processes
395 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000396
jadmanski0afbb632008-06-06 21:10:57 +0000397 if recover_hosts:
398 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000399
400
jadmanski0afbb632008-06-06 21:10:57 +0000401 def tick(self):
showard170873e2009-01-07 00:22:26 +0000402 _drone_manager.refresh()
showarda3ab0d52008-11-03 19:03:47 +0000403 self._run_cleanup_maybe()
jadmanski0afbb632008-06-06 21:10:57 +0000404 self._find_aborting()
405 self._schedule_new_jobs()
406 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000407 _drone_manager.execute_actions()
408 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000409
showard97aed502008-11-04 02:01:24 +0000410
showarda3ab0d52008-11-03 19:03:47 +0000411 def _run_cleanup_maybe(self):
showardd1ee1dd2009-01-07 21:33:08 +0000412 should_cleanup = (self._last_clean_time +
413 scheduler_config.config.clean_interval * 60 <
414 time.time())
415 if should_cleanup:
showarda3ab0d52008-11-03 19:03:47 +0000416 print 'Running cleanup'
417 self._abort_timed_out_jobs()
418 self._abort_jobs_past_synch_start_timeout()
419 self._clear_inactive_blocks()
showardfa8629c2008-11-04 16:51:23 +0000420 self._check_for_db_inconsistencies()
showarda3ab0d52008-11-03 19:03:47 +0000421 self._last_clean_time = time.time()
422
mbligh36768f02008-02-22 18:28:33 +0000423
showard170873e2009-01-07 00:22:26 +0000424 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
425 for object_id in object_ids:
426 agent_dict.setdefault(object_id, set()).add(agent)
427
428
429 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
430 for object_id in object_ids:
431 assert object_id in agent_dict
432 agent_dict[object_id].remove(agent)
433
434
jadmanski0afbb632008-06-06 21:10:57 +0000435 def add_agent(self, agent):
436 self._agents.append(agent)
437 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000438 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
439 self._register_agent_for_ids(self._queue_entry_agents,
440 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000441
showard170873e2009-01-07 00:22:26 +0000442
443 def get_agents_for_entry(self, queue_entry):
444 """
445 Find agents corresponding to the specified queue_entry.
446 """
447 return self._queue_entry_agents.get(queue_entry.id, set())
448
449
450 def host_has_agent(self, host):
451 """
452 Determine if there is currently an Agent present using this host.
453 """
454 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000455
456
jadmanski0afbb632008-06-06 21:10:57 +0000457 def remove_agent(self, agent):
458 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000459 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
460 agent)
461 self._unregister_agent_for_ids(self._queue_entry_agents,
462 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000463
464
showard4c5374f2008-09-04 17:02:56 +0000465 def num_running_processes(self):
466 return sum(agent.num_processes for agent in self._agents
467 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000468
469
showard170873e2009-01-07 00:22:26 +0000470 def _extract_execution_tag(self, command_line):
471 match = re.match(r'.* -P (\S+) ', command_line)
472 if not match:
473 return None
474 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000475
476
showard2bab8f42008-11-12 18:15:22 +0000477 def _recover_queue_entries(self, queue_entries, run_monitor):
478 assert len(queue_entries) > 0
showard2bab8f42008-11-12 18:15:22 +0000479 queue_task = RecoveryQueueTask(job=queue_entries[0].job,
480 queue_entries=queue_entries,
481 run_monitor=run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000482 self.add_agent(Agent(tasks=[queue_task],
showard170873e2009-01-07 00:22:26 +0000483 num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000484
485
jadmanski0afbb632008-06-06 21:10:57 +0000486 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000487 self._register_pidfiles()
488 _drone_manager.refresh()
489 self._recover_running_entries()
490 self._recover_aborting_entries()
491 self._requeue_other_active_entries()
492 self._recover_parsing_entries()
493 self._reverify_remaining_hosts()
494 # reinitialize drones after killing orphaned processes, since they can
495 # leave around files when they die
496 _drone_manager.execute_actions()
497 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000498
showard170873e2009-01-07 00:22:26 +0000499
500 def _register_pidfiles(self):
501 # during recovery we may need to read pidfiles for both running and
502 # parsing entries
503 queue_entries = HostQueueEntry.fetch(
504 where="status IN ('Running', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000505 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000506 pidfile_id = _drone_manager.get_pidfile_id_from(
507 queue_entry.execution_tag())
508 _drone_manager.register_pidfile(pidfile_id)
509
510
511 def _recover_running_entries(self):
512 orphans = _drone_manager.get_orphaned_autoserv_processes()
513
514 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
515 requeue_entries = []
516 for queue_entry in queue_entries:
517 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000518 # synchronous job we've already recovered
519 continue
showard170873e2009-01-07 00:22:26 +0000520 execution_tag = queue_entry.execution_tag()
521 run_monitor = PidfileRunMonitor()
522 run_monitor.attach_to_existing_process(execution_tag)
523 if not run_monitor.has_process():
524 # autoserv apparently never got run, so let it get requeued
525 continue
showarde788ea62008-11-17 21:02:47 +0000526 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000527 print 'Recovering %s (process %s)' % (
528 ', '.join(str(entry) for entry in queue_entries),
529 run_monitor.get_process())
showard2bab8f42008-11-12 18:15:22 +0000530 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000531 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000532
jadmanski0afbb632008-06-06 21:10:57 +0000533 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000534 for process in orphans.itervalues():
535 print 'Killing orphan %s' % process
536 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000537
showard170873e2009-01-07 00:22:26 +0000538
539 def _recover_aborting_entries(self):
540 queue_entries = HostQueueEntry.fetch(
541 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000542 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000543 print 'Recovering aborting QE %s' % queue_entry
544 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000545
showard97aed502008-11-04 02:01:24 +0000546
showard170873e2009-01-07 00:22:26 +0000547 def _requeue_other_active_entries(self):
548 queue_entries = HostQueueEntry.fetch(
549 where='active AND NOT complete AND status != "Pending"')
550 for queue_entry in queue_entries:
551 if self.get_agents_for_entry(queue_entry):
552 # entry has already been recovered
553 continue
554 print 'Requeuing active QE %s (status=%s)' % (queue_entry,
555 queue_entry.status)
556 if queue_entry.host:
557 tasks = queue_entry.host.reverify_tasks()
558 self.add_agent(Agent(tasks))
559 agent = queue_entry.requeue()
560
561
562 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000563 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000564 self._reverify_hosts_where("""(status = 'Repairing' OR
565 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000566 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000567
showard170873e2009-01-07 00:22:26 +0000568 # recover "Running" hosts with no active queue entries, although this
569 # should never happen
570 message = ('Recovering running host %s - this probably indicates a '
571 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000572 self._reverify_hosts_where("""status = 'Running' AND
573 id NOT IN (SELECT host_id
574 FROM host_queue_entries
575 WHERE active)""",
576 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000577
578
jadmanski0afbb632008-06-06 21:10:57 +0000579 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000580 print_message='Reverifying host %s'):
581 full_where='locked = 0 AND invalid = 0 AND ' + where
582 for host in Host.fetch(where=full_where):
583 if self.host_has_agent(host):
584 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000585 continue
showard170873e2009-01-07 00:22:26 +0000586 if print_message:
jadmanski0afbb632008-06-06 21:10:57 +0000587 print print_message % host.hostname
showard170873e2009-01-07 00:22:26 +0000588 tasks = host.reverify_tasks()
589 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000590
591
showard97aed502008-11-04 02:01:24 +0000592 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000593 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000594 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000595 if entry.id in recovered_entry_ids:
596 continue
597 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000598 recovered_entry_ids = recovered_entry_ids.union(
599 entry.id for entry in queue_entries)
600 print 'Recovering parsing entries %s' % (
601 ', '.join(str(entry) for entry in queue_entries))
showard97aed502008-11-04 02:01:24 +0000602
603 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000604 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000605
606
jadmanski0afbb632008-06-06 21:10:57 +0000607 def _recover_hosts(self):
608 # recover "Repair Failed" hosts
609 message = 'Reverifying dead host %s'
610 self._reverify_hosts_where("status = 'Repair Failed'",
611 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000612
613
showard3bb499f2008-07-03 19:42:20 +0000614 def _abort_timed_out_jobs(self):
615 """
616 Aborts all jobs that have timed out and not completed
617 """
showarda3ab0d52008-11-03 19:03:47 +0000618 query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
619 where=['created_on + INTERVAL timeout HOUR < NOW()'])
620 for job in query.distinct():
621 print 'Aborting job %d due to job timeout' % job.id
622 job.abort(None)
showard3bb499f2008-07-03 19:42:20 +0000623
624
showard98863972008-10-29 21:14:56 +0000625 def _abort_jobs_past_synch_start_timeout(self):
626 """
627 Abort synchronous jobs that are past the start timeout (from global
628 config) and are holding a machine that's in everyone.
629 """
630 timeout_delta = datetime.timedelta(
showardd1ee1dd2009-01-07 21:33:08 +0000631 minutes=scheduler_config.config.synch_job_start_timeout_minutes)
showard98863972008-10-29 21:14:56 +0000632 timeout_start = datetime.datetime.now() - timeout_delta
633 query = models.Job.objects.filter(
showard98863972008-10-29 21:14:56 +0000634 created_on__lt=timeout_start,
635 hostqueueentry__status='Pending',
636 hostqueueentry__host__acl_group__name='Everyone')
637 for job in query.distinct():
638 print 'Aborting job %d due to start timeout' % job.id
showardff059d72008-12-03 18:18:53 +0000639 entries_to_abort = job.hostqueueentry_set.exclude(
640 status=models.HostQueueEntry.Status.RUNNING)
641 for queue_entry in entries_to_abort:
642 queue_entry.abort(None)
showard98863972008-10-29 21:14:56 +0000643
644
jadmanski0afbb632008-06-06 21:10:57 +0000645 def _clear_inactive_blocks(self):
646 """
647 Clear out blocks for all completed jobs.
648 """
649 # this would be simpler using NOT IN (subquery), but MySQL
650 # treats all IN subqueries as dependent, so this optimizes much
651 # better
652 _db.execute("""
653 DELETE ihq FROM ineligible_host_queues ihq
showard4eaaf522008-06-06 22:28:07 +0000654 LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
jadmanski0afbb632008-06-06 21:10:57 +0000655 WHERE NOT complete) hqe
656 USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000657
658
showardb95b1bd2008-08-15 18:11:04 +0000659 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000660 # prioritize by job priority, then non-metahost over metahost, then FIFO
661 return list(HostQueueEntry.fetch(
showardac9ce222008-12-03 18:19:44 +0000662 where='NOT complete AND NOT active AND status="Queued"',
showard3dd6b882008-10-27 19:21:39 +0000663 order_by='priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000664
665
jadmanski0afbb632008-06-06 21:10:57 +0000666 def _schedule_new_jobs(self):
showard63a34772008-08-18 19:32:50 +0000667 queue_entries = self._get_pending_queue_entries()
668 if not queue_entries:
showardb95b1bd2008-08-15 18:11:04 +0000669 return
showardb95b1bd2008-08-15 18:11:04 +0000670
showard63a34772008-08-18 19:32:50 +0000671 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000672
showard63a34772008-08-18 19:32:50 +0000673 for queue_entry in queue_entries:
674 assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000675 if not assigned_host:
jadmanski0afbb632008-06-06 21:10:57 +0000676 continue
showardb95b1bd2008-08-15 18:11:04 +0000677 self._run_queue_entry(queue_entry, assigned_host)
678
679
680 def _run_queue_entry(self, queue_entry, host):
681 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000682 # in some cases (synchronous jobs with run_verify=False), agent may be
683 # None
showard9976ce92008-10-15 20:28:13 +0000684 if agent:
685 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000686
687
jadmanski0afbb632008-06-06 21:10:57 +0000688 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000689 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000690 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000691 for agent in agents_to_abort:
692 self.remove_agent(agent)
693
showard170873e2009-01-07 00:22:26 +0000694 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000695
696
showard324bf812009-01-20 23:23:38 +0000697 def _can_start_agent(self, agent, num_started_this_cycle,
698 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000699 # always allow zero-process agents to run
700 if agent.num_processes == 0:
701 return True
702 # don't allow any nonzero-process agents to run after we've reached a
703 # limit (this avoids starvation of many-process agents)
704 if have_reached_limit:
705 return False
706 # total process throttling
showard324bf812009-01-20 23:23:38 +0000707 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000708 return False
709 # if a single agent exceeds the per-cycle throttling, still allow it to
710 # run when it's the first agent in the cycle
711 if num_started_this_cycle == 0:
712 return True
713 # per-cycle throttling
714 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000715 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000716 return False
717 return True
718
719
jadmanski0afbb632008-06-06 21:10:57 +0000720 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000721 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000722 have_reached_limit = False
723 # iterate over copy, so we can remove agents during iteration
724 for agent in list(self._agents):
725 if agent.is_done():
jadmanski0afbb632008-06-06 21:10:57 +0000726 print "agent finished"
showard170873e2009-01-07 00:22:26 +0000727 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000728 continue
729 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000730 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000731 have_reached_limit):
732 have_reached_limit = True
733 continue
showard4c5374f2008-09-04 17:02:56 +0000734 num_started_this_cycle += agent.num_processes
735 agent.tick()
showard324bf812009-01-20 23:23:38 +0000736 print _drone_manager.total_running_processes(), 'running processes'
mbligh36768f02008-02-22 18:28:33 +0000737
738
showardfa8629c2008-11-04 16:51:23 +0000739 def _check_for_db_inconsistencies(self):
740 query = models.HostQueueEntry.objects.filter(active=True, complete=True)
741 if query.count() != 0:
742 subject = ('%d queue entries found with active=complete=1'
743 % query.count())
744 message = '\n'.join(str(entry.get_object_dict())
745 for entry in query[:50])
746 if len(query) > 50:
747 message += '\n(truncated)\n'
748
749 print subject
showard170873e2009-01-07 00:22:26 +0000750 email_manager.manager.enqueue_notify_email(subject, message)
showardfa8629c2008-11-04 16:51:23 +0000751
752
class PidfileRunMonitor(object):
    """
    Follows a process through the pidfile handled by the drone manager.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set by on_lost_process(); once True the pidfile is not read again
        # and the failure recorded in self._state is final
        self._lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """
        Return command prefixed with 'nice -n <nice_level>', or command itself
        when nice_level is None or 0.
        """
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """
        Start command through the drone manager and remember its pidfile id.

        command: argument list to execute; must not be None.
        nice_level: if not None, the command is run under 'nice -n'.
        The remaining arguments are passed on to execute_command().
        """
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        """
        Monitor the pidfile that belongs to execution_tag instead of starting
        a new process.
        """
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one is known."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True if a process is recorded for this monitor."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the recorded process; asserts that there is one."""
        self._get_pidfile_info()
        assert self.has_process()
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """
        Load the pidfile contents into self._state.

        Raises _PidfileException (after resetting self._state to empty
        contents) when the contents are invalid.
        """
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        """
        Print and email the error, then mark the process as lost.
        """
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        print(message)
        email_manager.manager.enqueue_notify_email(error, message)
        if self._state.process is not None:
            process = self._state.process
        else:
            process = _drone_manager.get_dummy_process()
        self.on_lost_process(process)


    def _get_pidfile_info_helper(self):
        # a lost process stays lost: keep the failure state set by
        # on_lost_process() instead of re-reading (and re-reporting)
        if self._lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        print(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process(_drone_manager.get_dummy_process())


    def on_lost_process(self, process):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        # this is the flag _get_pidfile_info_helper() checks; it used to be
        # written under a different name, so the check never triggered
        self._lost_process = True
        # the old public spelling is still set for any outside reader
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the exit status, or None if it is not known yet."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the failed-test count; asserts that it is known."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +0000912
913
class Agent(object):
    """
    Runs a queue of tasks one after another.

    When a task finishes unsuccessfully, the remaining queue is replaced by
    that task's failure_tasks.
    """

    def __init__(self, tasks, num_processes=1):
        """
        tasks: tasks to run, in order.
        num_processes: stored as self.num_processes.
        """
        self.num_processes = num_processes
        self.dispatcher = None
        self.active_task = None
        self.queue = Queue.Queue(0)

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        """Return the set of all ids found in the given iterables."""
        all_ids = set()
        for id_list in id_lists:
            all_ids.update(id_list)
        return all_ids


    def add_task(self, task):
        """Append task to the queue and make this agent its owner."""
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """
        Poll the active task and keep advancing through the queue until a
        task is still unfinished after being polled, or nothing is left.
        """
        while not self.is_done():
            task_in_progress = (self.active_task
                                and not self.active_task.is_done())
            if task_in_progress:
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        """Retire the finished active task and start the next queued one."""
        print("agent picking task")
        finished_task = self.active_task
        if finished_task:
            assert finished_task.is_done()

            if not finished_task.success:
                self.on_task_failure()

        self.active_task = None
        if self.is_done():
            return
        self.active_task = self.queue.get_nowait()
        if self.active_task:
            self.active_task.start()


    def on_task_failure(self):
        """Replace the queue with the active task's failure tasks."""
        self.queue = Queue.Queue(0)
        for recovery_task in self.active_task.failure_tasks:
            self.add_task(recovery_task)


    def is_running(self):
        """Return True while a task is active."""
        return self.active_task is not None


    def is_done(self):
        """Return True when no task is active and the queue is empty."""
        return self.active_task is None and self.queue.empty()


    def start(self):
        """Begin with the first queued task; requires a dispatcher."""
        assert self.dispatcher

        self._next_task()
mbligh36768f02008-02-22 18:28:33 +0000980
jadmanski0afbb632008-06-06 21:10:57 +0000981
class AgentTask(object):
    """
    One step of work run by an Agent, normally a single monitored command.

    Subclasses customize prolog(), epilog() and run().
    """

    def __init__(self, cmd, working_directory=None, failure_tasks=None):
        """
        cmd: command (argument list) to execute; if false, run() does nothing.
        working_directory: passed to the monitor when the command is run.
        failure_tasks: tasks the owning agent runs instead of its remaining
                queue when this task fails; defaults to a new empty list.
        """
        self.done = False
        # a fresh list per instance: a shared default list would leak tasks
        # appended on one instance into every other instance
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        """
        Fill in host_ids and queue_entry_ids from queue_entries, or only
        host_ids from host when there are no real queue entries.
        """
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Check on the process; without a monitor the task fails."""
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        """
        Finish the task once an exit code is available; success means the
        exit code is 0. An exit code of None leaves the task running.
        """
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Record the outcome and run epilog()."""
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        """
        Store a temporary path from the drone manager in
        self.temp_results_dir. suffix is accepted but not used.
        """
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        """Copy the log file to the results repository, when there is one."""
        if not (self.monitor and self.log_file):
            return
        # get_process() asserts that a process exists; a task aborted before
        # its process wrote a pid has no process to copy the log from
        if not self.monitor.has_process():
            return
        _drone_manager.copy_to_results_repository(
            self.monitor.get_process(), self.log_file)


    def epilog(self):
        self.cleanup()


    def start(self):
        """Run prolog() and run() on the first call; requires an agent."""
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the process, if any, mark the task done and clean up."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """
        Set self.log_file to hosts/<hostname>/<current time>.<base_name>.
        """
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def run(self):
        """Start self.cmd under a new PidfileRunMonitor, if cmd is set."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001081
1082
class RepairTask(AgentTask):
    """Runs autoserv in repair mode (-R) against a single host."""

    def __init__(self, host, queue_entry=None):
        """\
        host: host to repair.
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path, '-p', '-R', '-m', host.hostname,
               '-r', _drone_manager.absolute_path(self.temp_results_dir),
               '--host-protection', protection]
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        # the ids have to be set after AgentTask.__init__(), which resets
        # both id lists; setting them any earlier has no effect
        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('repair', self.host)


    def prolog(self):
        """Mark the host as repairing and requeue the queue entry, if any."""
        print("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry:
            self.queue_entry.requeue()


    def epilog(self):
        """
        Set the host status from the repair result; on failure also report
        the host failure to a non-metahost queue entry.
        """
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry and not self.queue_entry.meta_host:
                self.queue_entry.handle_host_failure()
mbligh36768f02008-02-22 18:28:33 +00001121
1122
class PreJobTask(AgentTask):
    """
    Base for tasks that run before a job. When such a task fails for a
    non-metahost queue entry, its log is also copied next to the job results.
    """

    def epilog(self):
        super(PreJobTask, self).epilog()
        # only a failed task with a non-metahost queue entry copies its log
        if not self.queue_entry or self.success or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.log_file)
        destination = os.path.join(self.queue_entry.execution_tag(), log_name)
        _drone_manager.copy_to_results_repository(
            self.monitor.get_process(), self.log_file,
            destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001135
1136
class VerifyTask(PreJobTask):
    """Runs autoserv in verify mode (-v); a RepairTask follows on failure."""

    def __init__(self, queue_entry=None, host=None):
        """
        Exactly one of queue_entry and host must be given; with a queue
        entry, the host is taken from it.
        """
        assert bool(queue_entry) != bool(host)
        self.queue_entry = queue_entry
        self.host = host or queue_entry.host

        self.create_temp_resultsdir('.verify')
        verify_cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname]
        verify_cmd += ['-r',
                       _drone_manager.absolute_path(self.temp_results_dir)]
        repair_on_failure = RepairTask(self.host, queue_entry=queue_entry)
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=[repair_on_failure])

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Mark the queue entry (if any) and the host as verifying."""
        super(VerifyTask, self).prolog()
        print("starting verify on %s" % self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """Mark the host ready again after a successful verify."""
        super(VerifyTask, self).epilog()

        if not self.success:
            return
        self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001167
1168
class QueueTask(AgentTask):
    """
    Runs the command of a job for a group of queue entries, records keyvals
    for it and schedules the final reparse when it ends.
    """

    def __init__(self, job, queue_entries, cmd):
        self.job = job
        self.queue_entries = queue_entries
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _format_keyval(self, key, value):
        """Return the 'key=value' line for one keyval."""
        keyval_line = '%s=%s' % (key, value)
        return keyval_line


    def _write_keyval(self, field, value):
        """Write one keyval line to the keyval file in the execution dir."""
        target_path = os.path.join(self._execution_tag(), 'keyval')
        assert self.monitor and self.monitor.has_process()
        pidfile_id = self.monitor.pidfile_id
        lines = [self._format_keyval(field, value)]
        _drone_manager.write_lines_to_file(target_path, lines,
                                           paired_with_pidfile=pidfile_id)


    def _write_host_keyvals(self, host):
        """Attach the platform/labels keyvals of host to the execution."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyvals = dict(platform=platform, labels=','.join(all_labels))
        formatted = [self._format_keyval(key, value)
                     for key, value in keyvals.iteritems()]
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                '\n'.join(formatted),
                                                file_path=keyval_path)


    def _execution_tag(self):
        """Return the execution tag of the first queue entry."""
        first_entry = self.queue_entries[0]
        return first_entry.execution_tag()


    def prolog(self):
        """Mark all entries and their hosts as running and dirty."""
        for entry in self.queue_entries:
            entry_host = entry.host
            self._write_host_keyvals(entry_host)
            entry.set_status('Running')
            entry.host.set_status('Running')
            entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self, success):
        """
        Write the queued/finished keyvals, copy the results and schedule the
        final reparse. The success argument is not used.
        """
        queued = time.mktime(self.job.created_on.timetuple())
        finished = time.time()
        for field, stamp in (("job_queued", queued),
                             ("job_finished", finished)):
            self._write_keyval(field, int(stamp))

        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  self._execution_tag() + '/')

        # parse the results of the job
        reparse_agent = Agent([FinalReparseTask(self.queue_entries)],
                              num_processes=0)
        self.agent.dispatcher.add_agent(reparse_agent)


    def _write_status_comment(self, comment):
        """Append an INFO line carrying comment to status.log."""
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_pidfile=self.monitor.pidfile_id)


    def _log_abort(self):
        """Record who aborted the job and when; needs a known process."""
        if not (self.monitor and self.monitor.has_process()):
            return

        # collect every aborted_by value and every abort timestamp
        aborters = set()
        abort_times = set()
        for entry in self.queue_entries:
            if entry.aborted_by:
                aborters.add(entry.aborted_by)
                stamp = int(time.mktime(entry.aborted_on.timetuple()))
                abort_times.add(stamp)

        # there must be at most one aborter; without one the abort is
        # attributed to the system at the current time
        assert len(aborters) <= 1
        if len(aborters) == 1:
            aborted_by_value = aborters.pop()
            aborted_on_value = max(abort_times)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval("aborted_by", aborted_by_value)
        self._write_keyval("aborted_on", aborted_on_value)

        aborted_on_string = str(
            datetime.datetime.fromtimestamp(aborted_on_value))
        comment = 'Job aborted by %s on %s' % (aborted_by_value,
                                               aborted_on_string)
        self._write_status_comment(comment)


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task(False)


    def _reboot_hosts(self):
        """
        Follow the job's reboot_after setting: either schedule a cleanup for
        every host or set the hosts to Ready.
        """
        policy = self.job.reboot_after
        if policy == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif policy == models.RebootAfter.IF_ALL_TESTS_PASSED:
            num_tests_failed = self.monitor.num_tests_failed()
            do_reboot = (self.success and num_tests_failed == 0)
        else:
            do_reboot = False

        if do_reboot:
            for entry in self.queue_entries:
                # don't pass the queue entry to the CleanupTask. if the
                # cleanup fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=entry.get_host())
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
        else:
            for entry in self.queue_entries:
                entry.host.set_status('Ready')


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task(self.success)
        self._reboot_hosts()

        print("queue_task finished with succes=%s" % self.success)
mbligh36768f02008-02-22 18:28:33 +00001299
1300
class RecoveryQueueTask(QueueTask):
    """
    QueueTask for a job whose process already exists: it starts no command
    and uses the monitor it is given.
    """

    def __init__(self, job, queue_entries, run_monitor):
        """
        run_monitor: monitor that becomes self.monitor when the task runs.
        """
        super(RecoveryQueueTask, self).__init__(job, queue_entries, None)
        self.run_monitor = run_monitor


    def run(self):
        """Adopt the supplied monitor instead of launching a command."""
        self.monitor = self.run_monitor


    def prolog(self):
        """Do nothing: the process being recovered already exists."""
        pass
mblighbb421852008-03-11 22:36:16 +00001314
1315
class CleanupTask(PreJobTask):
    """Runs autoserv --cleanup on a host; a RepairTask follows on failure."""

    def __init__(self, host=None, queue_entry=None):
        """
        Exactly one of host and queue_entry must be given; with a queue
        entry, the host is obtained from it.
        """
        assert bool(host) != bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.host = host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.cleanup')
        self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
                    '-r', _drone_manager.absolute_path(self.temp_results_dir)]
        repair_on_failure = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[repair_on_failure])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        """Mark the host as being cleaned."""
        super(CleanupTask, self).prolog()
        print("starting cleanup task for host: %s" % self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        """After a successful cleanup the host is ready and no longer dirty."""
        super(CleanupTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
1346
1347
class AbortTask(AgentTask):
    """
    Aborts the active tasks of a set of agents and marks a queue entry as
    aborted.
    """

    def __init__(self, queue_entry, agents_to_abort):
        """
        queue_entry: entry that ends up with status 'Aborted'.
        agents_to_abort: agents whose active tasks get aborted.
        """
        super(AbortTask, self).__init__('')
        self.agents_to_abort = agents_to_abort
        self.queue_entry = queue_entry
        # set directly rather than through _set_ids, so that host_ids stays
        # empty
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        print("starting abort on host %s, job %s" % (
            self.queue_entry.host_id, self.queue_entry.job_id))


    def epilog(self):
        """Mark the entry aborted; the abort itself counts as a success."""
        super(AbortTask, self).epilog()
        self.queue_entry.set_status('Aborted')
        self.success = True


    def run(self):
        """Abort whatever task each agent is currently running."""
        for victim in self.agents_to_abort:
            running_task = victim.active_task
            if running_task:
                running_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001372
1373
class FinalReparseTask(AgentTask):
    """
    Task that runs the results parser over a finished execution and then
    moves the queue entries to their final status.

    The number of parses running at once is counted on the class and capped
    by scheduler_config.config.max_parse_processes; a task that cannot start
    right away keeps retrying from poll().
    """
    # parses started by instances of this class that have not finished yet
    _num_running_parses = 0

    def __init__(self, queue_entries):
        """
        queue_entries: non-empty sequence of queue entries; the execution tag
                is taken from the first one.
        """
        self._queue_entries = queue_entries
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False

        assert len(queue_entries) > 0
        queue_entry = queue_entries[0]

        self._execution_tag = queue_entry.execution_tag()
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._final_status = self._determine_final_status()

        if _testing_mode:
            # NOTE(review): the base-class __init__ is skipped on this path,
            # so only self.cmd is set here -- confirm that is intended.
            self.cmd = 'true'
        else:
            super(FinalReparseTask, self).__init__(
                cmd=self._generate_parse_command(),
                working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, '.parse.log')


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        """Return True while the running-parse count is below the limit."""
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def _determine_final_status(self):
        """Map the autoserv exit code to COMPLETED (0) or FAILED (other)."""
        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def prolog(self):
        """Put every queue entry into the PARSING status."""
        super(FinalReparseTask, self).prolog()
        for queue_entry in self._queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        """Give every queue entry the status computed at construction."""
        super(FinalReparseTask, self).epilog()
        for queue_entry in self._queue_entries:
            queue_entry.set_status(self._final_status)


    def _generate_parse_command(self):
        """Return the parser command line (a list) for the results dir."""
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                self._results_dir]


    def poll(self):
        # override poll to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).poll()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        """Start the parser if a parse slot is free; otherwise do nothing."""
        if not self._can_run_new_parse():
            return

        # actually run the parse command
        self.monitor = PidfileRunMonitor()
        self.monitor.run(self.cmd, self._working_directory,
                         log_file=self.log_file,
                         pidfile_name='.parser_execute',
                         paired_with_pidfile=self._autoserv_monitor.pidfile_id)

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        """
        Finish the task and release the parse slot, if one was taken.

        The counter is only decremented when this task actually started a
        parse; otherwise a task finished before starting would push the
        shared count below the real number of running parses.
        """
        super(FinalReparseTask, self).finished(success)
        if self._parse_started:
            self._decrement_running_parses()
1474
1475
class SetEntryPendingTask(AgentTask):
    """
    Command-less task that notifies a queue entry it is pending and
    schedules whatever agent that produces.
    """

    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        """
        Call on_pending() on the entry; if it returns an agent, add it to the
        dispatcher.  The task then finishes successfully.
        """
        follow_up_agent = self._queue_entry.on_pending()
        if follow_up_agent:
            dispatcher = self.agent.dispatcher
            dispatcher.add_agent(follow_up_agent)
        self.finished(True)
1488
1489
class DBObject(object):
    """
    Minimal row wrapper: each instance mirrors one row of the table named by
    _get_table(), with one attribute per column listed by _fields().

    Subclasses must override _get_table() and _fields().  All queries go
    through the module-level _db connection.
    """

    def __init__(self, id=None, row=None, new_record=False):
        """
        id: primary key of the row to load from the database, or
        row: sequence of column values in _fields() order (no query is made).
        new_record: when true, save() will INSERT this object.

        Exactly one of id / row must be given (truthy).
        Raises LookupError if id is given and no such row exists.
        """
        assert (bool(id) != bool(row))

        self.__table = self._get_table()

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                # string exceptions are not valid; raise a real exception
                raise LookupError("row not found (table=%s, id=%s)" %
                                  (self.__table, id))
            row = rows[0]

        self._update_fields_from_row(row)


    def _update_fields_from_row(self, row):
        """Copy row values onto attributes and record the updatable fields."""
        assert len(row) == self.num_cols(), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields(), self.num_cols()))

        self._valid_fields = set()
        for field, value in zip(self._fields(), row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # the primary key may never be changed through update_field()
        self._valid_fields.remove('id')


    @classmethod
    def _get_table(cls):
        raise NotImplementedError('Subclasses must override this')


    @classmethod
    def _fields(cls):
        raise NotImplementedError('Subclasses must override this')


    @classmethod
    def num_cols(cls):
        return len(cls._fields())


    def count(self, where, table = None):
        """
        Return COUNT(*) for rows matching the SQL fragment `where`, in
        `table` or, by default, in this object's own table.  Both values are
        interpolated directly into the query text.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value, condition=''):
        """
        Set one column of this row in the database and on this object.

        Nothing is done if the attribute already equals value.  `condition`
        is an optional SQL fragment ANDed onto the UPDATE's WHERE clause; the
        local attribute is updated whether or not the UPDATE matched.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        if condition:
            query += ' AND (%s)' % condition
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """
        INSERT this object if it was created with new_record=True; otherwise
        do nothing.  The id column is left out of the INSERT.
        """
        if not self.__new_record:
            return

        keys = self._fields()[1:] # avoid id
        columns = ','.join([str(key) for key in keys])
        # pass the values as query parameters so the driver does the quoting
        placeholders = ','.join(['%s'] * len(keys))
        values = tuple([self.__dict__[key] for key in keys])
        query = 'INSERT INTO %s (%s) VALUES (%s)' % (
                self.__table, columns, placeholders)
        _db.execute(query, values)


    def delete(self):
        """Delete this object's row from the database."""
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return prefix + string, or string unchanged when it is empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Yield one instance of cls per row selected from the class's table.

        where, joins and order_by are SQL fragments (without the WHERE /
        ORDER BY keywords); params are passed to _db.execute().
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._get_table(),
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001600
mbligh36768f02008-02-22 18:28:33 +00001601
class IneligibleHostQueue(DBObject):
    """Row of the ineligible_host_queues table (a job/host pair)."""

    def __init__(self, id=None, row=None, new_record=None):
        base = super(IneligibleHostQueue, self)
        base.__init__(id=id, row=row, new_record=new_record)


    @classmethod
    def _get_table(cls):
        table_name = 'ineligible_host_queues'
        return table_name


    @classmethod
    def _fields(cls):
        columns = ['id']
        columns.extend(['job_id', 'host_id'])
        return columns
showard04c82c52008-05-29 19:38:12 +00001616
1617
class Label(DBObject):
    """Row of the labels table."""

    @classmethod
    def _get_table(cls):
        table_name = 'labels'
        return table_name


    @classmethod
    def _fields(cls):
        columns = ['id', 'name', 'kernel_config']
        columns.extend(['platform', 'invalid', 'only_if_needed'])
        return columns
1628
1629
class Host(DBObject):
    """Row of the hosts table, plus scheduling helpers for that host."""

    def __init__(self, id=None, row=None):
        super(Host, self).__init__(id=id, row=row)


    @classmethod
    def _get_table(cls):
        return 'hosts'


    @classmethod
    def _fields(cls):
        return ['id', 'hostname', 'locked', 'synch_id','status',
                'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty']


    def current_task(self):
        """
        Return the HostQueueEntry for this host that is active and not
        complete, or None if there is none.  More than one matching row
        trips an assertion.
        """
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if len(rows) == 0:
            return None

        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Requeue the entry currently running on this host, if any."""
        print("%s yielding work" % self.hostname)
        # look the entry up once: a second query could come back empty
        active_entry = self.current_task()
        if active_entry:
            active_entry.requeue()

    def set_status(self,status):
        """Log the transition on stdout and store the new host status."""
        print('%s -> %s' % (self.hostname, status))
        self.update_field('status',status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                # if several labels are platforms, the last one by name wins
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self):
        """
        Return [cleanup task, verify task] for this host and put the host
        into the 'Cleaning' status.
        """
        cleanup_task = CleanupTask(host=self)
        verify_task = VerifyTask(host=self)
        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00001696
1697
class HostQueueEntry(DBObject):
    """
    Row of the host_queue_entries table: one job scheduled (or waiting to be
    scheduled) on one host.  Loading an entry also loads its Job and, when
    host_id is set, its Host.
    """

    def __init__(self, id=None, row=None):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def _get_table(cls):
        return 'host_queue_entries'


    @classmethod
    def _fields(cls):
        return ['id', 'job_id', 'host_id', 'priority', 'status', 'meta_host',
                'active', 'complete', 'deleted', 'execution_subdir']


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def set_host(self, host):
        """
        Assign `host` to this entry (marking the entry active and blocking
        the host for this job), or release the current host when `host` is
        false.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        """Write a timestamped line to this entry's queue log."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Insert an ineligible_host_queues row for (this job, host_id)."""
        print("creating block %s/%s" % (self.job.id, host_id))
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete every ineligible_host_queues row for (this job, host_id)."""
        print("removing block %s/%s" % (self.job.id, host_id))
        # values go in as query parameters, as in Job.get_group_entries
        blocks = IneligibleHostQueue.fetch(
            'job_id=%s and host_id=%s', params=(self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Store the execution subdir; defaults to the host's hostname."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """
        Store a new status, derive the active/complete flags from it and
        send any notification emails.

        When the new status is not one of the abort statuses, the UPDATE is
        conditioned on the stored status not being an abort status.
        """
        abort_statuses = ['Abort', 'Aborting', 'Aborted']
        if status not in abort_statuses:
            condition = ' AND '.join(['status <> "%s"' % x
                                      for x in abort_statuses])
        else:
            condition = ''
        self.update_field('status', status, condition=condition)

        print("%s -> %s" % (self, self.status))

        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's email_list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """If the whole job is finished, email a per-host status summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        # value goes in as a query parameter, as in Job.get_group_entries
        hosts_queue = HostQueueEntry.fetch('job_id = %s',
                                           params=(self.job.id,))
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self,assigned_host=None):
        """
        Start this entry via its job and return whatever Job.run() returns.
        Metahost entries must be given the host they were assigned.
        """
        if self.meta_host:
            assert assigned_host
            # bind the metahost entry to the concrete host chosen for it
            self.set_host(assigned_host)

        print("%s/%s scheduled on %s, status=%s" % (self.job.name,
                self.meta_host, self.host.hostname, self.status))

        return self.job.run(queue_entry=self)

    def requeue(self):
        """Return the entry to 'Queued'; metahost entries drop their host."""
        self.set_status('Queued')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # loaded at most once per object
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher, agents_to_abort=None):
        """
        Schedule the abort of this entry on `dispatcher`.

        agents_to_abort: agents whose active task should be aborted; defaults
                to none.  If the entry is active and has a host, the host is
                also scheduled for reverification.
        """
        # a fresh list per call: the list is kept by the AbortTask
        if agents_to_abort is None:
            agents_to_abort = []

        host = self.get_host()
        if self.active and host:
            dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))

        abort_task = AbortTask(self, agents_to_abort)
        self.set_status('Aborting')
        dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))

    def execution_tag(self):
        """Return '<job id>-<job owner>/<execution_subdir>'."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00001934
1935
mbligh36768f02008-02-22 18:28:33 +00001936class Job(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001937 def __init__(self, id=None, row=None):
1938 assert id or row
1939 super(Job, self).__init__(id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00001940
mblighe2586682008-02-29 22:45:46 +00001941
jadmanski0afbb632008-06-06 21:10:57 +00001942 @classmethod
1943 def _get_table(cls):
1944 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00001945
1946
jadmanski0afbb632008-06-06 21:10:57 +00001947 @classmethod
1948 def _fields(cls):
1949 return ['id', 'owner', 'name', 'priority', 'control_file',
showard2bab8f42008-11-12 18:15:22 +00001950 'control_type', 'created_on', 'synch_count', 'timeout',
showard21baa452008-10-21 00:08:39 +00001951 'run_verify', 'email_list', 'reboot_before', 'reboot_after']
showard04c82c52008-05-29 19:38:12 +00001952
1953
jadmanski0afbb632008-06-06 21:10:57 +00001954 def is_server_job(self):
1955 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00001956
1957
showard170873e2009-01-07 00:22:26 +00001958 def tag(self):
1959 return "%s-%s" % (self.id, self.owner)
1960
1961
jadmanski0afbb632008-06-06 21:10:57 +00001962 def get_host_queue_entries(self):
1963 rows = _db.execute("""
1964 SELECT * FROM host_queue_entries
1965 WHERE job_id= %s
1966 """, (self.id,))
1967 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00001968
jadmanski0afbb632008-06-06 21:10:57 +00001969 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00001970
jadmanski0afbb632008-06-06 21:10:57 +00001971 return entries
mbligh36768f02008-02-22 18:28:33 +00001972
1973
jadmanski0afbb632008-06-06 21:10:57 +00001974 def set_status(self, status, update_queues=False):
1975 self.update_field('status',status)
1976
1977 if update_queues:
1978 for queue_entry in self.get_host_queue_entries():
1979 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00001980
1981
jadmanski0afbb632008-06-06 21:10:57 +00001982 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00001983 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
1984 status='Pending')
1985 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00001986
1987
jadmanski0afbb632008-06-06 21:10:57 +00001988 def num_machines(self, clause = None):
1989 sql = "job_id=%s" % self.id
1990 if clause:
1991 sql += " AND (%s)" % clause
1992 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00001993
1994
jadmanski0afbb632008-06-06 21:10:57 +00001995 def num_queued(self):
1996 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00001997
1998
jadmanski0afbb632008-06-06 21:10:57 +00001999 def num_active(self):
2000 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002001
2002
jadmanski0afbb632008-06-06 21:10:57 +00002003 def num_complete(self):
2004 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002005
2006
jadmanski0afbb632008-06-06 21:10:57 +00002007 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002008 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002009
mbligh36768f02008-02-22 18:28:33 +00002010
showard2bab8f42008-11-12 18:15:22 +00002011 def _stop_all_entries(self, entries_to_abort):
2012 """
2013 queue_entries: sequence of models.HostQueueEntry objects
2014 """
2015 for child_entry in entries_to_abort:
showard4f9e5372009-01-07 21:33:38 +00002016 assert not child_entry.complete, (
2017 '%s status=%s, active=%s, complete=%s' %
2018 (child_entry.id, child_entry.status, child_entry.active,
2019 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002020 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2021 child_entry.host.status = models.Host.Status.READY
2022 child_entry.host.save()
2023 child_entry.status = models.HostQueueEntry.Status.STOPPED
2024 child_entry.save()
2025
2026
2027 def stop_if_necessary(self):
2028 not_yet_run = models.HostQueueEntry.objects.filter(
2029 job=self.id, status__in=(models.HostQueueEntry.Status.QUEUED,
2030 models.HostQueueEntry.Status.VERIFYING,
2031 models.HostQueueEntry.Status.PENDING))
2032 if not_yet_run.count() < self.synch_count:
2033 self._stop_all_entries(not_yet_run)
mblighe2586682008-02-29 22:45:46 +00002034
2035
jadmanski0afbb632008-06-06 21:10:57 +00002036 def write_to_machines_file(self, queue_entry):
2037 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002038 file_path = os.path.join(self.tag(), '.machines')
2039 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002040
2041
showard2bab8f42008-11-12 18:15:22 +00002042 def _next_group_name(self):
2043 query = models.HostQueueEntry.objects.filter(
2044 job=self.id).values('execution_subdir').distinct()
2045 subdirs = (entry['execution_subdir'] for entry in query)
2046 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2047 ids = [int(match.group(1)) for match in groups if match]
2048 if ids:
2049 next_id = max(ids) + 1
2050 else:
2051 next_id = 0
2052 return "group%d" % next_id
2053
2054
showard170873e2009-01-07 00:22:26 +00002055 def _write_control_file(self, execution_tag):
2056 control_path = _drone_manager.attach_file_to_execution(
2057 execution_tag, self.control_file)
2058 return control_path
mbligh36768f02008-02-22 18:28:33 +00002059
showardb2e2c322008-10-14 17:33:55 +00002060
showard2bab8f42008-11-12 18:15:22 +00002061 def get_group_entries(self, queue_entry_from_group):
2062 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002063 return list(HostQueueEntry.fetch(
2064 where='job_id=%s AND execution_subdir=%s',
2065 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002066
2067
showardb2e2c322008-10-14 17:33:55 +00002068 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002069 assert queue_entries
2070 execution_tag = queue_entries[0].execution_tag()
2071 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002072 hostnames = ','.join([entry.get_host().hostname
2073 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002074
showard170873e2009-01-07 00:22:26 +00002075 params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
2076 '-r', _drone_manager.absolute_path(execution_tag),
2077 '-u', self.owner, '-l', self.name, '-m', hostnames,
2078 _drone_manager.absolute_path(control_path)]
mbligh36768f02008-02-22 18:28:33 +00002079
jadmanski0afbb632008-06-06 21:10:57 +00002080 if not self.is_server_job():
2081 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002082
showardb2e2c322008-10-14 17:33:55 +00002083 return params
mblighe2586682008-02-29 22:45:46 +00002084
mbligh36768f02008-02-22 18:28:33 +00002085
showardc9ae1782009-01-30 01:42:37 +00002086 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002087 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002088 return True
showard0fc38302008-10-23 00:44:07 +00002089 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002090 return queue_entry.get_host().dirty
2091 return False
showard21baa452008-10-21 00:08:39 +00002092
showardc9ae1782009-01-30 01:42:37 +00002093
2094 def _should_run_verify(self, queue_entry):
2095 do_not_verify = (queue_entry.host.protection ==
2096 host_protections.Protection.DO_NOT_VERIFY)
2097 if do_not_verify:
2098 return False
2099 return self.run_verify
2100
2101
def _get_pre_job_tasks(self, queue_entry):
    """
    Build the list of tasks to run on an entry before the job itself.

    A CleanupTask and a VerifyTask are included, in that order, when the
    corresponding _should_run_* check passes.  A SetEntryPendingTask is
    always the last element.

    @param queue_entry: the queue entry the tasks operate on.
    @returns list of task objects.
    """
    # Each check is evaluated right before its task is constructed, in the
    # order listed here.
    optional_steps = (
        (self._should_run_cleanup, CleanupTask),
        (self._should_run_verify, VerifyTask),
    )
    pre_job_tasks = [task_class(queue_entry=queue_entry)
                     for is_needed, task_class in optional_steps
                     if is_needed(queue_entry)]
    return pre_job_tasks + [SetEntryPendingTask(queue_entry)]
2110
2111
def _assign_new_group(self, queue_entries):
    """
    Give every entry in the group the same execution subdirectory.

    A single entry uses its host's hostname as the subdirectory name.
    Otherwise a name is taken from _next_group_name() and a line describing
    the group is printed.

    NOTE(review): indentation was lost in the rendering this was read from;
    the print is assumed to belong to the multi-entry branch only.  Confirm
    against the real file.

    @param queue_entries: sequence of queue entries forming the group.
    """
    if len(queue_entries) != 1:
        subdir = self._next_group_name()
        # Single parenthesised argument: prints the same text whether
        # print is a statement or a function.
        print('Running synchronous job %d hosts %s as %s' % (
                self.id,
                [entry.host.hostname for entry in queue_entries],
                subdir))
    else:
        subdir = queue_entries[0].get_host().hostname

    for member in queue_entries:
        member.set_execution_subdir(subdir)
2123
2124
def _choose_group_to_run(self, include_queue_entry):
    """
    Pick the queue entries that will run together and group them.

    The group always starts with include_queue_entry.  When synch_count is
    greater than one, up to synch_count - 1 other entries of this job with
    status "Pending" are fetched and added.  The resulting group is passed
    to _assign_new_group() before being returned.

    @param include_queue_entry: entry that must be part of the group.
    @returns list of the chosen queue entries.
    """
    group = [include_queue_entry]

    extra_needed = self.synch_count - 1
    if extra_needed > 0:
        candidates = HostQueueEntry.fetch(
                where='job_id = %s AND status = "Pending" AND id != %s',
                params=(self.id, include_queue_entry.id))
        # Materialize first, then truncate to the number still needed.
        group.extend(list(candidates)[:extra_needed])

    self._assign_new_group(group)
    return group
2137
2138
def run(self, queue_entry):
    """
    Start work for the given queue entry and return the Agent driving it.

    When is_ready() is true, a group is chosen around queue_entry and
    handed to _finish_run().  Otherwise the entry's status is set to
    VERIFYING and the returned Agent runs only the pre-job tasks.

    @param queue_entry: the queue entry being scheduled.
    @returns an Agent.
    """
    if self.is_ready():
        group = self._choose_group_to_run(queue_entry)
        return self._finish_run(group)

    queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
    return Agent(self._get_pre_job_tasks(queue_entry))
showardb2e2c322008-10-14 17:33:55 +00002146
2147
def _finish_run(self, queue_entries, initial_tasks=None):
    """
    Mark the entries as starting and build the Agent that runs the job.

    Every entry's status is set to 'Starting', the autoserv command line is
    built for the group, and a QueueTask wrapping it is appended after any
    initial tasks.

    @param queue_entries: the queue entries to run together.
    @param initial_tasks: optional iterable of tasks to run before the
            QueueTask.  Defaults to none.  The argument is never modified.
    @returns an Agent whose num_processes equals the number of entries.
    """
    # None replaces the former mutable [] default, so no list object is
    # shared between calls.
    if initial_tasks is None:
        initial_tasks = []

    for queue_entry in queue_entries:
        queue_entry.set_status('Starting')

    params = self._get_autoserv_params(queue_entries)
    queue_task = QueueTask(job=self, queue_entries=queue_entries,
                           cmd=params)
    # Copy so the caller's sequence is left untouched.
    tasks = list(initial_tasks) + [queue_task]

    return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002158
2159
# Run main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()