blob: 3da2593061fa813bc36e366b8bda9a4faea3dee1 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
mbligh36768f02008-02-22 18:28:33 +000023RESULTS_DIR = '.'
24AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000025DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000026
27AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
28
29if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000030 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000031AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
32AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
33
34if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000035 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000036
mbligh90a549d2008-03-25 23:52:34 +000037# how long to wait for autoserv to write a pidfile
38PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000039
showard35162b02009-03-03 02:17:30 +000040# error message to leave in results dir when an autoserv process disappears
41# mysteriously
42_LOST_PROCESS_ERROR = """\
43Autoserv failed abnormally during execution for this job, probably due to a
44system error on the Autotest server. Full results may not be available. Sorry.
45"""
46
mbligh6f8bab42008-02-29 22:45:14 +000047_db = None
mbligh36768f02008-02-22 18:28:33 +000048_shutdown = False
showard170873e2009-01-07 00:22:26 +000049_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
50_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000051_testing_mode = False
showard542e8402008-09-19 20:16:18 +000052_base_url = None
showardc85c21b2008-11-24 22:17:37 +000053_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000054_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000055
showardb18134f2009-03-20 20:52:18 +000056# load the logging settings
57scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
showard50e463b2009-04-07 18:13:45 +000058if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
59 os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
showardb18134f2009-03-20 20:52:18 +000060# Here we export the log name, using the same convention as autoserv's results
61# directory.
mblighc9895aa2009-04-01 18:36:58 +000062if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
63 scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
64else:
65 scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
66 os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name
67
showardb18134f2009-03-20 20:52:18 +000068logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
69
mbligh36768f02008-02-22 18:28:33 +000070
def main():
    """Scheduler entry point.

    Runs the real main loop, logging any exception that escapes before
    re-raising it.  The bare except is deliberate: it is re-raised, so this
    only adds a log record for whatever terminated the process.
    """
    try:
        main_without_exception_handling()
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
78
def main_without_exception_handling():
    """Parse command-line options, configure globals and run the dispatcher.

    Loops calling Dispatcher.tick() until _shutdown is set (by the SIGINT
    handler), then shuts down the status server, drone manager and database.
    Exceptions from the loop are emailed rather than propagated, so the
    shutdown path below always runs.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # the config value may be separated by whitespace, commas, semicolons
    # or colons; empty fragments are dropped
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # intentionally broad: email the stacktrace and fall through to the
        # orderly shutdown below instead of crashing outright
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000154
155
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after its current tick."""
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000160
161
def init(logfile):
    """One-time scheduler initialization.

    Optionally redirects stdout/stderr, writes a pid file, connects the
    global database handle, installs the SIGINT handler and initializes the
    global drone manager from config.

    @param logfile: path to redirect output to, or a false value to skip.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # under test, point at the stress-test database instead
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # 'drones' is a comma-separated list of drone hostnames in the config
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000193
194
def enable_logging(logfile):
    """Redirect stdout/stderr into unbuffered append-mode log files.

    stdout goes to `logfile`, stderr to `logfile`.err.  os.dup2 re-points
    the original descriptors at the log files as well, and the sys-level
    stream objects are replaced with the new file objects.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_file, err_file)

    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)
    for handle, stream in ((out_fd, sys.stdout), (err_fd, sys.stderr)):
        os.dup2(handle.fileno(), stream.fileno())

    sys.stdout, sys.stderr = out_fd, err_fd
mbligh36768f02008-02-22 18:28:33 +0000207
208
mblighd5c95802008-03-05 00:33:46 +0000209def queue_entries_to_abort():
jadmanski0afbb632008-06-06 21:10:57 +0000210 rows = _db.execute("""
211 SELECT * FROM host_queue_entries WHERE status='Abort';
212 """)
showard2bab8f42008-11-12 18:15:22 +0000213
jadmanski0afbb632008-06-06 21:10:57 +0000214 qe = [HostQueueEntry(row=i) for i in rows]
215 return qe
mbligh36768f02008-02-22 18:28:33 +0000216
showard7cf9a9b2008-05-15 21:15:52 +0000217
showard89f84db2009-03-12 20:39:13 +0000218class SchedulerError(Exception):
219 """Raised by HostScheduler when an inconsistent state occurs."""
220
221
showard63a34772008-08-18 19:32:50 +0000222class HostScheduler(object):
223 def _get_ready_hosts(self):
224 # avoid any host with a currently active queue entry against it
225 hosts = Host.fetch(
226 joins='LEFT JOIN host_queue_entries AS active_hqe '
227 'ON (hosts.id = active_hqe.host_id AND '
showardb1e51872008-10-07 11:08:18 +0000228 'active_hqe.active)',
showard63a34772008-08-18 19:32:50 +0000229 where="active_hqe.host_id IS NULL "
showardb1e51872008-10-07 11:08:18 +0000230 "AND NOT hosts.locked "
showard63a34772008-08-18 19:32:50 +0000231 "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
232 return dict((host.id, host) for host in hosts)
233
234
235 @staticmethod
236 def _get_sql_id_list(id_list):
237 return ','.join(str(item_id) for item_id in id_list)
238
239
240 @classmethod
showard989f25d2008-10-01 11:38:11 +0000241 def _get_many2many_dict(cls, query, id_list, flip=False):
mbligh849a0f62008-08-28 20:12:19 +0000242 if not id_list:
243 return {}
showard63a34772008-08-18 19:32:50 +0000244 query %= cls._get_sql_id_list(id_list)
245 rows = _db.execute(query)
showard989f25d2008-10-01 11:38:11 +0000246 return cls._process_many2many_dict(rows, flip)
247
248
249 @staticmethod
250 def _process_many2many_dict(rows, flip=False):
showard63a34772008-08-18 19:32:50 +0000251 result = {}
252 for row in rows:
showard89f84db2009-03-12 20:39:13 +0000253 left_id, right_id = int(row[0]), int(row[1])
showard989f25d2008-10-01 11:38:11 +0000254 if flip:
255 left_id, right_id = right_id, left_id
showard63a34772008-08-18 19:32:50 +0000256 result.setdefault(left_id, set()).add(right_id)
257 return result
258
259
260 @classmethod
261 def _get_job_acl_groups(cls, job_ids):
262 query = """
showardd9ac4452009-02-07 02:04:37 +0000263 SELECT jobs.id, acl_groups_users.aclgroup_id
showard63a34772008-08-18 19:32:50 +0000264 FROM jobs
265 INNER JOIN users ON users.login = jobs.owner
266 INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
267 WHERE jobs.id IN (%s)
268 """
269 return cls._get_many2many_dict(query, job_ids)
270
271
272 @classmethod
273 def _get_job_ineligible_hosts(cls, job_ids):
274 query = """
275 SELECT job_id, host_id
276 FROM ineligible_host_queues
277 WHERE job_id IN (%s)
278 """
279 return cls._get_many2many_dict(query, job_ids)
280
281
282 @classmethod
showard989f25d2008-10-01 11:38:11 +0000283 def _get_job_dependencies(cls, job_ids):
284 query = """
285 SELECT job_id, label_id
286 FROM jobs_dependency_labels
287 WHERE job_id IN (%s)
288 """
289 return cls._get_many2many_dict(query, job_ids)
290
291
292 @classmethod
showard63a34772008-08-18 19:32:50 +0000293 def _get_host_acls(cls, host_ids):
294 query = """
showardd9ac4452009-02-07 02:04:37 +0000295 SELECT host_id, aclgroup_id
showard63a34772008-08-18 19:32:50 +0000296 FROM acl_groups_hosts
297 WHERE host_id IN (%s)
298 """
299 return cls._get_many2many_dict(query, host_ids)
300
301
302 @classmethod
303 def _get_label_hosts(cls, host_ids):
showardfa8629c2008-11-04 16:51:23 +0000304 if not host_ids:
305 return {}, {}
showard63a34772008-08-18 19:32:50 +0000306 query = """
307 SELECT label_id, host_id
308 FROM hosts_labels
309 WHERE host_id IN (%s)
showard989f25d2008-10-01 11:38:11 +0000310 """ % cls._get_sql_id_list(host_ids)
311 rows = _db.execute(query)
312 labels_to_hosts = cls._process_many2many_dict(rows)
313 hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
314 return labels_to_hosts, hosts_to_labels
315
316
317 @classmethod
318 def _get_labels(cls):
319 return dict((label.id, label) for label in Label.fetch())
showard63a34772008-08-18 19:32:50 +0000320
321
322 def refresh(self, pending_queue_entries):
323 self._hosts_available = self._get_ready_hosts()
324
325 relevant_jobs = [queue_entry.job_id
326 for queue_entry in pending_queue_entries]
327 self._job_acls = self._get_job_acl_groups(relevant_jobs)
328 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
showard989f25d2008-10-01 11:38:11 +0000329 self._job_dependencies = self._get_job_dependencies(relevant_jobs)
showard63a34772008-08-18 19:32:50 +0000330
331 host_ids = self._hosts_available.keys()
332 self._host_acls = self._get_host_acls(host_ids)
showard989f25d2008-10-01 11:38:11 +0000333 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
334
335 self._labels = self._get_labels()
showard63a34772008-08-18 19:32:50 +0000336
337
338 def _is_acl_accessible(self, host_id, queue_entry):
339 job_acls = self._job_acls.get(queue_entry.job_id, set())
340 host_acls = self._host_acls.get(host_id, set())
341 return len(host_acls.intersection(job_acls)) > 0
342
343
showard989f25d2008-10-01 11:38:11 +0000344 def _check_job_dependencies(self, job_dependencies, host_labels):
345 missing = job_dependencies - host_labels
showard89f84db2009-03-12 20:39:13 +0000346 return len(missing) == 0
showard989f25d2008-10-01 11:38:11 +0000347
348
349 def _check_only_if_needed_labels(self, job_dependencies, host_labels,
350 queue_entry):
showardade14e22009-01-26 22:38:32 +0000351 if not queue_entry.meta_host:
352 # bypass only_if_needed labels when a specific host is selected
353 return True
354
showard989f25d2008-10-01 11:38:11 +0000355 for label_id in host_labels:
356 label = self._labels[label_id]
357 if not label.only_if_needed:
358 # we don't care about non-only_if_needed labels
359 continue
360 if queue_entry.meta_host == label_id:
361 # if the label was requested in a metahost it's OK
362 continue
363 if label_id not in job_dependencies:
364 return False
365 return True
366
367
showard89f84db2009-03-12 20:39:13 +0000368 def _check_atomic_group_labels(self, host_labels, queue_entry):
369 """
370 Determine if the given HostQueueEntry's atomic group settings are okay
371 to schedule on a host with the given labels.
372
373 @param host_labels - A list of label ids that the host has.
374 @param queue_entry - The HostQueueEntry being considered for the host.
375
376 @returns True if atomic group settings are okay, False otherwise.
377 """
378 return (self._get_host_atomic_group_id(host_labels) ==
379 queue_entry.atomic_group_id)
380
381
382 def _get_host_atomic_group_id(self, host_labels):
383 """
384 Return the atomic group label id for a host with the given set of
385 labels if any, or None otherwise. Raises an exception if more than
386 one atomic group are found in the set of labels.
387
388 @param host_labels - A list of label ids that the host has.
389
390 @returns The id of the atomic group found on a label in host_labels
391 or None if no atomic group label is found.
392 @raises SchedulerError - If more than one atomic group label is found.
393 """
394 atomic_ids = [self._labels[label_id].atomic_group_id
395 for label_id in host_labels
396 if self._labels[label_id].atomic_group_id is not None]
397 if not atomic_ids:
398 return None
399 if len(atomic_ids) > 1:
400 raise SchedulerError('More than one atomic label on host.')
401 return atomic_ids[0]
402
403
404 def _get_atomic_group_labels(self, atomic_group_id):
405 """
406 Lookup the label ids that an atomic_group is associated with.
407
408 @param atomic_group_id - The id of the AtomicGroup to look up.
409
410 @returns A generator yeilding Label ids for this atomic group.
411 """
412 return (id for id, label in self._labels.iteritems()
413 if label.atomic_group_id == atomic_group_id
414 and not label.invalid)
415
416
417 def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
418 """
419 @param group_hosts - A sequence of Host ids to test for usability
420 and eligibility against the Job associated with queue_entry.
421 @param queue_entry - The HostQueueEntry that these hosts are being
422 tested for eligibility against.
423
424 @returns A subset of group_hosts Host ids that are eligible for the
425 supplied queue_entry.
426 """
427 return set(host_id for host_id in group_hosts
428 if self._is_host_usable(host_id)
429 and self._is_host_eligible_for_job(host_id, queue_entry))
430
431
showard989f25d2008-10-01 11:38:11 +0000432 def _is_host_eligible_for_job(self, host_id, queue_entry):
433 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
434 host_labels = self._host_labels.get(host_id, set())
mblighc993bee2008-10-03 03:42:34 +0000435
showard89f84db2009-03-12 20:39:13 +0000436 return (self._is_acl_accessible(host_id, queue_entry) and
437 self._check_job_dependencies(job_dependencies, host_labels) and
438 self._check_only_if_needed_labels(
439 job_dependencies, host_labels, queue_entry) and
440 self._check_atomic_group_labels(host_labels, queue_entry))
showard989f25d2008-10-01 11:38:11 +0000441
442
showard63a34772008-08-18 19:32:50 +0000443 def _schedule_non_metahost(self, queue_entry):
showard989f25d2008-10-01 11:38:11 +0000444 if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000445 return None
446 return self._hosts_available.pop(queue_entry.host_id, None)
447
448
449 def _is_host_usable(self, host_id):
450 if host_id not in self._hosts_available:
451 # host was already used during this scheduling cycle
452 return False
453 if self._hosts_available[host_id].invalid:
454 # Invalid hosts cannot be used for metahosts. They're included in
455 # the original query because they can be used by non-metahosts.
456 return False
457 return True
458
459
460 def _schedule_metahost(self, queue_entry):
461 label_id = queue_entry.meta_host
462 hosts_in_label = self._label_hosts.get(label_id, set())
463 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
464 set())
465
466 # must iterate over a copy so we can mutate the original while iterating
467 for host_id in list(hosts_in_label):
468 if not self._is_host_usable(host_id):
469 hosts_in_label.remove(host_id)
470 continue
471 if host_id in ineligible_host_ids:
472 continue
showard989f25d2008-10-01 11:38:11 +0000473 if not self._is_host_eligible_for_job(host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000474 continue
475
showard89f84db2009-03-12 20:39:13 +0000476 # Remove the host from our cached internal state before returning
477 # the host object.
showard63a34772008-08-18 19:32:50 +0000478 hosts_in_label.remove(host_id)
479 return self._hosts_available.pop(host_id)
480 return None
481
482
483 def find_eligible_host(self, queue_entry):
484 if not queue_entry.meta_host:
showard89f84db2009-03-12 20:39:13 +0000485 assert queue_entry.host_id is not None
showard63a34772008-08-18 19:32:50 +0000486 return self._schedule_non_metahost(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000487 assert queue_entry.atomic_group_id is None
showard63a34772008-08-18 19:32:50 +0000488 return self._schedule_metahost(queue_entry)
489
490
showard89f84db2009-03-12 20:39:13 +0000491 def find_eligible_atomic_group(self, queue_entry):
492 """
493 Given an atomic group host queue entry, locate an appropriate group
494 of hosts for the associated job to run on.
495
496 The caller is responsible for creating new HQEs for the additional
497 hosts returned in order to run the actual job on them.
498
499 @returns A list of Host instances in a ready state to satisfy this
500 atomic group scheduling. Hosts will all belong to the same
501 atomic group label as specified by the queue_entry.
502 An empty list will be returned if no suitable atomic
503 group could be found.
504
505 TODO(gps): what is responsible for kicking off any attempted repairs on
506 a group of hosts? not this function, but something needs to. We do
507 not communicate that reason for returning [] outside of here...
508 For now, we'll just be unschedulable if enough hosts within one group
509 enter Repair Failed state.
510 """
511 assert queue_entry.atomic_group_id is not None
512 job = queue_entry.job
513 assert job.synch_count and job.synch_count > 0
514 atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
515 if job.synch_count > atomic_group.max_number_of_machines:
516 # Such a Job and HostQueueEntry should never be possible to
517 # create using the frontend. Regardless, we can't process it.
518 # Abort it immediately and log an error on the scheduler.
519 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000520 logging.error(
521 'Error: job %d synch_count=%d > requested atomic_group %d '
522 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
523 job.id, job.synch_count, atomic_group.id,
524 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000525 return []
526 hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
527 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
528 set())
529
530 # Look in each label associated with atomic_group until we find one with
531 # enough hosts to satisfy the job.
532 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
533 group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
534 if queue_entry.meta_host is not None:
535 # If we have a metahost label, only allow its hosts.
536 group_hosts.intersection_update(hosts_in_label)
537 group_hosts -= ineligible_host_ids
538 eligible_hosts_in_group = self._get_eligible_hosts_in_group(
539 group_hosts, queue_entry)
540
541 # Job.synch_count is treated as "minimum synch count" when
542 # scheduling for an atomic group of hosts. The atomic group
543 # number of machines is the maximum to pick out of a single
544 # atomic group label for scheduling at one time.
545 min_hosts = job.synch_count
546 max_hosts = atomic_group.max_number_of_machines
547
548 if len(eligible_hosts_in_group) < min_hosts:
549 # Not enough eligible hosts in this atomic group label.
550 continue
551
552 # Limit ourselves to scheduling the atomic group size.
553 if len(eligible_hosts_in_group) > max_hosts:
554 eligible_hosts_in_group = random.sample(
555 eligible_hosts_in_group, max_hosts)
556
557 # Remove the selected hosts from our cached internal state
558 # of available hosts in order to return the Host objects.
559 host_list = []
560 for host_id in eligible_hosts_in_group:
561 hosts_in_label.discard(host_id)
562 host_list.append(self._hosts_available.pop(host_id))
563 return host_list
564
565 return []
566
567
showard170873e2009-01-07 00:22:26 +0000568class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000569 def __init__(self):
570 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000571 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000572 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000573 user_cleanup_time = scheduler_config.config.clean_interval
574 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
575 _db, user_cleanup_time)
576 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000577 self._host_agents = {}
578 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000579
mbligh36768f02008-02-22 18:28:33 +0000580
jadmanski0afbb632008-06-06 21:10:57 +0000581 def do_initial_recovery(self, recover_hosts=True):
582 # always recover processes
583 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000584
jadmanski0afbb632008-06-06 21:10:57 +0000585 if recover_hosts:
586 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000587
588
jadmanski0afbb632008-06-06 21:10:57 +0000589 def tick(self):
showard170873e2009-01-07 00:22:26 +0000590 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000591 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000592 self._find_aborting()
593 self._schedule_new_jobs()
594 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000595 _drone_manager.execute_actions()
596 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000597
showard97aed502008-11-04 02:01:24 +0000598
mblighf3294cc2009-04-08 21:17:38 +0000599 def _run_cleanup(self):
600 self._periodic_cleanup.run_cleanup_maybe()
601 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000602
mbligh36768f02008-02-22 18:28:33 +0000603
showard170873e2009-01-07 00:22:26 +0000604 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
605 for object_id in object_ids:
606 agent_dict.setdefault(object_id, set()).add(agent)
607
608
609 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
610 for object_id in object_ids:
611 assert object_id in agent_dict
612 agent_dict[object_id].remove(agent)
613
614
jadmanski0afbb632008-06-06 21:10:57 +0000615 def add_agent(self, agent):
616 self._agents.append(agent)
617 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000618 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
619 self._register_agent_for_ids(self._queue_entry_agents,
620 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000621
showard170873e2009-01-07 00:22:26 +0000622
623 def get_agents_for_entry(self, queue_entry):
624 """
625 Find agents corresponding to the specified queue_entry.
626 """
627 return self._queue_entry_agents.get(queue_entry.id, set())
628
629
630 def host_has_agent(self, host):
631 """
632 Determine if there is currently an Agent present using this host.
633 """
634 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000635
636
jadmanski0afbb632008-06-06 21:10:57 +0000637 def remove_agent(self, agent):
638 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000639 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
640 agent)
641 self._unregister_agent_for_ids(self._queue_entry_agents,
642 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000643
644
showard4c5374f2008-09-04 17:02:56 +0000645 def num_running_processes(self):
646 return sum(agent.num_processes for agent in self._agents
647 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000648
649
showard170873e2009-01-07 00:22:26 +0000650 def _extract_execution_tag(self, command_line):
651 match = re.match(r'.* -P (\S+) ', command_line)
652 if not match:
653 return None
654 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000655
656
showard2bab8f42008-11-12 18:15:22 +0000657 def _recover_queue_entries(self, queue_entries, run_monitor):
658 assert len(queue_entries) > 0
showard2bab8f42008-11-12 18:15:22 +0000659 queue_task = RecoveryQueueTask(job=queue_entries[0].job,
660 queue_entries=queue_entries,
661 run_monitor=run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000662 self.add_agent(Agent(tasks=[queue_task],
showard170873e2009-01-07 00:22:26 +0000663 num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000664
665
jadmanski0afbb632008-06-06 21:10:57 +0000666 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000667 self._register_pidfiles()
668 _drone_manager.refresh()
669 self._recover_running_entries()
670 self._recover_aborting_entries()
671 self._requeue_other_active_entries()
672 self._recover_parsing_entries()
673 self._reverify_remaining_hosts()
674 # reinitialize drones after killing orphaned processes, since they can
675 # leave around files when they die
676 _drone_manager.execute_actions()
677 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000678
showard170873e2009-01-07 00:22:26 +0000679
680 def _register_pidfiles(self):
681 # during recovery we may need to read pidfiles for both running and
682 # parsing entries
683 queue_entries = HostQueueEntry.fetch(
684 where="status IN ('Running', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000685 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000686 pidfile_id = _drone_manager.get_pidfile_id_from(
687 queue_entry.execution_tag())
688 _drone_manager.register_pidfile(pidfile_id)
689
690
691 def _recover_running_entries(self):
692 orphans = _drone_manager.get_orphaned_autoserv_processes()
693
694 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
695 requeue_entries = []
696 for queue_entry in queue_entries:
697 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000698 # synchronous job we've already recovered
699 continue
showard170873e2009-01-07 00:22:26 +0000700 execution_tag = queue_entry.execution_tag()
701 run_monitor = PidfileRunMonitor()
702 run_monitor.attach_to_existing_process(execution_tag)
703 if not run_monitor.has_process():
704 # autoserv apparently never got run, so let it get requeued
705 continue
showarde788ea62008-11-17 21:02:47 +0000706 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showardb18134f2009-03-20 20:52:18 +0000707 logging.info('Recovering %s (process %s)',
708 (', '.join(str(entry) for entry in queue_entries),
709 run_monitor.get_process()))
showard2bab8f42008-11-12 18:15:22 +0000710 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000711 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000712
jadmanski0afbb632008-06-06 21:10:57 +0000713 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000714 for process in orphans.itervalues():
showardb18134f2009-03-20 20:52:18 +0000715 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000716 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000717
showard170873e2009-01-07 00:22:26 +0000718
719 def _recover_aborting_entries(self):
720 queue_entries = HostQueueEntry.fetch(
721 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000722 for queue_entry in queue_entries:
showardb18134f2009-03-20 20:52:18 +0000723 logging.info('Recovering aborting QE %s', queue_entry)
showard170873e2009-01-07 00:22:26 +0000724 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000725
showard97aed502008-11-04 02:01:24 +0000726
showard170873e2009-01-07 00:22:26 +0000727 def _requeue_other_active_entries(self):
728 queue_entries = HostQueueEntry.fetch(
729 where='active AND NOT complete AND status != "Pending"')
730 for queue_entry in queue_entries:
731 if self.get_agents_for_entry(queue_entry):
732 # entry has already been recovered
733 continue
showardb18134f2009-03-20 20:52:18 +0000734 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
735 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000736 if queue_entry.host:
737 tasks = queue_entry.host.reverify_tasks()
738 self.add_agent(Agent(tasks))
739 agent = queue_entry.requeue()
740
741
    def _reverify_remaining_hosts(self):
        """Reverify hosts left in transient states by a scheduler restart."""
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000757
758
jadmanski0afbb632008-06-06 21:10:57 +0000759 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000760 print_message='Reverifying host %s'):
761 full_where='locked = 0 AND invalid = 0 AND ' + where
762 for host in Host.fetch(where=full_where):
763 if self.host_has_agent(host):
764 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000765 continue
showard170873e2009-01-07 00:22:26 +0000766 if print_message:
showardb18134f2009-03-20 20:52:18 +0000767 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000768 tasks = host.reverify_tasks()
769 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000770
771
showard97aed502008-11-04 02:01:24 +0000772 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000773 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000774 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000775 if entry.id in recovered_entry_ids:
776 continue
777 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000778 recovered_entry_ids = recovered_entry_ids.union(
779 entry.id for entry in queue_entries)
showardb18134f2009-03-20 20:52:18 +0000780 logging.info('Recovering parsing entries %s',
781 (', '.join(str(entry) for entry in queue_entries)))
showard97aed502008-11-04 02:01:24 +0000782
783 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000784 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000785
786
jadmanski0afbb632008-06-06 21:10:57 +0000787 def _recover_hosts(self):
788 # recover "Repair Failed" hosts
789 message = 'Reverifying dead host %s'
790 self._reverify_hosts_where("status = 'Repair Failed'",
791 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000792
793
showard04c82c52008-05-29 19:38:12 +0000794
    def _get_pending_queue_entries(self):
        """Fetch all queued, inactive entries ordered for scheduling:
        higher job priority first, then non-metahost before metahost
        entries, then FIFO by job id."""
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000801
802
showard89f84db2009-03-12 20:39:13 +0000803 def _refresh_pending_queue_entries(self):
804 """
805 Lookup the pending HostQueueEntries and call our HostScheduler
806 refresh() method given that list. Return the list.
807
808 @returns A list of pending HostQueueEntries sorted in priority order.
809 """
showard63a34772008-08-18 19:32:50 +0000810 queue_entries = self._get_pending_queue_entries()
811 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000812 return []
showardb95b1bd2008-08-15 18:11:04 +0000813
showard63a34772008-08-18 19:32:50 +0000814 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000815
showard89f84db2009-03-12 20:39:13 +0000816 return queue_entries
817
818
819 def _schedule_atomic_group(self, queue_entry):
820 """
821 Schedule the given queue_entry on an atomic group of hosts.
822
823 Returns immediately if there are insufficient available hosts.
824
825 Creates new HostQueueEntries based off of queue_entry for the
826 scheduled hosts and starts them all running.
827 """
828 # This is a virtual host queue entry representing an entire
829 # atomic group, find a group and schedule their hosts.
830 group_hosts = self._host_scheduler.find_eligible_atomic_group(
831 queue_entry)
832 if not group_hosts:
833 return
834 # The first assigned host uses the original HostQueueEntry
835 group_queue_entries = [queue_entry]
836 for assigned_host in group_hosts[1:]:
837 # Create a new HQE for every additional assigned_host.
838 new_hqe = HostQueueEntry.clone(queue_entry)
839 new_hqe.save()
840 group_queue_entries.append(new_hqe)
841 assert len(group_queue_entries) == len(group_hosts)
842 for queue_entry, host in itertools.izip(group_queue_entries,
843 group_hosts):
844 self._run_queue_entry(queue_entry, host)
845
846
847 def _schedule_new_jobs(self):
848 queue_entries = self._refresh_pending_queue_entries()
849 if not queue_entries:
850 return
851
showard63a34772008-08-18 19:32:50 +0000852 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000853 if (queue_entry.atomic_group_id is None or
854 queue_entry.host_id is not None):
855 assigned_host = self._host_scheduler.find_eligible_host(
856 queue_entry)
857 if assigned_host:
858 self._run_queue_entry(queue_entry, assigned_host)
859 else:
860 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000861
862
863 def _run_queue_entry(self, queue_entry, host):
864 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000865 # in some cases (synchronous jobs with run_verify=False), agent may be
866 # None
showard9976ce92008-10-15 20:28:13 +0000867 if agent:
868 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000869
870
jadmanski0afbb632008-06-06 21:10:57 +0000871 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000872 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000873 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000874 for agent in agents_to_abort:
875 self.remove_agent(agent)
876
showard170873e2009-01-07 00:22:26 +0000877 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000878
879
showard324bf812009-01-20 23:23:38 +0000880 def _can_start_agent(self, agent, num_started_this_cycle,
881 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000882 # always allow zero-process agents to run
883 if agent.num_processes == 0:
884 return True
885 # don't allow any nonzero-process agents to run after we've reached a
886 # limit (this avoids starvation of many-process agents)
887 if have_reached_limit:
888 return False
889 # total process throttling
showard324bf812009-01-20 23:23:38 +0000890 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000891 return False
892 # if a single agent exceeds the per-cycle throttling, still allow it to
893 # run when it's the first agent in the cycle
894 if num_started_this_cycle == 0:
895 return True
896 # per-cycle throttling
897 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000898 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000899 return False
900 return True
901
902
jadmanski0afbb632008-06-06 21:10:57 +0000903 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000904 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000905 have_reached_limit = False
906 # iterate over copy, so we can remove agents during iteration
907 for agent in list(self._agents):
908 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000909 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000910 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000911 continue
912 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000913 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000914 have_reached_limit):
915 have_reached_limit = True
916 continue
showard4c5374f2008-09-04 17:02:56 +0000917 num_started_this_cycle += agent.num_processes
918 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000919 logging.info('%d running processes',
920 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000921
922
class PidfileRunMonitor(object):
    """
    Tracks an autoserv process through its pidfile via the drone manager.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we give up on the process (see on_lost_process())
        self.lost_process = False
        # when run()/attach_to_existing_process() was called; used to time
        # out waiting for the pidfile to appear
        self._start_time = None
        self.pidfile_id = None
        # last-read pidfile contents (process, exit_status, num_tests_failed)
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # NOTE(review): appears unused -- run() builds the nice prefix
        # inline, with a slightly different check (is not None vs truthy);
        # confirm before unifying
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # record when we started watching, for the pidfile-write timeout
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Launch command via the drone manager, optionally nice'd, and
        remember the resulting pidfile id."""
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        """Start monitoring an already-running process by its execution tag
        instead of launching a new one."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True when a process has been seen in the pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; asserts one has been seen."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        # refresh self._state from the pidfile; raises _PidfileException
        # (and resets state) on unparseable contents
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # log, email a notification, and give up on the process
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        # core state refresh; may raise _PidfileException from _read_pidfile
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        Emails a notification if the pidfile has failed to appear within
        PIDFILE_TIMEOUT, then marks the process as lost.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
        self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # report failure with zero failed tests, since we know nothing more
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the process exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the failed-test count from the pidfile; asserts it has
        been written."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001078
1079
class Agent(object):
    """
    Runs an ordered queue of AgentTasks for the dispatcher, one at a time.

    num_processes is how many processes this agent counts for against the
    dispatcher's throttling limits.
    """
    def __init__(self, tasks, num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        # set by the dispatcher when the agent is added
        self.dispatcher = None
        self.num_processes = num_processes

        # union of ids across all tasks, so the dispatcher can find the
        # agent handling a given queue entry or host
        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        # flatten an iterable of id lists into one set
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        """Append a task to the queue and point it back at this agent."""
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """Poll the active task; advance through tasks until one is still
        in progress or the queue is exhausted."""
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        # finish up the current task (dispatching failure tasks if it
        # failed) and start the next one from the queue, if any
        logging.info("agent picking task")
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        """Drop all remaining queued tasks and schedule the failed task's
        failure_tasks instead."""
        self.queue = Queue.Queue(0)
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        new_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(new_agent)


    def is_running(self):
        """True while a task is active."""
        return self.active_task is not None


    def is_done(self):
        """True once there is no active task and nothing left queued."""
        return self.active_task is None and self.queue.empty()


    def start(self):
        """Begin executing tasks; requires the dispatcher to be set."""
        assert self.dispatcher

        self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001148
jadmanski0afbb632008-06-06 21:10:57 +00001149
class AgentTask(object):
    """
    A single unit of work executed by an Agent: optionally launches a
    command under a PidfileRunMonitor and tracks it to completion.

    Subclasses override prolog()/epilog() for before/after hooks, and may
    supply failure_tasks, which the owning Agent schedules (in a fresh
    Agent) if this task fails.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None):
        """
        @param cmd: command list to execute, or None for no-process tasks.
        @param working_directory: directory the command executes in.
        @param failure_tasks: tasks to schedule if this task fails.
        """
        # avoid the shared-mutable-default pitfall: each instance gets its
        # own list unless one was explicitly supplied
        if failure_tasks is None:
            failure_tasks = []
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # record which hosts/queue entries this task touches, so the
        # dispatcher can find the agent responsible for a given entry/host
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Check the monitored process and update done/success."""
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            # no process was ever started; treat as failure
            self.finished(False)


    def tick(self, exit_code):
        """Process an exit code; None means the process is still running."""
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete and run the epilog."""
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts; overridden by subclasses."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE(review): suffix is accepted for backwards compatibility
        # (callers pass '.repair', '.verify', ...) but is not used by the
        # drone-manager temporary path -- confirm this is intentional
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        # preserve this task's log file in the results repository, if a
        # process ran and a log path was configured
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task completes; overridden by subclasses."""
        self.cleanup()


    def start(self):
        """Run prolog and launch the command on first call; idempotent."""
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill any running process and mark the task done."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """Point log_file at a timestamped per-host path under hosts/."""
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        # all grouped entries must share a single execution tag
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        """Copy this task's results to the repository and schedule a final
        reparse for the given entries."""
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self):
        """Launch self.cmd (if any) under a new PidfileRunMonitor."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001272
1273
class RepairTask(AgentTask):
    """Runs autoserv -R to repair a host; on repair failure, may fail the
    queue entry that triggered the repair."""
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path , '-p', '-R', '-m', host.hostname,
               '-r', _drone_manager.absolute_path(self.temp_results_dir),
               '--host-protection', protection]
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        """Mark the host Repairing and requeue the triggering entry (it
        will be picked up again, or failed in epilog if repair fails)."""
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _fail_queue_entry(self):
        # fail the triggering entry, unless it's a metahost entry (those
        # get reassigned) or it has been aborted since we requeued it
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        """Set the host's final status; on failure, also fail the
        triggering queue entry when there is one."""
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001334
1335
class PreJobTask(AgentTask):
    """Base for tasks that run against a host before its job does.

    On failure of a non-metahost entry, this task's log is copied into the
    job's results so the user can see why the entry never ran.
    """
    def epilog(self):
        super(PreJobTask, self).epilog()
        entry = self.queue_entry
        if not entry or self.success or entry.meta_host:
            return
        entry.set_execution_subdir()
        log_basename = os.path.basename(self.log_file)
        destination = os.path.join(entry.execution_tag(), log_basename)
        _drone_manager.copy_to_results_repository(
            self.monitor.get_process(), self.log_file,
            destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001348
1349
class VerifyTask(PreJobTask):
    """Runs autoserv -v against a host; schedules a RepairTask on failure."""
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host must be supplied
        assert bool(queue_entry) != bool(host)
        if host:
            self.host = host
        else:
            self.host = queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname,
                      '-r',
                      _drone_manager.absolute_path(self.temp_results_dir)]
        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=repair_on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Mark the host (and entry, if any) as Verifying."""
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """On success, the host is Ready; failure handling (repair, log
        copying) is done by failure_tasks and PreJobTask.epilog()."""
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001380
1381
class QueueTask(AgentTask):
    """Runs the main autoserv process for a job's queue entries.

    Handles the surrounding bookkeeping: job/host keyval files, status
    transitions, abort logging, result parsing, and post-job reboots.
    """
    def __init__(self, job, queue_entries, cmd):
        self.job = job
        self.queue_entries = queue_entries
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        return os.path.join(self._execution_tag(), 'keyval')


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        # Written before the job starts, so the file is attached to the
        # execution rather than paired with a running process.
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_keyval_after_job(self, field, value):
        # After the job, writes must go through the drone that holds the
        # autoserv process, hence the paired_with_process argument.
        assert self.monitor and self.monitor.has_process()
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # All entries of one job share a single execution tag.
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        queued = int(time.mktime(self.job.created_on.timetuple()))
        self._write_keyvals_before_job({'job_queued': queued})
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self, success):
        if self.monitor.has_process():
            self._write_keyval_after_job("job_finished", int(time.time()))
            self._copy_and_parse_results(self.queue_entries)

        if self.monitor.lost_process:
            # The process disappeared out from under us; record an error
            # file and fail every entry.
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task(False)


    def _reboot_hosts(self):
        reboot_after = self.job.reboot_after
        do_reboot = False
        if reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            num_tests_failed = self.monitor.num_tests_failed()
            do_reboot = (self.success and num_tests_failed == 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.get_host())
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task(self.success)
        self._reboot_hosts()

        # Bug fix: message previously read "succes=%s".
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001529
1530
class RecoveryQueueTask(QueueTask):
    """A QueueTask reattached to an autoserv process that was already
    running when the scheduler (re)started."""
    def __init__(self, job, queue_entries, run_monitor):
        # cmd=None: we never launch a process ourselves.
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        # Monitor already attached to the existing autoserv process.
        self.run_monitor = run_monitor


    def run(self):
        # Adopt the existing monitor instead of spawning a new process.
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass
mblighbb421852008-03-11 22:36:16 +00001544
1545
class CleanupTask(PreJobTask):
    """Runs 'autoserv --cleanup' on a host; failure queues a RepairTask."""

    def __init__(self, host=None, queue_entry=None):
        # Callers supply exactly one of host / queue_entry.
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        results_path = _drone_manager.absolute_path(self.temp_results_dir)
        self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
                    '-r', results_path]
        # When cleanup fails, attempt a repair before failing the entry.
        failure_tasks = [RepairTask(host, queue_entry=queue_entry)]
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=failure_tasks)

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
1576
1577
class AbortTask(AgentTask):
    """Aborts the active tasks of other agents on behalf of a queue entry."""

    def __init__(self, queue_entry, agents_to_abort):
        super(AbortTask, self).__init__('')
        self.queue_entry = queue_entry
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [queue_entry.id]
        self.agents_to_abort = agents_to_abort


    def prolog(self):
        logging.info("starting abort on host %s, job %s",
                     self.queue_entry.host_id, self.queue_entry.job_id)


    def epilog(self):
        super(AbortTask, self).epilog()
        self.queue_entry.set_status('Aborted')
        self.success = True


    def run(self):
        # Abort whatever each target agent is currently executing.
        for agent in self.agents_to_abort:
            task = agent.active_task
            if task:
                task.abort()
mbligh36768f02008-02-22 18:28:33 +00001602
1603
class FinalReparseTask(AgentTask):
    """Runs the results parser over a finished job's results directory.

    A class-level counter caps concurrent parsers at
    scheduler_config.config.max_parse_processes; instances that cannot
    start yet keep retrying from poll() until a slot frees up.
    """
    # Number of parser processes currently running, shared across instances.
    _num_running_parses = 0

    def __init__(self, queue_entries):
        self._queue_entries = queue_entries
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False

        assert len(queue_entries) > 0
        queue_entry = queue_entries[0]

        self._execution_tag = queue_entry.execution_tag()
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # Attach to the (completed) autoserv process to read its exit status.
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._final_status = self._determine_final_status()

        # NOTE(review): in _testing_mode the AgentTask constructor is skipped
        # entirely and only self.cmd is set.
        if _testing_mode:
            self.cmd = 'true'
        else:
            super(FinalReparseTask, self).__init__(
                cmd=self._generate_parse_command(),
                working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, '.parse.log')


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        # True while we are below the configured parser-process cap.
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def _determine_final_status(self):
        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        for queue_entry in self._queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # Entries end up Completed or Failed per autoserv's exit status,
        # regardless of how the parse itself fared.
        for queue_entry in self._queue_entries:
            queue_entry.set_status(self._final_status)


    def _generate_parse_command(self):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                self._results_dir]


    def poll(self):
        # override poll to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).poll()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        if not self._can_run_new_parse():
            return

        # make sure we actually have results to parse
        # this should never happen in normal operation
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results to parse',
                'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        # actually run the parse command
        self.monitor = PidfileRunMonitor()
        self.monitor.run(self.cmd, self._working_directory,
                         log_file=self.log_file,
                         pidfile_name='.parser_execute',
                         paired_with_pidfile=self._autoserv_monitor.pidfile_id)

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # Only release a parser slot if we actually consumed one.
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001714
1715
class SetEntryPendingTask(AgentTask):
    """Marks a queue entry as pending and schedules any follow-up agent."""

    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        # on_pending() may hand back a follow-up agent; dispatch it if so.
        next_agent = self._queue_entry.on_pending()
        if next_agent:
            self.agent.dispatcher.add_agent(next_agent)
        self.finished(True)
1728
1729
class DBError(Exception):
    """Raised when a DBObject's SELECT during construction finds no row."""
1732
1733
class DBObject(object):
    """A miniature object relational model for the database.

    Instances of the same (subclass, id) pair are shared via a weak-value
    cache so that e.g. many HostQueueEntry objects reuse one Job instance.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Primary key to SELECT by (exclusive with row).
        @param row - A pre-fetched row of values, in _fields order.
        @param new_record - True when building a row not yet in the database.
        @param always_query - When False, a cached, already-initialized
                instance skips re-querying the database.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """SELECT and return the single row for row_id; raises DBError."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            # Bug fix: this previously interpolated the builtin id()
            # function instead of the requested row id.
            raise DBError("row not found (table=%s, id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is never updated via update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this row from the database and refresh our attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in table (default: ours) matching where."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value, condition=''):
        """UPDATE one column of this row (no-op when unchanged)."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        if condition:
            query += ' AND (%s)' % condition
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object when it is a new record; assigns self.id."""
        if self.__new_record:
            keys = self._fields[1:]        # avoid id
            columns = ','.join([str(key) for key in keys])
            # NOTE(review): values are quoted by string formatting rather
            # than parameterized -- unsafe if any field is untrusted.
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE this row and drop the instance from the shared cache."""
        # Bug fix: this previously popped (type(self), id) using the id()
        # builtin, so the cache entry was never actually removed.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        # Prepend prefix only when string is non-empty.
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001928
mbligh36768f02008-02-22 18:28:33 +00001929
class IneligibleHostQueue(DBObject):
    # One row per (job, host) pair the scheduler must not match again; rows
    # are created/removed by HostQueueEntry.block_host()/unblock_host().
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001933
1934
class AtomicGroup(DBObject):
    # Mirrors the atomic_groups table; field order must match the table's
    # column order since DBObject unpacks SELECT * rows positionally.
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00001939
1940
class Label(DBObject):
    # Mirrors the labels table; field order must match the table's column
    # order since DBObject unpacks SELECT * rows positionally.
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00001945
1946
class Host(DBObject):
    """ORM wrapper for one row of the hosts table."""

    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """Return this host's active, incomplete queue entry, or None."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if not rows:
            return None
        # A host can be attached to at most one active entry at a time.
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Requeue whatever entry is currently running on this host."""
        logging.info("%s yielding work", self.hostname)
        task = self.current_task()
        if task:
            task.requeue()


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for name, is_platform in rows:
            if is_platform:
                platform = name
            all_labels.append(name)
        return platform, all_labels


    def reverify_tasks(self):
        """Build the cleanup + verify task pair used to re-verify a host."""
        tasks = [CleanupTask(host=self), VerifyTask(host=self)]
        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return tasks
showardd8e548a2008-09-09 03:04:57 +00002003
2004
mbligh36768f02008-02-22 18:28:33 +00002005class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002006 _table_name = 'host_queue_entries'
2007 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002008 'active', 'complete', 'deleted', 'execution_subdir',
2009 'atomic_group_id')
showard6ae5ea92009-02-25 00:11:51 +00002010
2011
showarda3c58572009-03-12 20:36:59 +00002012 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002013 assert id or row
showarda3c58572009-03-12 20:36:59 +00002014 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002015 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002016
jadmanski0afbb632008-06-06 21:10:57 +00002017 if self.host_id:
2018 self.host = Host(self.host_id)
2019 else:
2020 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002021
showard170873e2009-01-07 00:22:26 +00002022 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002023 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002024
2025
showard89f84db2009-03-12 20:39:13 +00002026 @classmethod
2027 def clone(cls, template):
2028 """
2029 Creates a new row using the values from a template instance.
2030
2031 The new instance will not exist in the database or have a valid
2032 id attribute until its save() method is called.
2033 """
2034 assert isinstance(template, cls)
2035 new_row = [getattr(template, field) for field in cls._fields]
2036 clone = cls(row=new_row, new_record=True)
2037 clone.id = None
2038 return clone
2039
2040
    def _view_job_url(self):
        # Frontend URL for the "view job" tab of this entry's job.
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2043
2044
jadmanski0afbb632008-06-06 21:10:57 +00002045 def set_host(self, host):
2046 if host:
2047 self.queue_log_record('Assigning host ' + host.hostname)
2048 self.update_field('host_id', host.id)
2049 self.update_field('active', True)
2050 self.block_host(host.id)
2051 else:
2052 self.queue_log_record('Releasing host')
2053 self.unblock_host(self.host.id)
2054 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002055
jadmanski0afbb632008-06-06 21:10:57 +00002056 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002057
2058
    def get_host(self):
        """Return the Host assigned to this entry, or None."""
        return self.host
mbligh36768f02008-02-22 18:28:33 +00002061
2062
jadmanski0afbb632008-06-06 21:10:57 +00002063 def queue_log_record(self, log_line):
2064 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002065 _drone_manager.write_lines_to_file(self.queue_log_path,
2066 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002067
2068
jadmanski0afbb632008-06-06 21:10:57 +00002069 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002070 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002071 row = [0, self.job.id, host_id]
2072 block = IneligibleHostQueue(row=row, new_record=True)
2073 block.save()
mblighe2586682008-02-29 22:45:46 +00002074
2075
jadmanski0afbb632008-06-06 21:10:57 +00002076 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002077 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002078 blocks = IneligibleHostQueue.fetch(
2079 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2080 for block in blocks:
2081 block.delete()
mblighe2586682008-02-29 22:45:46 +00002082
2083
showard2bab8f42008-11-12 18:15:22 +00002084 def set_execution_subdir(self, subdir=None):
2085 if subdir is None:
2086 assert self.get_host()
2087 subdir = self.get_host().hostname
2088 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002089
2090
showard6355f6b2008-12-05 18:52:13 +00002091 def _get_hostname(self):
2092 if self.host:
2093 return self.host.hostname
2094 return 'no host'
2095
2096
    def __str__(self):
        """Format as 'hostname/job_id (entry_id)' for scheduler log lines."""
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2099
2100
jadmanski0afbb632008-06-06 21:10:57 +00002101 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00002102 abort_statuses = ['Abort', 'Aborting', 'Aborted']
2103 if status not in abort_statuses:
2104 condition = ' AND '.join(['status <> "%s"' % x
2105 for x in abort_statuses])
2106 else:
2107 condition = ''
2108 self.update_field('status', status, condition=condition)
2109
showardb18134f2009-03-20 20:52:18 +00002110 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002111
showardc85c21b2008-11-24 22:17:37 +00002112 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002113 self.update_field('complete', False)
2114 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002115
jadmanski0afbb632008-06-06 21:10:57 +00002116 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showarde58e3f82008-11-20 19:04:59 +00002117 'Aborting']:
jadmanski0afbb632008-06-06 21:10:57 +00002118 self.update_field('complete', False)
2119 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002120
showardc85c21b2008-11-24 22:17:37 +00002121 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002122 self.update_field('complete', True)
2123 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002124
2125 should_email_status = (status.lower() in _notify_email_statuses or
2126 'all' in _notify_email_statuses)
2127 if should_email_status:
2128 self._email_on_status(status)
2129
2130 self._email_on_job_complete()
2131
2132
    def _email_on_status(self, status):
        """Email the job's notification list that this entry hit `status`."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002142
2143
2144 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002145 if not self.job.is_finished():
2146 return
showard542e8402008-09-19 20:16:18 +00002147
showardc85c21b2008-11-24 22:17:37 +00002148 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002149 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002150 for queue_entry in hosts_queue:
2151 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002152 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002153 queue_entry.status))
2154
2155 summary_text = "\n".join(summary_text)
2156 status_counts = models.Job.objects.get_status_counts(
2157 [self.job.id])[self.job.id]
2158 status = ', '.join('%d %s' % (count, status) for status, count
2159 in status_counts.iteritems())
2160
2161 subject = 'Autotest: Job ID: %s "%s" %s' % (
2162 self.job.id, self.job.name, status)
2163 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2164 self.job.id, self.job.name, status, self._view_job_url(),
2165 summary_text)
showard170873e2009-01-07 00:22:26 +00002166 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002167
2168
    def run(self, assigned_host=None):
        """Start this entry: bind the given host when the entry is a
        meta-host or atomic-group entry, then delegate to Job.run.

        Returns whatever Job.run returns (an Agent for the dispatcher).
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002180
showard6ae5ea92009-02-25 00:11:51 +00002181
    def requeue(self):
        """Return this entry to the Queued state for rescheduling."""
        self.set_status('Queued')
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        # Meta-host entries release their host so a new one can be chosen.
        if self.meta_host:
            self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002188
2189
    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.  Marks the entry Failed and lets the job stop its
        remaining entries if it can no longer meet its synch_count.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002198
2199
    @property
    def aborted_by(self):
        """Login of the user who aborted this entry (None if not aborted)."""
        self._load_abort_info()
        return self._aborted_by
2204
2205
    @property
    def aborted_on(self):
        """Timestamp of this entry's abort (None if not aborted)."""
        self._load_abort_info()
        return self._aborted_on
2210
2211
    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Memoized: once _aborted_by exists, the query is never re-run.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            # Not aborted (or no abort record): both fields read as None.
            self._aborted_by = self._aborted_on = None
2227
2228
showardb2e2c322008-10-14 17:33:55 +00002229 def on_pending(self):
2230 """
2231 Called when an entry in a synchronous job has passed verify. If the
2232 job is ready to run, returns an agent to run the job. Returns None
2233 otherwise.
2234 """
2235 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002236 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002237 if self.job.is_ready():
2238 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002239 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002240 return None
2241
2242
showard170873e2009-01-07 00:22:26 +00002243 def abort(self, dispatcher, agents_to_abort=[]):
showard1be97432008-10-17 15:30:45 +00002244 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00002245 if self.active and host:
showard170873e2009-01-07 00:22:26 +00002246 dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
showard1be97432008-10-17 15:30:45 +00002247
showard170873e2009-01-07 00:22:26 +00002248 abort_task = AbortTask(self, agents_to_abort)
showard1be97432008-10-17 15:30:45 +00002249 self.set_status('Aborting')
showard170873e2009-01-07 00:22:26 +00002250 dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
2251
    def execution_tag(self):
        """Return '<job_id>-<owner>/<subdir>' identifying this execution's
        results location; requires execution_subdir to be set."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002255
2256
class Job(DBObject):
    """
    In-memory representation of a row in the afe 'jobs' table.

    Provides the scheduler-side job operations: querying the job's queue
    entries, deciding when a synchronous job may start, picking the group
    of entries to run together, and building the Agent/QueueTask that
    actually runs autoserv.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the job either by primary key or from a pre-fetched row."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # NOTE(review): control_type 2 presumably denotes a client-side
        # control file -- confirm against frontend.afe.models.
        return self.control_type != 2


    def tag(self):
        """Return this job's results-directory tag, '<id>-<owner>'."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects belonging to this job."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        # every job must have at least one queue entry
        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        """Set the job's status; optionally propagate it to all entries."""
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_ready(self):
        """True when enough entries are Pending to satisfy synch_count."""
        pending_entries = models.HostQueueEntry.objects.filter(
            job=self.id, status='Pending')
        return (pending_entries.count() >= self.synch_count)


    def num_machines(self, clause=None):
        """Count this job's queue entries, optionally filtered by an
        extra SQL clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of entries not yet complete."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of currently active entries."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of completed entries."""
        return self.num_machines('complete')


    def is_finished(self):
        """True once every queue entry of this job is complete."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """Return a queryset of this job's entries that haven't started."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Stop all queued/pending entries, freeing hosts of pending ones."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        """Stop the job when too few runnable entries remain to ever
        satisfy its synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to the job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self):
        """Return the next unused 'group<N>' execution subdir name."""
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in groups if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return "group%d" % next_id


    def _write_control_file(self, execution_tag):
        """Attach the control file to the execution; return its path."""
        control_path = _drone_manager.attach_file_to_execution(
            execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """Return all entries sharing the given entry's execution subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for a group of queue entries."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
                  '-r', _drone_manager.absolute_path(execution_tag),
                  '-u', self.owner, '-l', self.name, '-m', hostnames,
                  _drone_manager.absolute_path(control_path)]

        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """Decide whether a pre-job CleanupTask is needed, based on the
        job's reboot_before setting and the host's dirty flag."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """Decide whether a pre-job VerifyTask is needed; hosts protected
        with DO_NOT_VERIFY are never verified."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def _get_pre_job_tasks(self, queue_entry):
        """Return the optional cleanup/verify tasks plus the mandatory
        SetEntryPendingTask to run before this job starts."""
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries):
        """Give the chosen entries a common execution subdir: the host's
        name for a single entry, otherwise a fresh 'group<N>' name."""
        if len(queue_entries) == 1:
            group_name = queue_entries[0].get_host().hostname
        else:
            group_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_name)


    def _choose_group_to_run(self, include_queue_entry):
        """Pick up to synch_count entries (always including the given one)
        and assign them a new execution group."""
        chosen_entries = [include_queue_entry]

        num_entries_needed = self.synch_count - 1
        if num_entries_needed > 0:
            pending_entries = HostQueueEntry.fetch(
                where='job_id = %s AND status = "Pending" AND id != %s',
                params=(self.id, include_queue_entry.id))
            chosen_entries += list(pending_entries)[:num_entries_needed]

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run(self, queue_entry):
        """Run this job for the given entry; returns an Agent.

        If the job isn't ready yet (not enough Pending entries), the
        returned Agent runs the pre-job tasks instead and the entry is
        set to Verifying.
        """
        if not self.is_ready():
            queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
            return Agent(self._get_pre_job_tasks(queue_entry))

        queue_entries = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries)


    def _finish_run(self, queue_entries, initial_tasks=None):
        """Mark the entries Starting and return the Agent running autoserv.

        @param initial_tasks: optional extra tasks to run before the
                QueueTask.  (None sentinel instead of a mutable [] default
                so the list can't be shared across calls.)
        """
        if initial_tasks is None:
            initial_tasks = []
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params)
        tasks = initial_tasks + [queue_task]

        return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002477
2478
# Script entry point -- main() is defined earlier in this module.
if __name__ == '__main__':
    main()