#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref
import common
import MySQLdb
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections, utils, debug
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import status_server, scheduler_config


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()


def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            print 'Error: [SERVER] hostname missing from the config file.'
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    debug.configure('scheduler', format_string='%(message)s')
    debug.get_logger().setLevel(logging.INFO)

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


def queue_entries_to_abort():
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)

    qe = [HostQueueEntry(row=i) for i in rows]
    return qe


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)
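        # e.g. _get_sql_id_list([3, 7, 12]) returns '3,7,12', ready to be
        # interpolated into the "IN (%s)" clauses used by the queries below.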


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = long(row[0]), long(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result
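        # e.g. rows [(1, 10), (1, 11), (2, 10)] map 1 -> set([10, 11]) and
        # 2 -> set([10]); with flip=True the mapping is inverted, giving
        # 10 -> set([1, 2]) and 11 -> set([1]).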


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0
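        # e.g. dependencies set([3, 5]) are satisfied by host labels
        # set([3, 5, 9]), while set([3, 8]) are not, since label 8 is missing.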


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        acl = self._is_acl_accessible(host_id, queue_entry)
        deps = self._check_job_dependencies(job_dependencies, host_labels)
        only_if = self._check_only_if_needed_labels(job_dependencies,
                                                    host_labels, queue_entry)
        return acl and deps and only_if


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            return self._schedule_non_metahost(queue_entry)
        return self._schedule_metahost(queue_entry)


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        self._host_agents = {}
        self._queue_entry_agents = {}


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        _drone_manager.refresh()
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()


    def _run_cleanup_maybe(self):
        should_cleanup = (self._last_clean_time +
                          scheduler_config.config.clean_interval * 60 <
                          time.time())
        if should_cleanup:
            print 'Running cleanup'
            self._abort_timed_out_jobs()
            self._abort_jobs_past_synch_start_timeout()
            self._clear_inactive_blocks()
            self._check_for_db_inconsistencies()
            self._last_clean_time = time.time()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return self._queue_entry_agents.get(queue_entry.id, set())


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def num_running_processes(self):
        return sum(agent.num_processes for agent in self._agents
                   if agent.is_running())


    def _extract_execution_tag(self, command_line):
        match = re.match(r'.* -P (\S+) ', command_line)
        if not match:
            return None
        return match.group(1)
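        # Illustrative only: for a command line like
        # '/usr/bin/autoserv -P 123-someuser/host1 -m host1 -r /results/123 ',
        # this returns '123-someuser/host1'.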


    def _recover_queue_entries(self, queue_entries, run_monitor):
        assert len(queue_entries) > 0
        queue_task = RecoveryQueueTask(job=queue_entries[0].job,
                                       queue_entries=queue_entries,
                                       run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             num_processes=len(queue_entries)))


    def _recover_processes(self):
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_running_entries()
        self._recover_aborting_entries()
        self._requeue_other_active_entries()
        self._recover_parsing_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Parsing')")
        for queue_entry in queue_entries:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                queue_entry.execution_tag())
            _drone_manager.register_pidfile(pidfile_id)


    def _recover_running_entries(self):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
        requeue_entries = []
        for queue_entry in queue_entries:
            if self.get_agents_for_entry(queue_entry):
                # synchronous job we've already recovered
                continue
            execution_tag = queue_entry.execution_tag()
            run_monitor = PidfileRunMonitor()
            run_monitor.attach_to_existing_process(execution_tag)
            if not run_monitor.has_process():
                # autoserv apparently never got run, so let it get requeued
                continue
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            print 'Recovering %s (process %s)' % (
                ', '.join(str(entry) for entry in queue_entries),
                run_monitor.get_process())
            self._recover_queue_entries(queue_entries, run_monitor)
            orphans.pop(execution_tag, None)

        # now kill any remaining autoserv processes
        for process in orphans.itervalues():
            print 'Killing orphan %s' % process
            _drone_manager.kill_process(process)


    def _recover_aborting_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='status IN ("Abort", "Aborting")')
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %s' % queue_entry
            agent = queue_entry.abort(self)


    def _requeue_other_active_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='active AND NOT complete AND status != "Pending"')
        for queue_entry in queue_entries:
            if self.get_agents_for_entry(queue_entry):
                # entry has already been recovered
                continue
            print 'Requeuing active QE %s (status=%s)' % (queue_entry,
                                                          queue_entry.status)
            if queue_entry.host:
                tasks = queue_entry.host.reverify_tasks()
                self.add_agent(Agent(tasks))
            agent = queue_entry.requeue()


    def _reverify_remaining_hosts(self):
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if print_message:
                print print_message % host.hostname
            tasks = host.reverify_tasks()
            self.add_agent(Agent(tasks))


    def _recover_parsing_entries(self):
        recovered_entry_ids = set()
        for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
            if entry.id in recovered_entry_ids:
                continue
            queue_entries = entry.job.get_group_entries(entry)
            recovered_entry_ids = recovered_entry_ids.union(
                entry.id for entry in queue_entries)
            print 'Recovering parsing entries %s' % (
                ', '.join(str(entry) for entry in queue_entries))

            reparse_task = FinalReparseTask(queue_entries)
            self.add_agent(Agent([reparse_task], num_processes=0))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _abort_timed_out_jobs(self):
        """
        Aborts all jobs that have timed out and not completed
        """
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            print 'Aborting job %d due to job timeout' % job.id
            job.abort(None)


    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are holding a machine that's in the Everyone ACL group.
        """
        timeout_delta = datetime.timedelta(
            minutes=scheduler_config.config.synch_job_start_timeout_minutes)
        timeout_start = datetime.datetime.now() - timeout_delta
        query = models.Job.objects.filter(
            created_on__lt=timeout_start,
            hostqueueentry__status='Pending',
            hostqueueentry__host__aclgroup__name='Everyone')
        for job in query.distinct():
            print 'Aborting job %d due to start timeout' % job.id
            entries_to_abort = job.hostqueueentry_set.exclude(
                status=models.HostQueueEntry.Status.RUNNING)
            for queue_entry in entries_to_abort:
                queue_entry.abort(None)


    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))


    def _schedule_new_jobs(self):
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return

        self._host_scheduler.refresh(queue_entries)

        for queue_entry in queue_entries:
            assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
            if not assigned_host:
                continue
            self._run_queue_entry(queue_entry, assigned_host)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run(assigned_host=host)
        # in some cases (synchronous jobs with run_verify=False), agent may be
        # None
        if agent:
            self.add_agent(agent)


    def _find_aborting(self):
        for entry in queue_entries_to_abort():
            agents_to_abort = list(self.get_agents_for_entry(entry))
            for agent in agents_to_abort:
                self.remove_agent(agent)

            entry.abort(self, agents_to_abort)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
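        # Worked example with illustrative numbers (not taken from the real
        # config): if max_processes_started_per_cycle is 4 and 3 processes
        # have already started this cycle, a 2-process agent is deferred
        # (3 + 2 > 4); an agent considered first in the cycle (0 started so
        # far) is always allowed, even if it alone exceeds the limit.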


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                print "agent finished"
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        print _drone_manager.total_running_processes(), 'running processes'


    def _check_for_db_inconsistencies(self):
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        if query.count() != 0:
            subject = ('%d queue entries found with active=complete=1'
                       % query.count())
            message = '\n'.join(str(entry.get_object_dict())
                                for entry in query[:50])
            if len(query) > 50:
                message += '\n(truncated)\n'

            print subject
            email_manager.manager.enqueue_notify_email(subject, message)


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        print message
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        print message
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed


class Agent(object):
    def __init__(self, tasks, num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        return set(itertools.chain(*id_lists))
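        # e.g. _union_ids([[1, 2], [2, 3]]) returns set([1, 2, 3]); the
        # generator expressions passed in from __init__ behave the same way.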
941
942
jadmanski0afbb632008-06-06 21:10:57 +0000943 def add_task(self, task):
944 self.queue.put_nowait(task)
945 task.agent = self
mbligh36768f02008-02-22 18:28:33 +0000946
947
jadmanski0afbb632008-06-06 21:10:57 +0000948 def tick(self):
showard21baa452008-10-21 00:08:39 +0000949 while not self.is_done():
950 if self.active_task and not self.active_task.is_done():
951 self.active_task.poll()
952 if not self.active_task.is_done():
953 return
954 self._next_task()
mbligh36768f02008-02-22 18:28:33 +0000955
956
jadmanski0afbb632008-06-06 21:10:57 +0000957 def _next_task(self):
958 print "agent picking task"
959 if self.active_task:
960 assert self.active_task.is_done()
mbligh36768f02008-02-22 18:28:33 +0000961
jadmanski0afbb632008-06-06 21:10:57 +0000962 if not self.active_task.success:
963 self.on_task_failure()
mblighe2586682008-02-29 22:45:46 +0000964
jadmanski0afbb632008-06-06 21:10:57 +0000965 self.active_task = None
966 if not self.is_done():
967 self.active_task = self.queue.get_nowait()
968 if self.active_task:
969 self.active_task.start()
mbligh36768f02008-02-22 18:28:33 +0000970
971
jadmanski0afbb632008-06-06 21:10:57 +0000972 def on_task_failure(self):
973 self.queue = Queue.Queue(0)
974 for task in self.active_task.failure_tasks:
975 self.add_task(task)
mbligh16c722d2008-03-05 00:58:44 +0000976
mblighe2586682008-02-29 22:45:46 +0000977
showard4c5374f2008-09-04 17:02:56 +0000978 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +0000979 return self.active_task is not None
showardec113162008-05-08 00:52:49 +0000980
981
jadmanski0afbb632008-06-06 21:10:57 +0000982 def is_done(self):
mblighd876f452008-12-03 15:09:17 +0000983 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +0000984
985
jadmanski0afbb632008-06-06 21:10:57 +0000986 def start(self):
987 assert self.dispatcher
mbligh36768f02008-02-22 18:28:33 +0000988
jadmanski0afbb632008-06-06 21:10:57 +0000989 self._next_task()
mbligh36768f02008-02-22 18:28:33 +0000990
jadmanski0afbb632008-06-06 21:10:57 +0000991
mbligh36768f02008-02-22 18:28:33 +0000992class AgentTask(object):
showard170873e2009-01-07 00:22:26 +0000993 def __init__(self, cmd, working_directory=None, failure_tasks=[]):
jadmanski0afbb632008-06-06 21:10:57 +0000994 self.done = False
995 self.failure_tasks = failure_tasks
996 self.started = False
997 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +0000998 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +0000999 self.task = None
1000 self.agent = None
1001 self.monitor = None
1002 self.success = None
showard170873e2009-01-07 00:22:26 +00001003 self.queue_entry_ids = []
1004 self.host_ids = []
1005 self.log_file = None
1006
1007
1008 def _set_ids(self, host=None, queue_entries=None):
1009 if queue_entries and queue_entries != [None]:
1010 self.host_ids = [entry.host.id for entry in queue_entries]
1011 self.queue_entry_ids = [entry.id for entry in queue_entries]
1012 else:
1013 assert host
1014 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001015
1016
jadmanski0afbb632008-06-06 21:10:57 +00001017 def poll(self):
jadmanski0afbb632008-06-06 21:10:57 +00001018 if self.monitor:
1019 self.tick(self.monitor.exit_code())
1020 else:
1021 self.finished(False)
mbligh36768f02008-02-22 18:28:33 +00001022
1023
jadmanski0afbb632008-06-06 21:10:57 +00001024 def tick(self, exit_code):
showard170873e2009-01-07 00:22:26 +00001025 if exit_code is None:
jadmanski0afbb632008-06-06 21:10:57 +00001026 return
jadmanski0afbb632008-06-06 21:10:57 +00001027 if exit_code == 0:
1028 success = True
1029 else:
1030 success = False
mbligh36768f02008-02-22 18:28:33 +00001031
jadmanski0afbb632008-06-06 21:10:57 +00001032 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001033
1034
jadmanski0afbb632008-06-06 21:10:57 +00001035 def is_done(self):
1036 return self.done
mbligh36768f02008-02-22 18:28:33 +00001037
1038
jadmanski0afbb632008-06-06 21:10:57 +00001039 def finished(self, success):
1040 self.done = True
1041 self.success = success
1042 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001043
1044
jadmanski0afbb632008-06-06 21:10:57 +00001045 def prolog(self):
1046 pass
mblighd64e5702008-04-04 21:39:28 +00001047
1048
jadmanski0afbb632008-06-06 21:10:57 +00001049 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001050 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001051
mbligh36768f02008-02-22 18:28:33 +00001052
jadmanski0afbb632008-06-06 21:10:57 +00001053 def cleanup(self):
showard170873e2009-01-07 00:22:26 +00001054 if self.monitor and self.log_file:
1055 _drone_manager.copy_to_results_repository(
1056 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001057
1058
jadmanski0afbb632008-06-06 21:10:57 +00001059 def epilog(self):
1060 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001061
1062
jadmanski0afbb632008-06-06 21:10:57 +00001063 def start(self):
1064 assert self.agent
1065
1066 if not self.started:
1067 self.prolog()
1068 self.run()
1069
1070 self.started = True
1071
1072
1073 def abort(self):
1074 if self.monitor:
1075 self.monitor.kill()
1076 self.done = True
1077 self.cleanup()
1078
1079
showard170873e2009-01-07 00:22:26 +00001080 def set_host_log_file(self, base_name, host):
1081 filename = '%s.%s' % (time.time(), base_name)
1082 self.log_file = os.path.join('hosts', host.hostname, filename)
1083
1084
showardde634ee2009-01-30 01:44:24 +00001085 def _get_consistent_execution_tag(self, queue_entries):
1086 first_execution_tag = queue_entries[0].execution_tag()
1087 for queue_entry in queue_entries[1:]:
1088 assert queue_entry.execution_tag() == first_execution_tag, (
1089 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1090 queue_entry,
1091 first_execution_tag,
1092 queue_entries[0]))
1093 return first_execution_tag
1094
1095
showard678df4f2009-02-04 21:36:39 +00001096 def _copy_and_parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001097 assert len(queue_entries) > 0
1098 assert self.monitor
1099 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001100 results_path = execution_tag + '/'
1101 _drone_manager.copy_to_results_repository(self.monitor.get_process(),
1102 results_path)
showardde634ee2009-01-30 01:44:24 +00001103
1104 reparse_task = FinalReparseTask(queue_entries)
1105 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1106
1107
jadmanski0afbb632008-06-06 21:10:57 +00001108 def run(self):
1109 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001110 self.monitor = PidfileRunMonitor()
1111 self.monitor.run(self.cmd, self._working_directory,
1112 nice_level=AUTOSERV_NICE_LEVEL,
1113 log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001114
1115
1116class RepairTask(AgentTask):
showarde788ea62008-11-17 21:02:47 +00001117 def __init__(self, host, queue_entry=None):
jadmanski0afbb632008-06-06 21:10:57 +00001118 """\
showard170873e2009-01-07 00:22:26 +00001119 queue_entry: queue entry to mark failed if this repair fails.
jadmanski0afbb632008-06-06 21:10:57 +00001120 """
jadmanskifb7cfb12008-07-09 14:13:21 +00001121 protection = host_protections.Protection.get_string(host.protection)
jadmanski542537f2008-07-24 14:14:56 +00001122 # normalize the protection name
1123 protection = host_protections.Protection.get_attr_name(protection)
showard170873e2009-01-07 00:22:26 +00001124
jadmanski0afbb632008-06-06 21:10:57 +00001125 self.host = host
showarde788ea62008-11-17 21:02:47 +00001126 self.queue_entry = queue_entry
showard170873e2009-01-07 00:22:26 +00001127 self._set_ids(host=host, queue_entries=[queue_entry])
1128
1129 self.create_temp_resultsdir('.repair')
1130 cmd = [_autoserv_path , '-p', '-R', '-m', host.hostname,
1131 '-r', _drone_manager.absolute_path(self.temp_results_dir),
1132 '--host-protection', protection]
1133 super(RepairTask, self).__init__(cmd, self.temp_results_dir)
1134
1135 self._set_ids(host=host, queue_entries=[queue_entry])
1136 self.set_host_log_file('repair', self.host)
mblighe2586682008-02-29 22:45:46 +00001137
mbligh36768f02008-02-22 18:28:33 +00001138
jadmanski0afbb632008-06-06 21:10:57 +00001139 def prolog(self):
1140 print "repair_task starting"
1141 self.host.set_status('Repairing')
showarde788ea62008-11-17 21:02:47 +00001142 if self.queue_entry:
1143 self.queue_entry.requeue()
mbligh36768f02008-02-22 18:28:33 +00001144
1145
showardde634ee2009-01-30 01:44:24 +00001146 def _fail_queue_entry(self):
1147 assert self.queue_entry
1148 self.queue_entry.set_execution_subdir()
showard678df4f2009-02-04 21:36:39 +00001149 # copy results logs into the normal place for job results
1150 _drone_manager.copy_results_on_drone(
1151 self.monitor.get_process(),
1152 source_path=self.temp_results_dir + '/',
1153 destination_path=self.queue_entry.execution_tag() + '/')
1154
1155 self._copy_and_parse_results([self.queue_entry])
showardde634ee2009-01-30 01:44:24 +00001156 self.queue_entry.handle_host_failure()
1157
1158
jadmanski0afbb632008-06-06 21:10:57 +00001159 def epilog(self):
1160 super(RepairTask, self).epilog()
1161 if self.success:
1162 self.host.set_status('Ready')
1163 else:
1164 self.host.set_status('Repair Failed')
showarde788ea62008-11-17 21:02:47 +00001165 if self.queue_entry and not self.queue_entry.meta_host:
showardde634ee2009-01-30 01:44:24 +00001166 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001167
1168
showard8fe93b52008-11-18 17:53:22 +00001169class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001170 def epilog(self):
1171 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001172 should_copy_results = (self.queue_entry and not self.success
1173 and not self.queue_entry.meta_host)
1174 if should_copy_results:
1175 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001176 destination = os.path.join(self.queue_entry.execution_tag(),
1177 os.path.basename(self.log_file))
1178 _drone_manager.copy_to_results_repository(
1179 self.monitor.get_process(), self.log_file,
1180 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001181
1182
1183class VerifyTask(PreJobTask):
showard9976ce92008-10-15 20:28:13 +00001184 def __init__(self, queue_entry=None, host=None):
jadmanski0afbb632008-06-06 21:10:57 +00001185 assert bool(queue_entry) != bool(host)
jadmanski0afbb632008-06-06 21:10:57 +00001186 self.host = host or queue_entry.host
1187 self.queue_entry = queue_entry
mbligh36768f02008-02-22 18:28:33 +00001188
jadmanski0afbb632008-06-06 21:10:57 +00001189 self.create_temp_resultsdir('.verify')
showard170873e2009-01-07 00:22:26 +00001190 cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname, '-r',
1191 _drone_manager.absolute_path(self.temp_results_dir)]
showarde788ea62008-11-17 21:02:47 +00001192 failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
showard170873e2009-01-07 00:22:26 +00001193 super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
1194 failure_tasks=failure_tasks)
mblighe2586682008-02-29 22:45:46 +00001195
showard170873e2009-01-07 00:22:26 +00001196 self.set_host_log_file('verify', self.host)
1197 self._set_ids(host=host, queue_entries=[queue_entry])
mblighe2586682008-02-29 22:45:46 +00001198
1199
jadmanski0afbb632008-06-06 21:10:57 +00001200 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001201 super(VerifyTask, self).prolog()
jadmanski0afbb632008-06-06 21:10:57 +00001202 print "starting verify on %s" % (self.host.hostname)
1203 if self.queue_entry:
1204 self.queue_entry.set_status('Verifying')
jadmanski0afbb632008-06-06 21:10:57 +00001205 self.host.set_status('Verifying')
mbligh36768f02008-02-22 18:28:33 +00001206
1207
jadmanski0afbb632008-06-06 21:10:57 +00001208 def epilog(self):
1209 super(VerifyTask, self).epilog()
mbligh36768f02008-02-22 18:28:33 +00001210
jadmanski0afbb632008-06-06 21:10:57 +00001211 if self.success:
1212 self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001213
1214
mbligh36768f02008-02-22 18:28:33 +00001215class QueueTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001216 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001217 self.job = job
1218 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001219 super(QueueTask, self).__init__(cmd, self._execution_tag())
1220 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001221
1222
showard170873e2009-01-07 00:22:26 +00001223 def _format_keyval(self, key, value):
1224 return '%s=%s' % (key, value)
mbligh36768f02008-02-22 18:28:33 +00001225
1226
showard73ec0442009-02-07 02:05:20 +00001227 def _keyval_path(self):
1228 return os.path.join(self._execution_tag(), 'keyval')
1229
1230
1231 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1232 keyval_contents = '\n'.join(self._format_keyval(key, value)
1233 for key, value in keyval_dict.iteritems())
1234 # always end with a newline to allow additional keyvals to be written
1235 keyval_contents += '\n'
1236 _drone_manager.attach_file_to_execution(self._execution_tag(),
1237 keyval_contents,
1238 file_path=keyval_path)
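        # Illustrative contents of the attached keyval file (timestamps are
        # hypothetical): one "key=value" pair per line, newline-terminated so
        # later keyvals can simply be appended, e.g.
        #   job_queued=1236120000
        #   job_finished=1236123600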
1239
1240
1241 def _write_keyvals_before_job(self, keyval_dict):
1242 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1243
1244
1245 def _write_keyval_after_job(self, field, value):
showard170873e2009-01-07 00:22:26 +00001246 assert self.monitor and self.monitor.has_process()
showard170873e2009-01-07 00:22:26 +00001247 _drone_manager.write_lines_to_file(
showard73ec0442009-02-07 02:05:20 +00001248 self._keyval_path(), [self._format_keyval(field, value)],
showard35162b02009-03-03 02:17:30 +00001249 paired_with_process=self.monitor.get_process())
showardd8e548a2008-09-09 03:04:57 +00001250
1251
showard170873e2009-01-07 00:22:26 +00001252 def _write_host_keyvals(self, host):
1253 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1254 host.hostname)
1255 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001256 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1257 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
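        # e.g. this writes <execution_tag>/host_keyvals/<hostname> containing
        # (illustrative values):
        #   platform=netbook
        #   labels=bluetooth,netbook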
showardd8e548a2008-09-09 03:04:57 +00001258
1259
showard170873e2009-01-07 00:22:26 +00001260 def _execution_tag(self):
1261 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001262
1263
jadmanski0afbb632008-06-06 21:10:57 +00001264 def prolog(self):
showard73ec0442009-02-07 02:05:20 +00001265 queued = int(time.mktime(self.job.created_on.timetuple()))
1266 self._write_keyvals_before_job({'job_queued': queued})
jadmanski0afbb632008-06-06 21:10:57 +00001267 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001268 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001269 queue_entry.set_status('Running')
1270 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001271 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001272 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001273 assert len(self.queue_entries) == 1
1274 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001275
1276
showard35162b02009-03-03 02:17:30 +00001277 def _write_lost_process_error_file(self):
1278 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1279 _drone_manager.write_lines_to_file(error_file_path,
1280 [_LOST_PROCESS_ERROR])
1281
1282
showard97aed502008-11-04 02:01:24 +00001283 def _finish_task(self, success):
showard35162b02009-03-03 02:17:30 +00001284 if self.monitor.has_process():
1285 self._write_keyval_after_job("job_finished", int(time.time()))
1286 self._copy_and_parse_results(self.queue_entries)
1287
1288 if self.monitor.lost_process:
1289 self._write_lost_process_error_file()
1290 for queue_entry in self.queue_entries:
1291 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001292
1293
showardcbd74612008-11-19 21:42:02 +00001294 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001295 _drone_manager.write_lines_to_file(
1296 os.path.join(self._execution_tag(), 'status.log'),
1297 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001298 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001299
1300
jadmanskif7fa2cc2008-10-01 14:13:23 +00001301 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001302 if not self.monitor or not self.monitor.has_process():
1303 return
1304
jadmanskif7fa2cc2008-10-01 14:13:23 +00001305 # build up sets of all the aborted_by and aborted_on values
1306 aborted_by, aborted_on = set(), set()
1307 for queue_entry in self.queue_entries:
1308 if queue_entry.aborted_by:
1309 aborted_by.add(queue_entry.aborted_by)
1310 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1311 aborted_on.add(t)
1312
1313        # extract the single, unique aborted_by value (if any) and write it out
1314 assert len(aborted_by) <= 1
1315 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001316 aborted_by_value = aborted_by.pop()
1317 aborted_on_value = max(aborted_on)
1318 else:
1319 aborted_by_value = 'autotest_system'
1320 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001321
showarda0382352009-02-11 23:36:43 +00001322 self._write_keyval_after_job("aborted_by", aborted_by_value)
1323 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001324
showardcbd74612008-11-19 21:42:02 +00001325 aborted_on_string = str(datetime.datetime.fromtimestamp(
1326 aborted_on_value))
1327 self._write_status_comment('Job aborted by %s on %s' %
1328 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001329
1330
jadmanski0afbb632008-06-06 21:10:57 +00001331 def abort(self):
1332 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001333 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001334 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001335
1336
showard21baa452008-10-21 00:08:39 +00001337 def _reboot_hosts(self):
1338 reboot_after = self.job.reboot_after
1339 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001340 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001341 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001342 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001343 num_tests_failed = self.monitor.num_tests_failed()
1344 do_reboot = (self.success and num_tests_failed == 0)
1345
showard8ebca792008-11-04 21:54:22 +00001346 for queue_entry in self.queue_entries:
1347 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00001348 # don't pass the queue entry to the CleanupTask. if the cleanup
showardfa8629c2008-11-04 16:51:23 +00001349 # fails, the job doesn't care -- it's over.
showard45ae8192008-11-05 19:32:53 +00001350 cleanup_task = CleanupTask(host=queue_entry.get_host())
1351 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
showard8ebca792008-11-04 21:54:22 +00001352 else:
1353 queue_entry.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001354
1355
jadmanski0afbb632008-06-06 21:10:57 +00001356 def epilog(self):
1357 super(QueueTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001358 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001359 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001360
showard97aed502008-11-04 02:01:24 +00001361        print "queue_task finished with success=%s" % self.success
mbligh36768f02008-02-22 18:28:33 +00001362
1363
mblighbb421852008-03-11 22:36:16 +00001364class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001365 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001366 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001367 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001368
1369
jadmanski0afbb632008-06-06 21:10:57 +00001370 def run(self):
1371 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001372
1373
jadmanski0afbb632008-06-06 21:10:57 +00001374 def prolog(self):
1375 # recovering an existing process - don't do prolog
1376 pass
mblighbb421852008-03-11 22:36:16 +00001377
1378
showard8fe93b52008-11-18 17:53:22 +00001379class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001380 def __init__(self, host=None, queue_entry=None):
1381 assert bool(host) ^ bool(queue_entry)
1382 if queue_entry:
1383 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001384 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001385 self.host = host
showard170873e2009-01-07 00:22:26 +00001386
1387 self.create_temp_resultsdir('.cleanup')
1388 self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
1389 '-r', _drone_manager.absolute_path(self.temp_results_dir)]
showarde788ea62008-11-17 21:02:47 +00001390 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001391 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1392 failure_tasks=[repair_task])
1393
1394 self._set_ids(host=host, queue_entries=[queue_entry])
1395 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001396
mblighd5c95802008-03-05 00:33:46 +00001397
jadmanski0afbb632008-06-06 21:10:57 +00001398 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001399 super(CleanupTask, self).prolog()
showard45ae8192008-11-05 19:32:53 +00001400 print "starting cleanup task for host: %s" % self.host.hostname
1401 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001402
mblighd5c95802008-03-05 00:33:46 +00001403
showard21baa452008-10-21 00:08:39 +00001404 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001405 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001406 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001407 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001408 self.host.update_field('dirty', 0)
1409
1410
mblighd5c95802008-03-05 00:33:46 +00001411class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001412 def __init__(self, queue_entry, agents_to_abort):
jadmanski0afbb632008-06-06 21:10:57 +00001413 super(AbortTask, self).__init__('')
showard170873e2009-01-07 00:22:26 +00001414 self.queue_entry = queue_entry
1415 # don't use _set_ids, since we don't want to set the host_ids
1416 self.queue_entry_ids = [queue_entry.id]
1417 self.agents_to_abort = agents_to_abort
mbligh36768f02008-02-22 18:28:33 +00001418
1419
jadmanski0afbb632008-06-06 21:10:57 +00001420 def prolog(self):
1421 print "starting abort on host %s, job %s" % (
1422 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001423
mblighd64e5702008-04-04 21:39:28 +00001424
jadmanski0afbb632008-06-06 21:10:57 +00001425 def epilog(self):
1426 super(AbortTask, self).epilog()
1427 self.queue_entry.set_status('Aborted')
1428 self.success = True
1429
1430
1431 def run(self):
1432 for agent in self.agents_to_abort:
1433            if agent.active_task:
1434 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001435
1436
showard97aed502008-11-04 02:01:24 +00001437class FinalReparseTask(AgentTask):
showard97aed502008-11-04 02:01:24 +00001438 _num_running_parses = 0
1439
1440 def __init__(self, queue_entries):
1441 self._queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001442 # don't use _set_ids, since we don't want to set the host_ids
1443 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001444 self._parse_started = False
1445
1446 assert len(queue_entries) > 0
1447 queue_entry = queue_entries[0]
showard97aed502008-11-04 02:01:24 +00001448
showard170873e2009-01-07 00:22:26 +00001449 self._execution_tag = queue_entry.execution_tag()
1450 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1451 self._autoserv_monitor = PidfileRunMonitor()
1452 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1453 self._final_status = self._determine_final_status()
1454
showard97aed502008-11-04 02:01:24 +00001455 if _testing_mode:
1456 self.cmd = 'true'
showard170873e2009-01-07 00:22:26 +00001457 else:
1458 super(FinalReparseTask, self).__init__(
1459 cmd=self._generate_parse_command(),
1460 working_directory=self._execution_tag)
showard97aed502008-11-04 02:01:24 +00001461
showard170873e2009-01-07 00:22:26 +00001462 self.log_file = os.path.join(self._execution_tag, '.parse.log')
showard97aed502008-11-04 02:01:24 +00001463
1464
1465 @classmethod
1466 def _increment_running_parses(cls):
1467 cls._num_running_parses += 1
1468
1469
1470 @classmethod
1471 def _decrement_running_parses(cls):
1472 cls._num_running_parses -= 1
1473
1474
1475 @classmethod
1476 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001477 return (cls._num_running_parses <
1478 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001479
1480
showard170873e2009-01-07 00:22:26 +00001481 def _determine_final_status(self):
1482 # we'll use a PidfileRunMonitor to read the autoserv exit status
1483 if self._autoserv_monitor.exit_code() == 0:
1484 return models.HostQueueEntry.Status.COMPLETED
1485 return models.HostQueueEntry.Status.FAILED
1486
1487
showard97aed502008-11-04 02:01:24 +00001488 def prolog(self):
1489 super(FinalReparseTask, self).prolog()
1490 for queue_entry in self._queue_entries:
1491 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1492
1493
1494 def epilog(self):
1495 super(FinalReparseTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001496 for queue_entry in self._queue_entries:
showard170873e2009-01-07 00:22:26 +00001497 queue_entry.set_status(self._final_status)
showard97aed502008-11-04 02:01:24 +00001498
1499
showard2bab8f42008-11-12 18:15:22 +00001500 def _generate_parse_command(self):
showard170873e2009-01-07 00:22:26 +00001501 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
1502 self._results_dir]
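        # Illustrative expansion (install path is an example):
        #   .../tko/parse --write-pidfile -l 2 -r -o <absolute results dir>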
showard97aed502008-11-04 02:01:24 +00001503
1504
1505 def poll(self):
1506        # override poll to keep trying to start until the parse count drops low
1507        # enough for us to run, at which point we revert to default behavior
1508 if self._parse_started:
1509 super(FinalReparseTask, self).poll()
1510 else:
1511 self._try_starting_parse()
1512
1513
1514 def run(self):
1515 # override run() to not actually run unless we can
1516 self._try_starting_parse()
1517
1518
1519 def _try_starting_parse(self):
1520 if not self._can_run_new_parse():
1521 return
showard170873e2009-01-07 00:22:26 +00001522
showard678df4f2009-02-04 21:36:39 +00001523 # make sure we actually have results to parse
showard35162b02009-03-03 02:17:30 +00001524 # this should never happen in normal operation
showard678df4f2009-02-04 21:36:39 +00001525 if not self._autoserv_monitor.has_process():
1526 email_manager.manager.enqueue_notify_email(
1527 'No results to parse',
1528 'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
1529 self.finished(False)
1530 return
1531
showard97aed502008-11-04 02:01:24 +00001532 # actually run the parse command
showard170873e2009-01-07 00:22:26 +00001533 self.monitor = PidfileRunMonitor()
1534 self.monitor.run(self.cmd, self._working_directory,
1535 log_file=self.log_file,
1536 pidfile_name='.parser_execute',
1537 paired_with_pidfile=self._autoserv_monitor.pidfile_id)
1538
showard97aed502008-11-04 02:01:24 +00001539 self._increment_running_parses()
1540 self._parse_started = True
1541
1542
1543 def finished(self, success):
1544 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001545 if self._parse_started:
1546 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001547
1548
showardc9ae1782009-01-30 01:42:37 +00001549class SetEntryPendingTask(AgentTask):
1550 def __init__(self, queue_entry):
1551 super(SetEntryPendingTask, self).__init__(cmd='')
1552 self._queue_entry = queue_entry
1553 self._set_ids(queue_entries=[queue_entry])
1554
1555
1556 def run(self):
1557 agent = self._queue_entry.on_pending()
1558 if agent:
1559 self.agent.dispatcher.add_agent(agent)
1560 self.finished(True)
1561
1562
showarda3c58572009-03-12 20:36:59 +00001563class DBError(Exception):
1564 """Raised by the DBObject constructor when its select fails."""
1565
1566
mbligh36768f02008-02-22 18:28:33 +00001567class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001568 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001569
1570 # Subclasses MUST override these:
1571 _table_name = ''
1572 _fields = ()
1573
showarda3c58572009-03-12 20:36:59 +00001574 # A mapping from (type, id) to the instance of the object for that
1575 # particular id. This prevents us from creating new Job() and Host()
1576 # instances for every HostQueueEntry object that we instantiate as
1577 # multiple HQEs often share the same Job.
1578 _instances_by_type_and_id = weakref.WeakValueDictionary()
1579 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001580
showarda3c58572009-03-12 20:36:59 +00001581
1582 def __new__(cls, id=None, **kwargs):
1583 """
1584 Look to see if we already have an instance for this particular type
1585 and id. If so, use it instead of creating a duplicate instance.
1586 """
1587 if id is not None:
1588 instance = cls._instances_by_type_and_id.get((cls, id))
1589 if instance:
1590 return instance
1591 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
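    # Illustrative effect of the cache above (ids are hypothetical): Job(id=7)
    # constructed twice returns the same object for as long as a strong
    # reference to it exists, since _instances_by_type_and_id is a
    # WeakValueDictionary keyed on (type, id).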
1592
1593
1594 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001595 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001596 assert self._table_name, '_table_name must be defined in your class'
1597 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001598 if not new_record:
1599 if self._initialized and not always_query:
1600 return # We've already been initialized.
1601 if id is None:
1602 id = row[0]
1603 # Tell future constructors to use us instead of re-querying while
1604 # this instance is still around.
1605 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001606
showard6ae5ea92009-02-25 00:11:51 +00001607 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001608
jadmanski0afbb632008-06-06 21:10:57 +00001609 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001610
jadmanski0afbb632008-06-06 21:10:57 +00001611 if row is None:
1612 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1613 rows = _db.execute(sql, (id,))
showarda3c58572009-03-12 20:36:59 +00001614 if not rows:
1615 raise DBError("row not found (table=%s, id=%s)"
1616 % (self.__table, id))
jadmanski0afbb632008-06-06 21:10:57 +00001617 row = rows[0]
mbligh36768f02008-02-22 18:28:33 +00001618
showarda3c58572009-03-12 20:36:59 +00001619 if self._initialized:
1620 differences = self._compare_fields_in_row(row)
1621 if differences:
1622                print ('requery of initialized %s instance %s is updating: %s' %
1623 (type(self), self.id, differences))
showard2bab8f42008-11-12 18:15:22 +00001624 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001625 self._initialized = True
1626
1627
1628 @classmethod
1629 def _clear_instance_cache(cls):
1630 """Used for testing, clear the internal instance cache."""
1631 cls._instances_by_type_and_id.clear()
1632
1633
1634 def _assert_row_length(self, row):
1635 assert len(row) == len(self._fields), (
1636 "table = %s, row = %s/%d, fields = %s/%d" % (
1637 self.__table, row, len(row), self._fields, len(self._fields)))
1638
1639
1640 def _compare_fields_in_row(self, row):
1641 """
1642 Given a row as returned by a SELECT query, compare it to our existing
1643 in memory fields.
1644
1645 @param row - A sequence of values corresponding to fields named in
1646                     the class attribute _fields.
1647
1648 @returns A dictionary listing the differences keyed by field name
1649 containing tuples of (current_value, row_value).
1650 """
1651 self._assert_row_length(row)
1652 differences = {}
1653 for field, row_value in itertools.izip(self._fields, row):
1654 current_value = getattr(self, field)
1655 if current_value != row_value:
1656 differences[field] = (current_value, row_value)
1657 return differences
showard2bab8f42008-11-12 18:15:22 +00001658
1659
1660 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00001661 """
1662 Update our field attributes using a single row returned by SELECT.
1663
1664 @param row - A sequence of values corresponding to fields named in
1665 the class fields list.
1666 """
1667 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00001668
showard2bab8f42008-11-12 18:15:22 +00001669 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00001670 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00001671 setattr(self, field, value)
1672 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001673
showard2bab8f42008-11-12 18:15:22 +00001674 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001675
mblighe2586682008-02-29 22:45:46 +00001676
jadmanski0afbb632008-06-06 21:10:57 +00001677    def count(self, where, table=None):
1678 if not table:
1679 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001680
jadmanski0afbb632008-06-06 21:10:57 +00001681 rows = _db.execute("""
1682 SELECT count(*) FROM %s
1683 WHERE %s
1684 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001685
jadmanski0afbb632008-06-06 21:10:57 +00001686 assert len(rows) == 1
1687
1688 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001689
1690
mblighf8c624d2008-07-03 16:58:45 +00001691 def update_field(self, field, value, condition=''):
showard2bab8f42008-11-12 18:15:22 +00001692 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00001693
showard2bab8f42008-11-12 18:15:22 +00001694 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00001695 return
mbligh36768f02008-02-22 18:28:33 +00001696
mblighf8c624d2008-07-03 16:58:45 +00001697 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1698 if condition:
1699 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001700 _db.execute(query, (value, self.id))
1701
showard2bab8f42008-11-12 18:15:22 +00001702 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00001703
1704
jadmanski0afbb632008-06-06 21:10:57 +00001705 def save(self):
1706 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00001707 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00001708 columns = ','.join([str(key) for key in keys])
1709 values = ['"%s"' % self.__dict__[key] for key in keys]
1710 values = ','.join(values)
1711 query = """INSERT INTO %s (%s) VALUES (%s)""" % \
1712 (self.__table, columns, values)
1713 _db.execute(query)
mbligh36768f02008-02-22 18:28:33 +00001714
1715
jadmanski0afbb632008-06-06 21:10:57 +00001716 def delete(self):
showarda3c58572009-03-12 20:36:59 +00001717        self._instances_by_type_and_id.pop((type(self), self.id), None)
1718 self._initialized = False
1719 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00001720 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1721 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001722
1723
showard63a34772008-08-18 19:32:50 +00001724 @staticmethod
1725 def _prefix_with(string, prefix):
1726 if string:
1727 string = prefix + string
1728 return string
1729
1730
jadmanski0afbb632008-06-06 21:10:57 +00001731 @classmethod
showard989f25d2008-10-01 11:38:11 +00001732 def fetch(cls, where='', params=(), joins='', order_by=''):
showard63a34772008-08-18 19:32:50 +00001733 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1734 where = cls._prefix_with(where, 'WHERE ')
1735 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00001736 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00001737 'joins' : joins,
1738 'where' : where,
1739 'order_by' : order_by})
1740 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001741 for row in rows:
1742 yield cls(row=row)
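    # Illustrative usage of this mini-ORM (ids and values are hypothetical):
    #   host = Host(id=5)                      # SELECTs the row and caches it
    #   host.update_field('status', 'Ready')   # issues an UPDATE, keeps attrs in sync
    #   for entry in HostQueueEntry.fetch(where='job_id = %s', params=(42,)):
    #       print entry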
mblighe2586682008-02-29 22:45:46 +00001743
mbligh36768f02008-02-22 18:28:33 +00001744
1745class IneligibleHostQueue(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001746 _table_name = 'ineligible_host_queues'
1747 _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001748
1749
showard989f25d2008-10-01 11:38:11 +00001750class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001751 _table_name = 'labels'
1752 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
1753 'only_if_needed')
showard989f25d2008-10-01 11:38:11 +00001754
1755
mbligh36768f02008-02-22 18:28:33 +00001756class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001757 _table_name = 'hosts'
1758 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
1759 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
1760
1761
jadmanski0afbb632008-06-06 21:10:57 +00001762 def current_task(self):
1763 rows = _db.execute("""
1764 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1765 """, (self.id,))
1766
1767 if len(rows) == 0:
1768 return None
1769 else:
1770 assert len(rows) == 1
1771            results = rows[0]
jadmanski0afbb632008-06-06 21:10:57 +00001772 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00001773
1774
jadmanski0afbb632008-06-06 21:10:57 +00001775 def yield_work(self):
1776 print "%s yielding work" % self.hostname
1777 if self.current_task():
1778 self.current_task().requeue()
1779
showard6ae5ea92009-02-25 00:11:51 +00001780
jadmanski0afbb632008-06-06 21:10:57 +00001781    def set_status(self, status):
1782        print '%s -> %s' % (self.hostname, status)
1783        self.update_field('status', status)
mbligh36768f02008-02-22 18:28:33 +00001784
1785
showard170873e2009-01-07 00:22:26 +00001786 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00001787 """
showard170873e2009-01-07 00:22:26 +00001788 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00001789 """
1790 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00001791 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00001792 FROM labels
1793 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00001794 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00001795 ORDER BY labels.name
1796 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00001797 platform = None
1798 all_labels = []
1799 for label_name, is_platform in rows:
1800 if is_platform:
1801 platform = label_name
1802 all_labels.append(label_name)
1803 return platform, all_labels
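        # e.g. returns ('netbook', ['bluetooth', 'netbook']) -- values are
        # illustrative; all_labels includes the platform label itself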
1804
1805
1806 def reverify_tasks(self):
1807 cleanup_task = CleanupTask(host=self)
1808 verify_task = VerifyTask(host=self)
1809 # just to make sure this host does not get taken away
1810 self.set_status('Cleaning')
1811 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00001812
1813
mbligh36768f02008-02-22 18:28:33 +00001814class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001815 _table_name = 'host_queue_entries'
1816 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
1817 'active', 'complete', 'deleted', 'execution_subdir')
1818
1819
showarda3c58572009-03-12 20:36:59 +00001820 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00001821 assert id or row
showarda3c58572009-03-12 20:36:59 +00001822 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00001823 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00001824
jadmanski0afbb632008-06-06 21:10:57 +00001825 if self.host_id:
1826 self.host = Host(self.host_id)
1827 else:
1828 self.host = None
mbligh36768f02008-02-22 18:28:33 +00001829
showard170873e2009-01-07 00:22:26 +00001830 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00001831 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00001832
1833
showardc85c21b2008-11-24 22:17:37 +00001834 def _view_job_url(self):
1835 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
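        # e.g. "http://autotest-server/afe/#tab_id=view_job&object_id=42"
        # (server name and job id are illustrative; _base_url is configured
        # elsewhere)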
1836
1837
jadmanski0afbb632008-06-06 21:10:57 +00001838 def set_host(self, host):
1839 if host:
1840 self.queue_log_record('Assigning host ' + host.hostname)
1841 self.update_field('host_id', host.id)
1842 self.update_field('active', True)
1843 self.block_host(host.id)
1844 else:
1845 self.queue_log_record('Releasing host')
1846 self.unblock_host(self.host.id)
1847 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00001848
jadmanski0afbb632008-06-06 21:10:57 +00001849 self.host = host
mbligh36768f02008-02-22 18:28:33 +00001850
1851
jadmanski0afbb632008-06-06 21:10:57 +00001852 def get_host(self):
1853 return self.host
mbligh36768f02008-02-22 18:28:33 +00001854
1855
jadmanski0afbb632008-06-06 21:10:57 +00001856 def queue_log_record(self, log_line):
1857 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00001858 _drone_manager.write_lines_to_file(self.queue_log_path,
1859 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00001860
1861
jadmanski0afbb632008-06-06 21:10:57 +00001862 def block_host(self, host_id):
1863 print "creating block %s/%s" % (self.job.id, host_id)
1864 row = [0, self.job.id, host_id]
1865 block = IneligibleHostQueue(row=row, new_record=True)
1866 block.save()
mblighe2586682008-02-29 22:45:46 +00001867
1868
jadmanski0afbb632008-06-06 21:10:57 +00001869 def unblock_host(self, host_id):
1870 print "removing block %s/%s" % (self.job.id, host_id)
1871 blocks = IneligibleHostQueue.fetch(
1872 'job_id=%d and host_id=%d' % (self.job.id, host_id))
1873 for block in blocks:
1874 block.delete()
mblighe2586682008-02-29 22:45:46 +00001875
1876
showard2bab8f42008-11-12 18:15:22 +00001877 def set_execution_subdir(self, subdir=None):
1878 if subdir is None:
1879 assert self.get_host()
1880 subdir = self.get_host().hostname
1881 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00001882
1883
showard6355f6b2008-12-05 18:52:13 +00001884 def _get_hostname(self):
1885 if self.host:
1886 return self.host.hostname
1887 return 'no host'
1888
1889
showard170873e2009-01-07 00:22:26 +00001890 def __str__(self):
1891 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
1892
1893
jadmanski0afbb632008-06-06 21:10:57 +00001894 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00001895 abort_statuses = ['Abort', 'Aborting', 'Aborted']
1896 if status not in abort_statuses:
1897 condition = ' AND '.join(['status <> "%s"' % x
1898 for x in abort_statuses])
1899 else:
1900 condition = ''
1901 self.update_field('status', status, condition=condition)
1902
showard170873e2009-01-07 00:22:26 +00001903 print "%s -> %s" % (self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00001904
showardc85c21b2008-11-24 22:17:37 +00001905 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00001906 self.update_field('complete', False)
1907 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00001908
jadmanski0afbb632008-06-06 21:10:57 +00001909 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showarde58e3f82008-11-20 19:04:59 +00001910 'Aborting']:
jadmanski0afbb632008-06-06 21:10:57 +00001911 self.update_field('complete', False)
1912 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00001913
showardc85c21b2008-11-24 22:17:37 +00001914 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00001915 self.update_field('complete', True)
1916 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00001917
1918 should_email_status = (status.lower() in _notify_email_statuses or
1919 'all' in _notify_email_statuses)
1920 if should_email_status:
1921 self._email_on_status(status)
1922
1923 self._email_on_job_complete()
1924
1925
1926 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00001927 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00001928
1929 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
1930 self.job.id, self.job.name, hostname, status)
1931 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
1932 self.job.id, self.job.name, hostname, status,
1933 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00001934 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00001935
1936
1937 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00001938 if not self.job.is_finished():
1939 return
showard542e8402008-09-19 20:16:18 +00001940
showardc85c21b2008-11-24 22:17:37 +00001941 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00001942 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00001943 for queue_entry in hosts_queue:
1944 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00001945 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00001946 queue_entry.status))
1947
1948 summary_text = "\n".join(summary_text)
1949 status_counts = models.Job.objects.get_status_counts(
1950 [self.job.id])[self.job.id]
1951 status = ', '.join('%d %s' % (count, status) for status, count
1952 in status_counts.iteritems())
1953
1954 subject = 'Autotest: Job ID: %s "%s" %s' % (
1955 self.job.id, self.job.name, status)
1956 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
1957 self.job.id, self.job.name, status, self._view_job_url(),
1958 summary_text)
showard170873e2009-01-07 00:22:26 +00001959 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00001960
1961
jadmanski0afbb632008-06-06 21:10:57 +00001962    def run(self, assigned_host=None):
1963 if self.meta_host:
1964 assert assigned_host
1965 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00001966 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00001967
jadmanski0afbb632008-06-06 21:10:57 +00001968 print "%s/%s scheduled on %s, status=%s" % (self.job.name,
1969 self.meta_host, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00001970
jadmanski0afbb632008-06-06 21:10:57 +00001971 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00001972
showard6ae5ea92009-02-25 00:11:51 +00001973
jadmanski0afbb632008-06-06 21:10:57 +00001974 def requeue(self):
1975 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00001976 # verify/cleanup failure sets the execution subdir, so reset it here
1977 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00001978 if self.meta_host:
1979 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00001980
1981
jadmanski0afbb632008-06-06 21:10:57 +00001982 def handle_host_failure(self):
1983 """\
1984 Called when this queue entry's host has failed verification and
1985 repair.
1986 """
1987 assert not self.meta_host
1988 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00001989 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00001990
1991
jadmanskif7fa2cc2008-10-01 14:13:23 +00001992 @property
1993 def aborted_by(self):
1994 self._load_abort_info()
1995 return self._aborted_by
1996
1997
1998 @property
1999 def aborted_on(self):
2000 self._load_abort_info()
2001 return self._aborted_on
2002
2003
2004 def _load_abort_info(self):
2005 """ Fetch info about who aborted the job. """
2006 if hasattr(self, "_aborted_by"):
2007 return
2008 rows = _db.execute("""
2009 SELECT users.login, aborted_host_queue_entries.aborted_on
2010 FROM aborted_host_queue_entries
2011 INNER JOIN users
2012 ON users.id = aborted_host_queue_entries.aborted_by_id
2013 WHERE aborted_host_queue_entries.queue_entry_id = %s
2014 """, (self.id,))
2015 if rows:
2016 self._aborted_by, self._aborted_on = rows[0]
2017 else:
2018 self._aborted_by = self._aborted_on = None
2019
2020
showardb2e2c322008-10-14 17:33:55 +00002021 def on_pending(self):
2022 """
2023 Called when an entry in a synchronous job has passed verify. If the
2024 job is ready to run, returns an agent to run the job. Returns None
2025 otherwise.
2026 """
2027 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002028 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002029 if self.job.is_ready():
2030 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002031 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002032 return None
2033
2034
showard170873e2009-01-07 00:22:26 +00002035 def abort(self, dispatcher, agents_to_abort=[]):
showard1be97432008-10-17 15:30:45 +00002036 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00002037 if self.active and host:
showard170873e2009-01-07 00:22:26 +00002038 dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
showard1be97432008-10-17 15:30:45 +00002039
showard170873e2009-01-07 00:22:26 +00002040 abort_task = AbortTask(self, agents_to_abort)
showard1be97432008-10-17 15:30:45 +00002041 self.set_status('Aborting')
showard170873e2009-01-07 00:22:26 +00002042 dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
2043
2044 def execution_tag(self):
2045 assert self.execution_subdir
2046 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
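        # e.g. "42-showard/host1" for a single-host entry, or
        # "42-showard/group0" for an entry in a synchronous group
        # (job id, owner and subdir are illustrative)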
showard1be97432008-10-17 15:30:45 +00002047
2048
mbligh36768f02008-02-22 18:28:33 +00002049class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002050 _table_name = 'jobs'
2051 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2052 'control_type', 'created_on', 'synch_count', 'timeout',
2053 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2054
2055
showarda3c58572009-03-12 20:36:59 +00002056 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002057 assert id or row
showarda3c58572009-03-12 20:36:59 +00002058 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002059
mblighe2586682008-02-29 22:45:46 +00002060
jadmanski0afbb632008-06-06 21:10:57 +00002061 def is_server_job(self):
2062 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002063
2064
showard170873e2009-01-07 00:22:26 +00002065 def tag(self):
2066 return "%s-%s" % (self.id, self.owner)
2067
2068
jadmanski0afbb632008-06-06 21:10:57 +00002069 def get_host_queue_entries(self):
2070 rows = _db.execute("""
2071 SELECT * FROM host_queue_entries
2072 WHERE job_id= %s
2073 """, (self.id,))
2074 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002075
jadmanski0afbb632008-06-06 21:10:57 +00002076        assert len(entries) > 0
mbligh36768f02008-02-22 18:28:33 +00002077
jadmanski0afbb632008-06-06 21:10:57 +00002078 return entries
mbligh36768f02008-02-22 18:28:33 +00002079
2080
jadmanski0afbb632008-06-06 21:10:57 +00002081 def set_status(self, status, update_queues=False):
2082        self.update_field('status', status)
2083
2084 if update_queues:
2085 for queue_entry in self.get_host_queue_entries():
2086 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002087
2088
jadmanski0afbb632008-06-06 21:10:57 +00002089 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002090 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2091 status='Pending')
2092 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002093
2094
jadmanski0afbb632008-06-06 21:10:57 +00002095 def num_machines(self, clause = None):
2096 sql = "job_id=%s" % self.id
2097 if clause:
2098 sql += " AND (%s)" % clause
2099 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002100
2101
jadmanski0afbb632008-06-06 21:10:57 +00002102 def num_queued(self):
2103 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002104
2105
jadmanski0afbb632008-06-06 21:10:57 +00002106 def num_active(self):
2107 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002108
2109
jadmanski0afbb632008-06-06 21:10:57 +00002110 def num_complete(self):
2111 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002112
2113
jadmanski0afbb632008-06-06 21:10:57 +00002114 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002115 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002116
mbligh36768f02008-02-22 18:28:33 +00002117
showard6bb7c292009-01-30 01:44:51 +00002118 def _not_yet_run_entries(self, include_verifying=True):
2119 statuses = [models.HostQueueEntry.Status.QUEUED,
2120 models.HostQueueEntry.Status.PENDING]
2121 if include_verifying:
2122 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2123 return models.HostQueueEntry.objects.filter(job=self.id,
2124 status__in=statuses)
2125
2126
2127 def _stop_all_entries(self):
2128 entries_to_stop = self._not_yet_run_entries(
2129 include_verifying=False)
2130 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002131 assert not child_entry.complete, (
2132 '%s status=%s, active=%s, complete=%s' %
2133 (child_entry.id, child_entry.status, child_entry.active,
2134 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002135 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2136 child_entry.host.status = models.Host.Status.READY
2137 child_entry.host.save()
2138 child_entry.status = models.HostQueueEntry.Status.STOPPED
2139 child_entry.save()
2140
showard2bab8f42008-11-12 18:15:22 +00002141 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002142 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002143 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002144 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002145
2146
jadmanski0afbb632008-06-06 21:10:57 +00002147 def write_to_machines_file(self, queue_entry):
2148 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002149 file_path = os.path.join(self.tag(), '.machines')
2150 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002151
2152
showard2bab8f42008-11-12 18:15:22 +00002153 def _next_group_name(self):
2154 query = models.HostQueueEntry.objects.filter(
2155 job=self.id).values('execution_subdir').distinct()
2156 subdirs = (entry['execution_subdir'] for entry in query)
2157 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2158 ids = [int(match.group(1)) for match in groups if match]
2159 if ids:
2160 next_id = max(ids) + 1
2161 else:
2162 next_id = 0
2163 return "group%d" % next_id
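        # e.g. if entries already use subdirs group0 and group1 this returns
        # "group2"; with no matching subdirs it returns "group0"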
2164
2165
showard170873e2009-01-07 00:22:26 +00002166 def _write_control_file(self, execution_tag):
2167 control_path = _drone_manager.attach_file_to_execution(
2168 execution_tag, self.control_file)
2169 return control_path
mbligh36768f02008-02-22 18:28:33 +00002170
showardb2e2c322008-10-14 17:33:55 +00002171
showard2bab8f42008-11-12 18:15:22 +00002172 def get_group_entries(self, queue_entry_from_group):
2173 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002174 return list(HostQueueEntry.fetch(
2175 where='job_id=%s AND execution_subdir=%s',
2176 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002177
2178
showardb2e2c322008-10-14 17:33:55 +00002179 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002180 assert queue_entries
2181 execution_tag = queue_entries[0].execution_tag()
2182 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002183 hostnames = ','.join([entry.get_host().hostname
2184 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002185
showard170873e2009-01-07 00:22:26 +00002186 params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
2187 '-r', _drone_manager.absolute_path(execution_tag),
2188 '-u', self.owner, '-l', self.name, '-m', hostnames,
2189 _drone_manager.absolute_path(control_path)]
mbligh36768f02008-02-22 18:28:33 +00002190
jadmanski0afbb632008-06-06 21:10:57 +00002191 if not self.is_server_job():
2192 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002193
showardb2e2c322008-10-14 17:33:55 +00002194 return params
mblighe2586682008-02-29 22:45:46 +00002195
mbligh36768f02008-02-22 18:28:33 +00002196
showardc9ae1782009-01-30 01:42:37 +00002197 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002198 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002199 return True
showard0fc38302008-10-23 00:44:07 +00002200 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002201 return queue_entry.get_host().dirty
2202 return False
showard21baa452008-10-21 00:08:39 +00002203
showardc9ae1782009-01-30 01:42:37 +00002204
2205 def _should_run_verify(self, queue_entry):
2206 do_not_verify = (queue_entry.host.protection ==
2207 host_protections.Protection.DO_NOT_VERIFY)
2208 if do_not_verify:
2209 return False
2210 return self.run_verify
2211
2212
2213 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002214 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002215 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002216 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002217 if self._should_run_verify(queue_entry):
2218 tasks.append(VerifyTask(queue_entry=queue_entry))
2219 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002220 return tasks
2221
2222
showard2bab8f42008-11-12 18:15:22 +00002223 def _assign_new_group(self, queue_entries):
2224 if len(queue_entries) == 1:
2225 group_name = queue_entries[0].get_host().hostname
2226 else:
2227 group_name = self._next_group_name()
2228        print 'Running synchronous job %d on hosts %s as %s' % (
2229 self.id, [entry.host.hostname for entry in queue_entries],
2230 group_name)
2231
2232 for queue_entry in queue_entries:
2233 queue_entry.set_execution_subdir(group_name)
2234
2235
2236 def _choose_group_to_run(self, include_queue_entry):
2237 chosen_entries = [include_queue_entry]
2238
2239 num_entries_needed = self.synch_count - 1
2240 if num_entries_needed > 0:
2241 pending_entries = HostQueueEntry.fetch(
2242 where='job_id = %s AND status = "Pending" AND id != %s',
2243 params=(self.id, include_queue_entry.id))
2244 chosen_entries += list(pending_entries)[:num_entries_needed]
2245
2246 self._assign_new_group(chosen_entries)
2247 return chosen_entries
2248
2249
2250 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002251 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002252 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2253 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002254
showard2bab8f42008-11-12 18:15:22 +00002255 queue_entries = self._choose_group_to_run(queue_entry)
2256 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002257
2258
2259 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002260 for queue_entry in queue_entries:
2261 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002262 params = self._get_autoserv_params(queue_entries)
2263 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2264 cmd=params)
2265 tasks = initial_tasks + [queue_task]
2266 entry_ids = [entry.id for entry in queue_entries]
2267
showard170873e2009-01-07 00:22:26 +00002268 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002269
2270
mbligh36768f02008-02-22 18:28:33 +00002271if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002272 main()