mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000014from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000017from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
showard136e6dc2009-06-10 19:38:49 +000019from autotest_lib.scheduler import monitor_db_cleanup, scheduler_logging_config
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
mbligh36768f02008-02-22 18:28:33 +000023RESULTS_DIR = '.'
24AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000025DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000026AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
27
28if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000029 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000030AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
31AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
32
33if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000034 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000035
mbligh90a549d2008-03-25 23:52:34 +000036# how long to wait for autoserv to write a pidfile
37PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000038
showardd3dc1992009-04-22 21:01:40 +000039_AUTOSERV_PID_FILE = '.autoserv_execute'
40_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
41_PARSER_PID_FILE = '.parser_execute'
42
43_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
44 _PARSER_PID_FILE)
45
showard35162b02009-03-03 02:17:30 +000046# error message to leave in results dir when an autoserv process disappears
47# mysteriously
48_LOST_PROCESS_ERROR = """\
49Autoserv failed abnormally during execution for this job, probably due to a
50system error on the Autotest server. Full results may not be available. Sorry.
51"""
52
mbligh6f8bab42008-02-29 22:45:14 +000053_db = None
mbligh36768f02008-02-22 18:28:33 +000054_shutdown = False
showard170873e2009-01-07 00:22:26 +000055_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
56_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000057_testing_mode = False
showard542e8402008-09-19 20:16:18 +000058_base_url = None
showardc85c21b2008-11-24 22:17:37 +000059_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000060_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
mbligh83c1e9e2009-05-01 23:10:41 +000063def _site_init_monitor_db_dummy():
64 return {}
65
66
mbligh36768f02008-02-22 18:28:33 +000067def main():
showard27f33872009-04-07 18:20:53 +000068 try:
69 main_without_exception_handling()
showard29caa4b2009-05-26 19:27:09 +000070 except SystemExit:
71 raise
showard27f33872009-04-07 18:20:53 +000072 except:
73 logging.exception('Exception escaping in monitor_db')
74 raise
75
76
77def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000078 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000079
showard136e6dc2009-06-10 19:38:49 +000080 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000081 parser = optparse.OptionParser(usage)
82 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
83 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +000084 parser.add_option('--test', help='Indicate that scheduler is under ' +
85 'test and should use dummy autoserv and no parsing',
86 action='store_true')
87 (options, args) = parser.parse_args()
88 if len(args) != 1:
89 parser.print_usage()
90 return
mbligh36768f02008-02-22 18:28:33 +000091
showard5613c662009-06-08 23:30:33 +000092 scheduler_enabled = global_config.global_config.get_config_value(
93 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
94
95 if not scheduler_enabled:
 96            msg = ("Scheduler not enabled; set enable_scheduler to true in the "
 97                   "global_config's SCHEDULER section to enable it. Exiting.")
98 print msg
99 sys.exit(1)
100
jadmanski0afbb632008-06-06 21:10:57 +0000101 global RESULTS_DIR
102 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000103
mbligh83c1e9e2009-05-01 23:10:41 +0000104 site_init = utils.import_site_function(__file__,
105 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
106 _site_init_monitor_db_dummy)
107 site_init()
108
showardcca334f2009-03-12 20:38:34 +0000109    # Change the cwd while running to avoid issues in case we were launched from
110 # somewhere odd (such as a random NFS home directory of the person running
111 # sudo to launch us as the appropriate user).
112 os.chdir(RESULTS_DIR)
113
jadmanski0afbb632008-06-06 21:10:57 +0000114 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000115 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
116 "notify_email_statuses",
117 default='')
showardc85c21b2008-11-24 22:17:37 +0000118 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000119 _notify_email_statuses = [status for status in
120 re.split(r'[\s,;:]', notify_statuses_list.lower())
121 if status]
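    # Illustrative example (the value is assumed, not read from any shipped
    # config): notify_email_statuses = "Failed, Aborted" would produce
    # ['failed', 'aborted'] here.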
showardc85c21b2008-11-24 22:17:37 +0000122
jadmanski0afbb632008-06-06 21:10:57 +0000123 if options.test:
124 global _autoserv_path
125 _autoserv_path = 'autoserv_dummy'
126 global _testing_mode
127 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000128
mbligh37eceaa2008-12-15 22:56:37 +0000129 # AUTOTEST_WEB.base_url is still a supported config option as some people
130 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000131 global _base_url
showard170873e2009-01-07 00:22:26 +0000132 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
133 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000134 if config_base_url:
135 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000136 else:
mbligh37eceaa2008-12-15 22:56:37 +0000137 # For the common case of everything running on a single server you
138 # can just set the hostname in a single place in the config file.
139 server_name = c.get_config_value('SERVER', 'hostname')
140 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000141 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000142 sys.exit(1)
143 _base_url = 'http://%s/afe/' % server_name
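        # e.g. a hostname of 'autotest.example.com' (a made-up value) yields
        # _base_url = 'http://autotest.example.com/afe/'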
showard542e8402008-09-19 20:16:18 +0000144
showardc5afc462009-01-13 00:09:39 +0000145 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000146 server.start()
147
jadmanski0afbb632008-06-06 21:10:57 +0000148 try:
showard136e6dc2009-06-10 19:38:49 +0000149 init()
showardc5afc462009-01-13 00:09:39 +0000150 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000151 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000152
jadmanski0afbb632008-06-06 21:10:57 +0000153 while not _shutdown:
154 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000155 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000156 except:
showard170873e2009-01-07 00:22:26 +0000157 email_manager.manager.log_stacktrace(
158 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000159
showard170873e2009-01-07 00:22:26 +0000160 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000161 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000162 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000163 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000164
165
showard136e6dc2009-06-10 19:38:49 +0000166def setup_logging():
167 log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
168 log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
169 logging_manager.configure_logging(
170 scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
171 logfile_name=log_name)
172
173
mbligh36768f02008-02-22 18:28:33 +0000174def handle_sigint(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000175 global _shutdown
176 _shutdown = True
showardb18134f2009-03-20 20:52:18 +0000177 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000178
179
showard136e6dc2009-06-10 19:38:49 +0000180def init():
showardb18134f2009-03-20 20:52:18 +0000181 logging.info("%s> dispatcher starting", time.strftime("%X %x"))
182 logging.info("My PID is %d", os.getpid())
mbligh36768f02008-02-22 18:28:33 +0000183
mblighfb676032009-04-01 18:25:38 +0000184 utils.write_pid("monitor_db")
185
showardb1e51872008-10-07 11:08:18 +0000186 if _testing_mode:
187 global_config.global_config.override_config_value(
showard170873e2009-01-07 00:22:26 +0000188 DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
showardb1e51872008-10-07 11:08:18 +0000189
jadmanski0afbb632008-06-06 21:10:57 +0000190 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
191 global _db
showard170873e2009-01-07 00:22:26 +0000192 _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
jadmanski0afbb632008-06-06 21:10:57 +0000193 _db.connect()
mbligh36768f02008-02-22 18:28:33 +0000194
showardfa8629c2008-11-04 16:51:23 +0000195 # ensure Django connection is in autocommit
196 setup_django_environment.enable_autocommit()
showard844960a2009-05-29 18:41:18 +0000197 # bypass the readonly connection
198 readonly_connection.ReadOnlyConnection.set_globally_disabled(True)
showardfa8629c2008-11-04 16:51:23 +0000199
showardb18134f2009-03-20 20:52:18 +0000200 logging.info("Setting signal handler")
jadmanski0afbb632008-06-06 21:10:57 +0000201 signal.signal(signal.SIGINT, handle_sigint)
202
showardd1ee1dd2009-01-07 21:33:08 +0000203    drone_names = global_config.global_config.get_config_value(
 204            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
 205    drone_list = [hostname.strip() for hostname in drone_names.split(',')]
showard170873e2009-01-07 00:22:26 +0000206 results_host = global_config.global_config.get_config_value(
showardd1ee1dd2009-01-07 21:33:08 +0000207 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
showard170873e2009-01-07 00:22:26 +0000208 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
209
showardb18134f2009-03-20 20:52:18 +0000210 logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000211
212
showard87ba02a2009-04-20 19:37:32 +0000213def _autoserv_command_line(machines, results_dir, extra_args, job=None,
214 queue_entry=None):
showardf1ae3542009-05-11 19:26:02 +0000215 """
216 @returns The autoserv command line as a list of executable + parameters.
217
218 @param machines - string - A machine or comma separated list of machines
219 for the (-m) flag.
220 @param results_dir - string - Where the results will be written (-r).
221 @param extra_args - list - Additional arguments to pass to autoserv.
222 @param job - Job object - If supplied, -u owner and -l name parameters
223 will be added.
224 @param queue_entry - A HostQueueEntry object - If supplied and no Job
225 object was supplied, this will be used to lookup the Job object.
226 """
showard87ba02a2009-04-20 19:37:32 +0000227 autoserv_argv = [_autoserv_path, '-p', '-m', machines,
228 '-r', _drone_manager.absolute_path(results_dir)]
229 if job or queue_entry:
230 if not job:
231 job = queue_entry.job
232 autoserv_argv += ['-u', job.owner, '-l', job.name]
233 return autoserv_argv + extra_args
234
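# Rough sketch of the command line produced above; the machines, results dir
# and extra args are made-up values for illustration only:
#   _autoserv_command_line('host1,host2', '123-someuser/group0',
#                          ['-P', '123-someuser/group0'], job=job)
#   => [_autoserv_path, '-p', '-m', 'host1,host2',
#       '-r', _drone_manager.absolute_path('123-someuser/group0'),
#       '-u', job.owner, '-l', job.name, '-P', '123-someuser/group0']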
235
showard89f84db2009-03-12 20:39:13 +0000236class SchedulerError(Exception):
237 """Raised by HostScheduler when an inconsistent state occurs."""
238
239
showard63a34772008-08-18 19:32:50 +0000240class HostScheduler(object):
241 def _get_ready_hosts(self):
242 # avoid any host with a currently active queue entry against it
243 hosts = Host.fetch(
244 joins='LEFT JOIN host_queue_entries AS active_hqe '
245 'ON (hosts.id = active_hqe.host_id AND '
showardb1e51872008-10-07 11:08:18 +0000246 'active_hqe.active)',
showard63a34772008-08-18 19:32:50 +0000247 where="active_hqe.host_id IS NULL "
showardb1e51872008-10-07 11:08:18 +0000248 "AND NOT hosts.locked "
showard63a34772008-08-18 19:32:50 +0000249 "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
250 return dict((host.id, host) for host in hosts)
251
252
253 @staticmethod
254 def _get_sql_id_list(id_list):
255 return ','.join(str(item_id) for item_id in id_list)
256
257
258 @classmethod
showard989f25d2008-10-01 11:38:11 +0000259 def _get_many2many_dict(cls, query, id_list, flip=False):
mbligh849a0f62008-08-28 20:12:19 +0000260 if not id_list:
261 return {}
showard63a34772008-08-18 19:32:50 +0000262 query %= cls._get_sql_id_list(id_list)
263 rows = _db.execute(query)
showard989f25d2008-10-01 11:38:11 +0000264 return cls._process_many2many_dict(rows, flip)
265
266
267 @staticmethod
268 def _process_many2many_dict(rows, flip=False):
showard63a34772008-08-18 19:32:50 +0000269 result = {}
270 for row in rows:
showard89f84db2009-03-12 20:39:13 +0000271 left_id, right_id = int(row[0]), int(row[1])
showard989f25d2008-10-01 11:38:11 +0000272 if flip:
273 left_id, right_id = right_id, left_id
showard63a34772008-08-18 19:32:50 +0000274 result.setdefault(left_id, set()).add(right_id)
275 return result
276
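    # Illustrative example for the helper above: rows ((1, 10), (1, 11), (2, 10))
    # become {1: set([10, 11]), 2: set([10])}; with flip=True the columns swap,
    # giving {10: set([1, 2]), 11: set([1])}.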
277
278 @classmethod
279 def _get_job_acl_groups(cls, job_ids):
280 query = """
showardd9ac4452009-02-07 02:04:37 +0000281 SELECT jobs.id, acl_groups_users.aclgroup_id
showard63a34772008-08-18 19:32:50 +0000282 FROM jobs
283 INNER JOIN users ON users.login = jobs.owner
284 INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
285 WHERE jobs.id IN (%s)
286 """
287 return cls._get_many2many_dict(query, job_ids)
288
289
290 @classmethod
291 def _get_job_ineligible_hosts(cls, job_ids):
292 query = """
293 SELECT job_id, host_id
294 FROM ineligible_host_queues
295 WHERE job_id IN (%s)
296 """
297 return cls._get_many2many_dict(query, job_ids)
298
299
300 @classmethod
showard989f25d2008-10-01 11:38:11 +0000301 def _get_job_dependencies(cls, job_ids):
302 query = """
303 SELECT job_id, label_id
304 FROM jobs_dependency_labels
305 WHERE job_id IN (%s)
306 """
307 return cls._get_many2many_dict(query, job_ids)
308
309
310 @classmethod
showard63a34772008-08-18 19:32:50 +0000311 def _get_host_acls(cls, host_ids):
312 query = """
showardd9ac4452009-02-07 02:04:37 +0000313 SELECT host_id, aclgroup_id
showard63a34772008-08-18 19:32:50 +0000314 FROM acl_groups_hosts
315 WHERE host_id IN (%s)
316 """
317 return cls._get_many2many_dict(query, host_ids)
318
319
320 @classmethod
321 def _get_label_hosts(cls, host_ids):
showardfa8629c2008-11-04 16:51:23 +0000322 if not host_ids:
323 return {}, {}
showard63a34772008-08-18 19:32:50 +0000324 query = """
325 SELECT label_id, host_id
326 FROM hosts_labels
327 WHERE host_id IN (%s)
showard989f25d2008-10-01 11:38:11 +0000328 """ % cls._get_sql_id_list(host_ids)
329 rows = _db.execute(query)
330 labels_to_hosts = cls._process_many2many_dict(rows)
331 hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
332 return labels_to_hosts, hosts_to_labels
333
334
335 @classmethod
336 def _get_labels(cls):
337 return dict((label.id, label) for label in Label.fetch())
showard63a34772008-08-18 19:32:50 +0000338
339
340 def refresh(self, pending_queue_entries):
341 self._hosts_available = self._get_ready_hosts()
342
343 relevant_jobs = [queue_entry.job_id
344 for queue_entry in pending_queue_entries]
345 self._job_acls = self._get_job_acl_groups(relevant_jobs)
346 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
showard989f25d2008-10-01 11:38:11 +0000347 self._job_dependencies = self._get_job_dependencies(relevant_jobs)
showard63a34772008-08-18 19:32:50 +0000348
349 host_ids = self._hosts_available.keys()
350 self._host_acls = self._get_host_acls(host_ids)
showard989f25d2008-10-01 11:38:11 +0000351 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
352
353 self._labels = self._get_labels()
showard63a34772008-08-18 19:32:50 +0000354
355
356 def _is_acl_accessible(self, host_id, queue_entry):
357 job_acls = self._job_acls.get(queue_entry.job_id, set())
358 host_acls = self._host_acls.get(host_id, set())
359 return len(host_acls.intersection(job_acls)) > 0
360
361
showard989f25d2008-10-01 11:38:11 +0000362 def _check_job_dependencies(self, job_dependencies, host_labels):
363 missing = job_dependencies - host_labels
showard89f84db2009-03-12 20:39:13 +0000364 return len(missing) == 0
showard989f25d2008-10-01 11:38:11 +0000365
366
367 def _check_only_if_needed_labels(self, job_dependencies, host_labels,
368 queue_entry):
showardade14e22009-01-26 22:38:32 +0000369 if not queue_entry.meta_host:
370 # bypass only_if_needed labels when a specific host is selected
371 return True
372
showard989f25d2008-10-01 11:38:11 +0000373 for label_id in host_labels:
374 label = self._labels[label_id]
375 if not label.only_if_needed:
376 # we don't care about non-only_if_needed labels
377 continue
378 if queue_entry.meta_host == label_id:
379 # if the label was requested in a metahost it's OK
380 continue
381 if label_id not in job_dependencies:
382 return False
383 return True
384
385
showard89f84db2009-03-12 20:39:13 +0000386 def _check_atomic_group_labels(self, host_labels, queue_entry):
387 """
388 Determine if the given HostQueueEntry's atomic group settings are okay
389 to schedule on a host with the given labels.
390
391 @param host_labels - A list of label ids that the host has.
392 @param queue_entry - The HostQueueEntry being considered for the host.
393
394 @returns True if atomic group settings are okay, False otherwise.
395 """
396 return (self._get_host_atomic_group_id(host_labels) ==
397 queue_entry.atomic_group_id)
398
399
400 def _get_host_atomic_group_id(self, host_labels):
401 """
402 Return the atomic group label id for a host with the given set of
403 labels if any, or None otherwise. Raises an exception if more than
 404        one atomic group is found in the set of labels.
405
406 @param host_labels - A list of label ids that the host has.
407
408 @returns The id of the atomic group found on a label in host_labels
409 or None if no atomic group label is found.
410 @raises SchedulerError - If more than one atomic group label is found.
411 """
412 atomic_ids = [self._labels[label_id].atomic_group_id
413 for label_id in host_labels
414 if self._labels[label_id].atomic_group_id is not None]
415 if not atomic_ids:
416 return None
417 if len(atomic_ids) > 1:
 418            raise SchedulerError('More than one atomic group label on host.')
419 return atomic_ids[0]
420
421
422 def _get_atomic_group_labels(self, atomic_group_id):
423 """
424 Lookup the label ids that an atomic_group is associated with.
425
426 @param atomic_group_id - The id of the AtomicGroup to look up.
427
 428        @returns A generator yielding Label ids for this atomic group.
429 """
430 return (id for id, label in self._labels.iteritems()
431 if label.atomic_group_id == atomic_group_id
432 and not label.invalid)
433
434
showard54c1ea92009-05-20 00:32:58 +0000435 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
showard89f84db2009-03-12 20:39:13 +0000436 """
437 @param group_hosts - A sequence of Host ids to test for usability
438 and eligibility against the Job associated with queue_entry.
439 @param queue_entry - The HostQueueEntry that these hosts are being
440 tested for eligibility against.
441
442 @returns A subset of group_hosts Host ids that are eligible for the
443 supplied queue_entry.
444 """
445 return set(host_id for host_id in group_hosts
446 if self._is_host_usable(host_id)
447 and self._is_host_eligible_for_job(host_id, queue_entry))
448
449
showard989f25d2008-10-01 11:38:11 +0000450 def _is_host_eligible_for_job(self, host_id, queue_entry):
451 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
452 host_labels = self._host_labels.get(host_id, set())
mblighc993bee2008-10-03 03:42:34 +0000453
showard89f84db2009-03-12 20:39:13 +0000454 return (self._is_acl_accessible(host_id, queue_entry) and
455 self._check_job_dependencies(job_dependencies, host_labels) and
456 self._check_only_if_needed_labels(
457 job_dependencies, host_labels, queue_entry) and
458 self._check_atomic_group_labels(host_labels, queue_entry))
showard989f25d2008-10-01 11:38:11 +0000459
460
showard63a34772008-08-18 19:32:50 +0000461 def _schedule_non_metahost(self, queue_entry):
showard989f25d2008-10-01 11:38:11 +0000462 if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000463 return None
464 return self._hosts_available.pop(queue_entry.host_id, None)
465
466
467 def _is_host_usable(self, host_id):
468 if host_id not in self._hosts_available:
469 # host was already used during this scheduling cycle
470 return False
471 if self._hosts_available[host_id].invalid:
472 # Invalid hosts cannot be used for metahosts. They're included in
473 # the original query because they can be used by non-metahosts.
474 return False
475 return True
476
477
478 def _schedule_metahost(self, queue_entry):
479 label_id = queue_entry.meta_host
480 hosts_in_label = self._label_hosts.get(label_id, set())
481 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
482 set())
483
484 # must iterate over a copy so we can mutate the original while iterating
485 for host_id in list(hosts_in_label):
486 if not self._is_host_usable(host_id):
487 hosts_in_label.remove(host_id)
488 continue
489 if host_id in ineligible_host_ids:
490 continue
showard989f25d2008-10-01 11:38:11 +0000491 if not self._is_host_eligible_for_job(host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000492 continue
493
showard89f84db2009-03-12 20:39:13 +0000494 # Remove the host from our cached internal state before returning
495 # the host object.
showard63a34772008-08-18 19:32:50 +0000496 hosts_in_label.remove(host_id)
497 return self._hosts_available.pop(host_id)
498 return None
499
500
501 def find_eligible_host(self, queue_entry):
502 if not queue_entry.meta_host:
showard89f84db2009-03-12 20:39:13 +0000503 assert queue_entry.host_id is not None
showard63a34772008-08-18 19:32:50 +0000504 return self._schedule_non_metahost(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000505 assert queue_entry.atomic_group_id is None
showard63a34772008-08-18 19:32:50 +0000506 return self._schedule_metahost(queue_entry)
507
508
showard89f84db2009-03-12 20:39:13 +0000509 def find_eligible_atomic_group(self, queue_entry):
510 """
511 Given an atomic group host queue entry, locate an appropriate group
512 of hosts for the associated job to run on.
513
514 The caller is responsible for creating new HQEs for the additional
515 hosts returned in order to run the actual job on them.
516
517 @returns A list of Host instances in a ready state to satisfy this
518 atomic group scheduling. Hosts will all belong to the same
519 atomic group label as specified by the queue_entry.
520 An empty list will be returned if no suitable atomic
521 group could be found.
522
523 TODO(gps): what is responsible for kicking off any attempted repairs on
524 a group of hosts? not this function, but something needs to. We do
525 not communicate that reason for returning [] outside of here...
526 For now, we'll just be unschedulable if enough hosts within one group
527 enter Repair Failed state.
528 """
529 assert queue_entry.atomic_group_id is not None
530 job = queue_entry.job
531 assert job.synch_count and job.synch_count > 0
showard77182562009-06-10 00:16:05 +0000532 atomic_group = queue_entry.atomic_group
showard89f84db2009-03-12 20:39:13 +0000533 if job.synch_count > atomic_group.max_number_of_machines:
534 # Such a Job and HostQueueEntry should never be possible to
535 # create using the frontend. Regardless, we can't process it.
536 # Abort it immediately and log an error on the scheduler.
537 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000538 logging.error(
539 'Error: job %d synch_count=%d > requested atomic_group %d '
540 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
541 job.id, job.synch_count, atomic_group.id,
542 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000543 return []
544 hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
545 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
546 set())
547
548 # Look in each label associated with atomic_group until we find one with
549 # enough hosts to satisfy the job.
550 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
551 group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
552 if queue_entry.meta_host is not None:
553 # If we have a metahost label, only allow its hosts.
554 group_hosts.intersection_update(hosts_in_label)
555 group_hosts -= ineligible_host_ids
showard54c1ea92009-05-20 00:32:58 +0000556 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
showard89f84db2009-03-12 20:39:13 +0000557 group_hosts, queue_entry)
558
559 # Job.synch_count is treated as "minimum synch count" when
560 # scheduling for an atomic group of hosts. The atomic group
561 # number of machines is the maximum to pick out of a single
562 # atomic group label for scheduling at one time.
563 min_hosts = job.synch_count
564 max_hosts = atomic_group.max_number_of_machines
565
showard54c1ea92009-05-20 00:32:58 +0000566 if len(eligible_host_ids_in_group) < min_hosts:
showard89f84db2009-03-12 20:39:13 +0000567 # Not enough eligible hosts in this atomic group label.
568 continue
569
showard54c1ea92009-05-20 00:32:58 +0000570 eligible_hosts_in_group = [self._hosts_available[id]
571 for id in eligible_host_ids_in_group]
showardef519212009-05-08 02:29:53 +0000572 # So that they show up in a sane order when viewing the job.
showard54c1ea92009-05-20 00:32:58 +0000573 eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)
showardef519212009-05-08 02:29:53 +0000574
showard89f84db2009-03-12 20:39:13 +0000575 # Limit ourselves to scheduling the atomic group size.
576 if len(eligible_hosts_in_group) > max_hosts:
showardef519212009-05-08 02:29:53 +0000577 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
showard89f84db2009-03-12 20:39:13 +0000578
579 # Remove the selected hosts from our cached internal state
580 # of available hosts in order to return the Host objects.
581 host_list = []
showard54c1ea92009-05-20 00:32:58 +0000582 for host in eligible_hosts_in_group:
583 hosts_in_label.discard(host.id)
584 self._hosts_available.pop(host.id)
585 host_list.append(host)
showard89f84db2009-03-12 20:39:13 +0000586 return host_list
587
588 return []
589
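    # Worked example for find_eligible_atomic_group (numbers are hypothetical):
    # with job.synch_count = 3 and atomic_group.max_number_of_machines = 5, a
    # label with only 2 eligible hosts is skipped, while a label with 7 eligible
    # hosts gets its first 5 (after sorting) scheduled.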
590
showard170873e2009-01-07 00:22:26 +0000591class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000592 def __init__(self):
593 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000594 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000595 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000596 user_cleanup_time = scheduler_config.config.clean_interval
597 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
598 _db, user_cleanup_time)
599 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000600 self._host_agents = {}
601 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000602
mbligh36768f02008-02-22 18:28:33 +0000603
showard915958d2009-04-22 21:00:58 +0000604 def initialize(self, recover_hosts=True):
605 self._periodic_cleanup.initialize()
606 self._24hr_upkeep.initialize()
607
jadmanski0afbb632008-06-06 21:10:57 +0000608 # always recover processes
609 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000610
jadmanski0afbb632008-06-06 21:10:57 +0000611 if recover_hosts:
612 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000613
614
jadmanski0afbb632008-06-06 21:10:57 +0000615 def tick(self):
showard170873e2009-01-07 00:22:26 +0000616 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000617 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000618 self._find_aborting()
showard1ff7b2e2009-05-15 23:17:18 +0000619 self._find_reverify()
showard29f7cd22009-04-29 21:16:24 +0000620 self._process_recurring_runs()
jadmanski0afbb632008-06-06 21:10:57 +0000621 self._schedule_new_jobs()
622 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000623 _drone_manager.execute_actions()
624 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000625
showard97aed502008-11-04 02:01:24 +0000626
mblighf3294cc2009-04-08 21:17:38 +0000627 def _run_cleanup(self):
628 self._periodic_cleanup.run_cleanup_maybe()
629 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
showard170873e2009-01-07 00:22:26 +0000632 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
633 for object_id in object_ids:
634 agent_dict.setdefault(object_id, set()).add(agent)
635
636
637 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
638 for object_id in object_ids:
639 assert object_id in agent_dict
640 agent_dict[object_id].remove(agent)
641
642
jadmanski0afbb632008-06-06 21:10:57 +0000643 def add_agent(self, agent):
644 self._agents.append(agent)
645 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000646 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
647 self._register_agent_for_ids(self._queue_entry_agents,
648 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000649
showard170873e2009-01-07 00:22:26 +0000650
651 def get_agents_for_entry(self, queue_entry):
652 """
653 Find agents corresponding to the specified queue_entry.
654 """
showardd3dc1992009-04-22 21:01:40 +0000655 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000656
657
658 def host_has_agent(self, host):
659 """
660 Determine if there is currently an Agent present using this host.
661 """
662 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000663
664
jadmanski0afbb632008-06-06 21:10:57 +0000665 def remove_agent(self, agent):
666 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000667 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
668 agent)
669 self._unregister_agent_for_ids(self._queue_entry_agents,
670 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000671
672
jadmanski0afbb632008-06-06 21:10:57 +0000673 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000674 self._register_pidfiles()
675 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000676 self._recover_all_recoverable_entries()
showard170873e2009-01-07 00:22:26 +0000677 self._requeue_other_active_entries()
showard170873e2009-01-07 00:22:26 +0000678 self._reverify_remaining_hosts()
679 # reinitialize drones after killing orphaned processes, since they can
680 # leave around files when they die
681 _drone_manager.execute_actions()
682 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000683
showard170873e2009-01-07 00:22:26 +0000684
685 def _register_pidfiles(self):
 686        # during recovery we may need to read pidfiles for running, gathering,
 687        # and parsing entries
688 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000689 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000690 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000691 for pidfile_name in _ALL_PIDFILE_NAMES:
692 pidfile_id = _drone_manager.get_pidfile_id_from(
693 queue_entry.execution_tag(), pidfile_name=pidfile_name)
694 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000695
696
showardd3dc1992009-04-22 21:01:40 +0000697 def _recover_entries_with_status(self, status, orphans, pidfile_name,
698 recover_entries_fn):
699 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000700 for queue_entry in queue_entries:
701 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000702 # synchronous job we've already recovered
703 continue
showardd3dc1992009-04-22 21:01:40 +0000704 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000705 execution_tag = queue_entry.execution_tag()
706 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000707 run_monitor.attach_to_existing_process(execution_tag,
708 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000709
710 log_message = ('Recovering %s entry %s ' %
711 (status.lower(),
712 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000713 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000714 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000715 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000716 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000717 continue
mbligh90a549d2008-03-25 23:52:34 +0000718
showard597bfd32009-05-08 18:22:50 +0000719 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000720 run_monitor.get_process())
721 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
722 orphans.discard(run_monitor.get_process())
723
724
725 def _kill_remaining_orphan_processes(self, orphans):
726 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000727 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000728 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000729
showard170873e2009-01-07 00:22:26 +0000730
showardd3dc1992009-04-22 21:01:40 +0000731 def _recover_running_entries(self, orphans):
732 def recover_entries(job, queue_entries, run_monitor):
733 if run_monitor is not None:
734 queue_task = RecoveryQueueTask(job=job,
735 queue_entries=queue_entries,
736 run_monitor=run_monitor)
737 self.add_agent(Agent(tasks=[queue_task],
738 num_processes=len(queue_entries)))
739 # else, _requeue_other_active_entries will cover this
740
741 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
742 orphans, '.autoserv_execute',
743 recover_entries)
744
745
746 def _recover_gathering_entries(self, orphans):
747 def recover_entries(job, queue_entries, run_monitor):
748 gather_task = GatherLogsTask(job, queue_entries,
749 run_monitor=run_monitor)
750 self.add_agent(Agent([gather_task]))
751
752 self._recover_entries_with_status(
753 models.HostQueueEntry.Status.GATHERING,
754 orphans, _CRASHINFO_PID_FILE, recover_entries)
755
756
757 def _recover_parsing_entries(self, orphans):
758 def recover_entries(job, queue_entries, run_monitor):
759 reparse_task = FinalReparseTask(queue_entries,
760 run_monitor=run_monitor)
761 self.add_agent(Agent([reparse_task], num_processes=0))
762
763 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
764 orphans, _PARSER_PID_FILE,
765 recover_entries)
766
767
768 def _recover_all_recoverable_entries(self):
769 orphans = _drone_manager.get_orphaned_autoserv_processes()
770 self._recover_running_entries(orphans)
771 self._recover_gathering_entries(orphans)
772 self._recover_parsing_entries(orphans)
773 self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000774
showard97aed502008-11-04 02:01:24 +0000775
showard170873e2009-01-07 00:22:26 +0000776 def _requeue_other_active_entries(self):
777 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000778 where='active AND NOT complete AND '
779 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000780 for queue_entry in queue_entries:
781 if self.get_agents_for_entry(queue_entry):
782 # entry has already been recovered
783 continue
showardd3dc1992009-04-22 21:01:40 +0000784 if queue_entry.aborted:
785 queue_entry.abort(self)
786 continue
787
788 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000789 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000790 if queue_entry.host:
791 tasks = queue_entry.host.reverify_tasks()
792 self.add_agent(Agent(tasks))
 793            queue_entry.requeue()
794
795
showard1ff7b2e2009-05-15 23:17:18 +0000796 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000797 tasks = models.SpecialTask.objects.filter(
798 task=models.SpecialTask.Task.REVERIFY, is_active=False,
799 is_complete=False)
800
801 host_ids = [str(task.host.id) for task in tasks]
802
803 if host_ids:
804 where = 'id IN (%s)' % ','.join(host_ids)
805 host_ids_reverifying = self._reverify_hosts_where(
806 where, cleanup=False)
807 tasks = tasks.filter(host__id__in=host_ids_reverifying)
808 for task in tasks:
809 task.is_active=True
810 task.save()
showard1ff7b2e2009-05-15 23:17:18 +0000811
812
showard170873e2009-01-07 00:22:26 +0000813 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000814 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000815 self._reverify_hosts_where("""(status = 'Repairing' OR
816 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000817 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000818
showard170873e2009-01-07 00:22:26 +0000819 # recover "Running" hosts with no active queue entries, although this
820 # should never happen
821 message = ('Recovering running host %s - this probably indicates a '
822 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000823 self._reverify_hosts_where("""status = 'Running' AND
824 id NOT IN (SELECT host_id
825 FROM host_queue_entries
826 WHERE active)""",
827 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000828
829
jadmanski0afbb632008-06-06 21:10:57 +0000830 def _reverify_hosts_where(self, where,
showarda64e52a2009-06-08 23:24:08 +0000831 print_message='Reverifying host %s',
832 cleanup=True):
showard170873e2009-01-07 00:22:26 +0000833 full_where='locked = 0 AND invalid = 0 AND ' + where
showard6d7b2ff2009-06-10 00:16:47 +0000834 host_ids_reverifying = []
showard170873e2009-01-07 00:22:26 +0000835 for host in Host.fetch(where=full_where):
836 if self.host_has_agent(host):
837 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000838 continue
showard170873e2009-01-07 00:22:26 +0000839 if print_message:
showardb18134f2009-03-20 20:52:18 +0000840 logging.info(print_message, host.hostname)
showarda64e52a2009-06-08 23:24:08 +0000841 tasks = host.reverify_tasks(cleanup)
showard170873e2009-01-07 00:22:26 +0000842 self.add_agent(Agent(tasks))
showard6d7b2ff2009-06-10 00:16:47 +0000843 host_ids_reverifying.append(host.id)
844 return host_ids_reverifying
mbligh36768f02008-02-22 18:28:33 +0000845
846
jadmanski0afbb632008-06-06 21:10:57 +0000847 def _recover_hosts(self):
848 # recover "Repair Failed" hosts
849 message = 'Reverifying dead host %s'
850 self._reverify_hosts_where("status = 'Repair Failed'",
851 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000852
853
showard04c82c52008-05-29 19:38:12 +0000854
showardb95b1bd2008-08-15 18:11:04 +0000855 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000856 # prioritize by job priority, then non-metahost over metahost, then FIFO
857 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000858 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000859 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000860 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000861
862
showard89f84db2009-03-12 20:39:13 +0000863 def _refresh_pending_queue_entries(self):
864 """
865 Lookup the pending HostQueueEntries and call our HostScheduler
866 refresh() method given that list. Return the list.
867
868 @returns A list of pending HostQueueEntries sorted in priority order.
869 """
showard63a34772008-08-18 19:32:50 +0000870 queue_entries = self._get_pending_queue_entries()
871 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000872 return []
showardb95b1bd2008-08-15 18:11:04 +0000873
showard63a34772008-08-18 19:32:50 +0000874 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000875
showard89f84db2009-03-12 20:39:13 +0000876 return queue_entries
877
878
879 def _schedule_atomic_group(self, queue_entry):
880 """
881 Schedule the given queue_entry on an atomic group of hosts.
882
883 Returns immediately if there are insufficient available hosts.
884
885 Creates new HostQueueEntries based off of queue_entry for the
886 scheduled hosts and starts them all running.
887 """
888 # This is a virtual host queue entry representing an entire
889 # atomic group, find a group and schedule their hosts.
890 group_hosts = self._host_scheduler.find_eligible_atomic_group(
891 queue_entry)
892 if not group_hosts:
893 return
894 # The first assigned host uses the original HostQueueEntry
895 group_queue_entries = [queue_entry]
896 for assigned_host in group_hosts[1:]:
897 # Create a new HQE for every additional assigned_host.
898 new_hqe = HostQueueEntry.clone(queue_entry)
899 new_hqe.save()
900 group_queue_entries.append(new_hqe)
901 assert len(group_queue_entries) == len(group_hosts)
902 for queue_entry, host in itertools.izip(group_queue_entries,
903 group_hosts):
904 self._run_queue_entry(queue_entry, host)
905
906
907 def _schedule_new_jobs(self):
908 queue_entries = self._refresh_pending_queue_entries()
909 if not queue_entries:
910 return
911
showard63a34772008-08-18 19:32:50 +0000912 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000913 if (queue_entry.atomic_group_id is None or
914 queue_entry.host_id is not None):
915 assigned_host = self._host_scheduler.find_eligible_host(
916 queue_entry)
917 if assigned_host:
918 self._run_queue_entry(queue_entry, assigned_host)
919 else:
920 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000921
922
923 def _run_queue_entry(self, queue_entry, host):
showard77182562009-06-10 00:16:05 +0000924 agent = queue_entry.run_pre_job_tasks(assigned_host=host)
925 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000926
927
jadmanski0afbb632008-06-06 21:10:57 +0000928 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000929 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
930 for agent in self.get_agents_for_entry(entry):
931 agent.abort()
932 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000933
934
showard324bf812009-01-20 23:23:38 +0000935 def _can_start_agent(self, agent, num_started_this_cycle,
936 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000937 # always allow zero-process agents to run
938 if agent.num_processes == 0:
939 return True
940 # don't allow any nonzero-process agents to run after we've reached a
941 # limit (this avoids starvation of many-process agents)
942 if have_reached_limit:
943 return False
944 # total process throttling
showard324bf812009-01-20 23:23:38 +0000945 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000946 return False
947 # if a single agent exceeds the per-cycle throttling, still allow it to
948 # run when it's the first agent in the cycle
949 if num_started_this_cycle == 0:
950 return True
951 # per-cycle throttling
952 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000953 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000954 return False
955 return True
956
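    # Worked example for the throttling above (the limit of 50 is an assumed
    # value, not the shipped default): with max_processes_started_per_cycle = 50,
    # an agent needing 10 processes can start only while at most 40 processes
    # have been started this cycle, except that the first agent of a cycle is
    # always allowed through, subject to the drone manager's total capacity.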
957
jadmanski0afbb632008-06-06 21:10:57 +0000958 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000959 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000960 have_reached_limit = False
961 # iterate over copy, so we can remove agents during iteration
962 for agent in list(self._agents):
963 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000964 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000965 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000966 continue
967 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000968 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000969 have_reached_limit):
970 have_reached_limit = True
971 continue
showard4c5374f2008-09-04 17:02:56 +0000972 num_started_this_cycle += agent.num_processes
973 agent.tick()
showarda9435c02009-05-13 21:28:17 +0000974 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +0000975 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000976
977
showard29f7cd22009-04-29 21:16:24 +0000978 def _process_recurring_runs(self):
979 recurring_runs = models.RecurringRun.objects.filter(
980 start_date__lte=datetime.datetime.now())
981 for rrun in recurring_runs:
982 # Create job from template
983 job = rrun.job
984 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000985 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000986
987 host_objects = info['hosts']
988 one_time_hosts = info['one_time_hosts']
989 metahost_objects = info['meta_hosts']
990 dependencies = info['dependencies']
991 atomic_group = info['atomic_group']
992
993 for host in one_time_hosts or []:
994 this_host = models.Host.create_one_time_host(host.hostname)
995 host_objects.append(this_host)
996
997 try:
998 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +0000999 options=options,
showard29f7cd22009-04-29 21:16:24 +00001000 host_objects=host_objects,
1001 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001002 atomic_group=atomic_group)
1003
1004 except Exception, ex:
1005 logging.exception(ex)
1006                # TODO: send email
1007
1008 if rrun.loop_count == 1:
1009 rrun.delete()
1010 else:
1011 if rrun.loop_count != 0: # if not infinite loop
1012 # calculate new start_date
1013 difference = datetime.timedelta(seconds=rrun.loop_period)
1014 rrun.start_date = rrun.start_date + difference
1015 rrun.loop_count -= 1
1016 rrun.save()
1017
1018
showard170873e2009-01-07 00:22:26 +00001019class PidfileRunMonitor(object):
1020 """
1021 Client must call either run() to start a new process or
1022 attach_to_existing_process().
1023 """
mbligh36768f02008-02-22 18:28:33 +00001024
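    # Minimal usage sketch (not copied from any caller in this file):
    #   monitor = PidfileRunMonitor()
    #   monitor.run(command, working_directory,
    #               nice_level=AUTOSERV_NICE_LEVEL,
    #               pidfile_name=_AUTOSERV_PID_FILE)
    #   ...on each tick...
    #   if monitor.exit_code() is not None:
    #       ...the process has finished (or was declared lost)...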
showard170873e2009-01-07 00:22:26 +00001025 class _PidfileException(Exception):
1026 """
1027 Raised when there's some unexpected behavior with the pid file, but only
1028 used internally (never allowed to escape this class).
1029 """
mbligh36768f02008-02-22 18:28:33 +00001030
1031
showard170873e2009-01-07 00:22:26 +00001032 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001033 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001034 self._start_time = None
1035 self.pidfile_id = None
1036 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001037
1038
showard170873e2009-01-07 00:22:26 +00001039 def _add_nice_command(self, command, nice_level):
1040 if not nice_level:
1041 return command
1042 return ['nice', '-n', str(nice_level)] + command
1043
1044
1045 def _set_start_time(self):
1046 self._start_time = time.time()
1047
1048
1049 def run(self, command, working_directory, nice_level=None, log_file=None,
1050 pidfile_name=None, paired_with_pidfile=None):
1051 assert command is not None
1052 if nice_level is not None:
1053 command = ['nice', '-n', str(nice_level)] + command
1054 self._set_start_time()
1055 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001056 command, working_directory, pidfile_name=pidfile_name,
1057 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001058
1059
showardd3dc1992009-04-22 21:01:40 +00001060 def attach_to_existing_process(self, execution_tag,
1061 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001062 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001063 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1064 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001065 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001066
1067
jadmanski0afbb632008-06-06 21:10:57 +00001068 def kill(self):
showard170873e2009-01-07 00:22:26 +00001069 if self.has_process():
1070 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001071
mbligh36768f02008-02-22 18:28:33 +00001072
showard170873e2009-01-07 00:22:26 +00001073 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001074 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001075 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001076
1077
showard170873e2009-01-07 00:22:26 +00001078 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001079 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001080 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001081 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001082
1083
showard170873e2009-01-07 00:22:26 +00001084 def _read_pidfile(self, use_second_read=False):
1085 assert self.pidfile_id is not None, (
1086 'You must call run() or attach_to_existing_process()')
1087 contents = _drone_manager.get_pidfile_contents(
1088 self.pidfile_id, use_second_read=use_second_read)
1089 if contents.is_invalid():
1090 self._state = drone_manager.PidfileContents()
1091 raise self._PidfileException(contents)
1092 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001093
1094
showard21baa452008-10-21 00:08:39 +00001095 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001096 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1097 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001098 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001099 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001100 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001101
1102
1103 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001104 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001105 return
mblighbb421852008-03-11 22:36:16 +00001106
showard21baa452008-10-21 00:08:39 +00001107 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001108
showard170873e2009-01-07 00:22:26 +00001109 if self._state.process is None:
1110 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001111 return
mbligh90a549d2008-03-25 23:52:34 +00001112
showard21baa452008-10-21 00:08:39 +00001113 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001114 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001115 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001116 return
mbligh90a549d2008-03-25 23:52:34 +00001117
showard170873e2009-01-07 00:22:26 +00001118 # pid but no running process - maybe process *just* exited
1119 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001120 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001121 # autoserv exited without writing an exit code
1122 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001123 self._handle_pidfile_error(
1124 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001125
showard21baa452008-10-21 00:08:39 +00001126
1127 def _get_pidfile_info(self):
1128 """\
1129 After completion, self._state will contain:
1130 pid=None, exit_status=None if autoserv has not yet run
1131 pid!=None, exit_status=None if autoserv is running
1132 pid!=None, exit_status!=None if autoserv has completed
1133 """
1134 try:
1135 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001136 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001137 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001138
1139
showard170873e2009-01-07 00:22:26 +00001140 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001141 """\
1142 Called when no pidfile is found or no pid is in the pidfile.
1143 """
showard170873e2009-01-07 00:22:26 +00001144 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001145 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001146 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1147 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001148 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001149 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001150
1151
showard35162b02009-03-03 02:17:30 +00001152 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001153 """\
1154 Called when autoserv has exited without writing an exit status,
1155 or we've timed out waiting for autoserv to write a pid to the
1156 pidfile. In either case, we just return failure and the caller
1157 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001158
showard170873e2009-01-07 00:22:26 +00001159 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001160 """
1161 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001162 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001163 self._state.exit_status = 1
1164 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001165
1166
jadmanski0afbb632008-06-06 21:10:57 +00001167 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001168 self._get_pidfile_info()
1169 return self._state.exit_status
1170
1171
1172 def num_tests_failed(self):
1173 self._get_pidfile_info()
1174 assert self._state.num_tests_failed is not None
1175 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001176
1177
mbligh36768f02008-02-22 18:28:33 +00001178class Agent(object):
showard77182562009-06-10 00:16:05 +00001179 """
1180 An agent for use by the Dispatcher class to perform a sequence of tasks.
1181
1182 The following methods are required on all task objects:
1183 poll() - Called periodically to let the task check its status and
1184                   update its internal state.
1185 is_done() - Returns True if the task is finished.
1186 abort() - Called when an abort has been requested. The task must
1187 set its aborted attribute to True if it actually aborted.
1188
1189 The following attributes are required on all task objects:
1190 aborted - bool, True if this task was aborted.
1191 failure_tasks - A sequence of tasks to be run using a new Agent
1192 by the dispatcher should this task fail.
1193 success - bool, True if this task succeeded.
1194 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1195 host_ids - A sequence of Host ids this task represents.
1196
1197 The following attribute is written to all task objects:
1198 agent - A reference to the Agent instance that the task has been
1199 added to.
1200 """
1201
1202
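    # Construction sketch (verify_task and queue_task stand in for any objects
    # implementing the task interface described above):
    #   agent = Agent(tasks=[verify_task, queue_task], num_processes=1)
    #   dispatcher.add_agent(agent)   # the dispatcher then drives agent.tick()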
showard170873e2009-01-07 00:22:26 +00001203 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001204 """
1205 @param tasks: A list of tasks as described in the class docstring.
1206 @param num_processes: The number of subprocesses the Agent represents.
1207 This is used by the Dispatcher for managing the load on the
1208 system. Defaults to 1.
1209 """
jadmanski0afbb632008-06-06 21:10:57 +00001210 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001211 self.queue = None
showard77182562009-06-10 00:16:05 +00001212 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001213 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001214 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001215
showard170873e2009-01-07 00:22:26 +00001216 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1217 for task in tasks)
1218 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1219
showardd3dc1992009-04-22 21:01:40 +00001220 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001221 for task in tasks:
1222 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001223
1224
showardd3dc1992009-04-22 21:01:40 +00001225 def _clear_queue(self):
1226 self.queue = Queue.Queue(0)
1227
1228
showard170873e2009-01-07 00:22:26 +00001229 def _union_ids(self, id_lists):
1230 return set(itertools.chain(*id_lists))
1231
1232
jadmanski0afbb632008-06-06 21:10:57 +00001233 def add_task(self, task):
1234 self.queue.put_nowait(task)
1235 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001236
1237
jadmanski0afbb632008-06-06 21:10:57 +00001238 def tick(self):
showard21baa452008-10-21 00:08:39 +00001239 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001240 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001241 self.active_task.poll()
1242 if not self.active_task.is_done():
1243 return
1244 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001245
1246
jadmanski0afbb632008-06-06 21:10:57 +00001247 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001248 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001249 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001250 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001251 if not self.active_task.success:
1252 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001253 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001254
jadmanski0afbb632008-06-06 21:10:57 +00001255 if not self.is_done():
1256 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001257
1258
jadmanski0afbb632008-06-06 21:10:57 +00001259 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001260 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001261 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1262 # get reset.
1263 new_agent = Agent(self.active_task.failure_tasks)
1264 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001265
mblighe2586682008-02-29 22:45:46 +00001266
showard4c5374f2008-09-04 17:02:56 +00001267 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001268 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001269
1270
jadmanski0afbb632008-06-06 21:10:57 +00001271 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001272 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001273
1274
showardd3dc1992009-04-22 21:01:40 +00001275 def abort(self):
showard08a36412009-05-05 01:01:13 +00001276 # abort tasks until the queue is empty or a task ignores the abort
1277 while not self.is_done():
1278 if not self.active_task:
1279 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001280 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001281 if not self.active_task.aborted:
1282 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001283 return
1284 self.active_task = None
1285
showardd3dc1992009-04-22 21:01:40 +00001286
showard77182562009-06-10 00:16:05 +00001287class DelayedCallTask(object):
1288 """
1289 A task object like AgentTask for an Agent to run that waits for the
1290 specified amount of time to have elapsed before calling the supplied
1291 callback once and finishing. If the callback returns anything, it is
1292 assumed to be a new Agent instance and will be added to the dispatcher.
1293
1294 @attribute end_time: The absolute posix time after which this task will
1295 call its callback when it is polled and be finished.
1296
1297 Also has all attributes required by the Agent class.
1298 """
1299 def __init__(self, delay_seconds, callback, now_func=None):
1300 """
1301 @param delay_seconds: The delay in seconds from now that this task
1302 will call the supplied callback and be done.
1303 @param callback: A callable to be called by this task once after at
1304 least delay_seconds time has elapsed. It must return None
1305 or a new Agent instance.
1306 @param now_func: A time.time like function. Default: time.time.
1307 Used for testing.
1308 """
1309 assert delay_seconds > 0
1310 assert callable(callback)
1311 if not now_func:
1312 now_func = time.time
1313 self._now_func = now_func
1314 self._callback = callback
1315
1316 self.end_time = self._now_func() + delay_seconds
1317
1318 # These attributes are required by Agent.
1319 self.aborted = False
1320 self.failure_tasks = ()
1321 self.host_ids = ()
1322 self.success = False
1323 self.queue_entry_ids = ()
1324 # This is filled in by Agent.add_task().
1325 self.agent = None
1326
1327
1328 def poll(self):
1329 if self._callback and self._now_func() >= self.end_time:
1330 new_agent = self._callback()
1331 if new_agent:
1332 self.agent.dispatcher.add_agent(new_agent)
1333 self._callback = None
1334 self.success = True
1335
1336
1337 def is_done(self):
1338 return not self._callback
1339
1340
1341 def abort(self):
1342 self.aborted = True
1343 self._callback = None
1344
1345
mbligh36768f02008-02-22 18:28:33 +00001346class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001347 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1348 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001349 self.done = False
1350 self.failure_tasks = failure_tasks
1351 self.started = False
1352 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001353 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001354 self.task = None
1355 self.agent = None
1356 self.monitor = None
1357 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001358 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001359 self.queue_entry_ids = []
1360 self.host_ids = []
1361 self.log_file = None
1362
1363
1364 def _set_ids(self, host=None, queue_entries=None):
1365 if queue_entries and queue_entries != [None]:
1366 self.host_ids = [entry.host.id for entry in queue_entries]
1367 self.queue_entry_ids = [entry.id for entry in queue_entries]
1368 else:
1369 assert host
1370 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001371
1372
jadmanski0afbb632008-06-06 21:10:57 +00001373 def poll(self):
showard08a36412009-05-05 01:01:13 +00001374 if not self.started:
1375 self.start()
1376 self.tick()
1377
1378
1379 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001380 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001381 exit_code = self.monitor.exit_code()
1382 if exit_code is None:
1383 return
1384 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001385 else:
1386 success = False
mbligh36768f02008-02-22 18:28:33 +00001387
jadmanski0afbb632008-06-06 21:10:57 +00001388 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001389
1390
jadmanski0afbb632008-06-06 21:10:57 +00001391 def is_done(self):
1392 return self.done
mbligh36768f02008-02-22 18:28:33 +00001393
1394
jadmanski0afbb632008-06-06 21:10:57 +00001395 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001396 if self.done:
1397 return
jadmanski0afbb632008-06-06 21:10:57 +00001398 self.done = True
1399 self.success = success
1400 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001401
1402
jadmanski0afbb632008-06-06 21:10:57 +00001403 def prolog(self):
1404 pass
mblighd64e5702008-04-04 21:39:28 +00001405
1406
jadmanski0afbb632008-06-06 21:10:57 +00001407 def create_temp_resultsdir(self, suffix=''):
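# Note: the suffix argument ('.repair', '.verify', '.cleanup' at the call
# sites) is accepted but not used here; the drone manager picks the
# temporary path itself.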
showard170873e2009-01-07 00:22:26 +00001408 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001409
mbligh36768f02008-02-22 18:28:33 +00001410
jadmanski0afbb632008-06-06 21:10:57 +00001411 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001412 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001413 _drone_manager.copy_to_results_repository(
1414 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001415
1416
jadmanski0afbb632008-06-06 21:10:57 +00001417 def epilog(self):
1418 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001419
1420
jadmanski0afbb632008-06-06 21:10:57 +00001421 def start(self):
1422 assert self.agent
1423
1424 if not self.started:
1425 self.prolog()
1426 self.run()
1427
1428 self.started = True
1429
1430
1431 def abort(self):
1432 if self.monitor:
1433 self.monitor.kill()
1434 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001435 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001436 self.cleanup()
1437
1438
showard170873e2009-01-07 00:22:26 +00001439 def set_host_log_file(self, base_name, host):
1440 filename = '%s.%s' % (time.time(), base_name)
1441 self.log_file = os.path.join('hosts', host.hostname, filename)
1442
1443
showardde634ee2009-01-30 01:44:24 +00001444 def _get_consistent_execution_tag(self, queue_entries):
1445 first_execution_tag = queue_entries[0].execution_tag()
1446 for queue_entry in queue_entries[1:]:
1447 assert queue_entry.execution_tag() == first_execution_tag, (
1448 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1449 queue_entry,
1450 first_execution_tag,
1451 queue_entries[0]))
1452 return first_execution_tag
1453
1454
showarda1e74b32009-05-12 17:32:04 +00001455 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001456 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001457 if use_monitor is None:
1458 assert self.monitor
1459 use_monitor = self.monitor
1460 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001461 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001462 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001463 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001464 results_path)
showardde634ee2009-01-30 01:44:24 +00001465
showarda1e74b32009-05-12 17:32:04 +00001466
1467 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001468 reparse_task = FinalReparseTask(queue_entries)
1469 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1470
1471
showarda1e74b32009-05-12 17:32:04 +00001472 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1473 self._copy_results(queue_entries, use_monitor)
1474 self._parse_results(queue_entries)
1475
1476
showardd3dc1992009-04-22 21:01:40 +00001477 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001478 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001479 self.monitor = PidfileRunMonitor()
1480 self.monitor.run(self.cmd, self._working_directory,
1481 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001482 log_file=self.log_file,
1483 pidfile_name=pidfile_name,
1484 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001485
1486
showardd9205182009-04-27 20:09:55 +00001487class TaskWithJobKeyvals(object):
1488 """AgentTask mixin providing functionality to help with job keyval files."""
1489 _KEYVAL_FILE = 'keyval'
1490 def _format_keyval(self, key, value):
1491 return '%s=%s' % (key, value)
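# The resulting keyval files are plain 'key=value' lines, one per line; the
# timestamps below are made-up examples:
#   job_queued=1244678400
#   job_finished=1244682000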
1492
1493
1494 def _keyval_path(self):
1495 """Subclasses must override this"""
1496 raise NotImplementedError
1497
1498
1499 def _write_keyval_after_job(self, field, value):
1500 assert self.monitor
1501 if not self.monitor.has_process():
1502 return
1503 _drone_manager.write_lines_to_file(
1504 self._keyval_path(), [self._format_keyval(field, value)],
1505 paired_with_process=self.monitor.get_process())
1506
1507
1508 def _job_queued_keyval(self, job):
1509 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1510
1511
1512 def _write_job_finished(self):
1513 self._write_keyval_after_job("job_finished", int(time.time()))
1514
1515
1516class RepairTask(AgentTask, TaskWithJobKeyvals):
showarde788ea62008-11-17 21:02:47 +00001517 def __init__(self, host, queue_entry=None):
jadmanski0afbb632008-06-06 21:10:57 +00001518 """\
showard170873e2009-01-07 00:22:26 +00001519 queue_entry: queue entry to mark failed if this repair fails.
jadmanski0afbb632008-06-06 21:10:57 +00001520 """
jadmanskifb7cfb12008-07-09 14:13:21 +00001521 protection = host_protections.Protection.get_string(host.protection)
jadmanski542537f2008-07-24 14:14:56 +00001522 # normalize the protection name
1523 protection = host_protections.Protection.get_attr_name(protection)
showard170873e2009-01-07 00:22:26 +00001524
jadmanski0afbb632008-06-06 21:10:57 +00001525 self.host = host
showardccbd6c52009-03-21 00:10:21 +00001526 self.queue_entry_to_fail = queue_entry
1527 # *don't* include the queue entry in IDs -- if the queue entry is
1528 # aborted, we want to leave the repair task running
1529 self._set_ids(host=host)
showard170873e2009-01-07 00:22:26 +00001530
1531 self.create_temp_resultsdir('.repair')
showard87ba02a2009-04-20 19:37:32 +00001532 cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1533 ['-R', '--host-protection', protection],
1534 queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001535 super(RepairTask, self).__init__(cmd, self.temp_results_dir)
1536
showard170873e2009-01-07 00:22:26 +00001537 self.set_host_log_file('repair', self.host)
mblighe2586682008-02-29 22:45:46 +00001538
mbligh36768f02008-02-22 18:28:33 +00001539
jadmanski0afbb632008-06-06 21:10:57 +00001540 def prolog(self):
showardb18134f2009-03-20 20:52:18 +00001541 logging.info("repair_task starting")
jadmanski0afbb632008-06-06 21:10:57 +00001542 self.host.set_status('Repairing')
showardccbd6c52009-03-21 00:10:21 +00001543 if self.queue_entry_to_fail:
1544 self.queue_entry_to_fail.requeue()
mbligh36768f02008-02-22 18:28:33 +00001545
1546
showardd9205182009-04-27 20:09:55 +00001547 def _keyval_path(self):
1548 return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)
1549
1550
showardde634ee2009-01-30 01:44:24 +00001551 def _fail_queue_entry(self):
showardccbd6c52009-03-21 00:10:21 +00001552 assert self.queue_entry_to_fail
1553
1554 if self.queue_entry_to_fail.meta_host:
1555 return # don't fail metahost entries, they'll be reassigned
1556
1557 self.queue_entry_to_fail.update_from_database()
1558 if self.queue_entry_to_fail.status != 'Queued':
1559 return # entry has been aborted
1560
1561 self.queue_entry_to_fail.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001562 queued_key, queued_time = self._job_queued_keyval(
1563 self.queue_entry_to_fail.job)
1564 self._write_keyval_after_job(queued_key, queued_time)
1565 self._write_job_finished()
showard678df4f2009-02-04 21:36:39 +00001566 # copy results logs into the normal place for job results
1567 _drone_manager.copy_results_on_drone(
1568 self.monitor.get_process(),
1569 source_path=self.temp_results_dir + '/',
showardccbd6c52009-03-21 00:10:21 +00001570 destination_path=self.queue_entry_to_fail.execution_tag() + '/')
showard678df4f2009-02-04 21:36:39 +00001571
showarda1e74b32009-05-12 17:32:04 +00001572 self._copy_results([self.queue_entry_to_fail])
1573 if self.queue_entry_to_fail.job.parse_failed_repair:
1574 self._parse_results([self.queue_entry_to_fail])
showardccbd6c52009-03-21 00:10:21 +00001575 self.queue_entry_to_fail.handle_host_failure()
showardde634ee2009-01-30 01:44:24 +00001576
1577
jadmanski0afbb632008-06-06 21:10:57 +00001578 def epilog(self):
1579 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001580
1581 tasks = models.SpecialTask.objects.filter(host__id=self.host.id,
1582 is_active=True)
1583 for task in tasks:
1584 task.is_complete = True
1585 task.save()
1586
jadmanski0afbb632008-06-06 21:10:57 +00001587 if self.success:
1588 self.host.set_status('Ready')
1589 else:
1590 self.host.set_status('Repair Failed')
showardccbd6c52009-03-21 00:10:21 +00001591 if self.queue_entry_to_fail:
showardde634ee2009-01-30 01:44:24 +00001592 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001593
1594
showard8fe93b52008-11-18 17:53:22 +00001595class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001596 def epilog(self):
1597 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001598 should_copy_results = (self.queue_entry and not self.success
1599 and not self.queue_entry.meta_host)
1600 if should_copy_results:
1601 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001602 destination = os.path.join(self.queue_entry.execution_tag(),
1603 os.path.basename(self.log_file))
1604 _drone_manager.copy_to_results_repository(
1605 self.monitor.get_process(), self.log_file,
1606 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001607
1608
1609class VerifyTask(PreJobTask):
showard9976ce92008-10-15 20:28:13 +00001610 def __init__(self, queue_entry=None, host=None):
jadmanski0afbb632008-06-06 21:10:57 +00001611 assert bool(queue_entry) != bool(host)
jadmanski0afbb632008-06-06 21:10:57 +00001612 self.host = host or queue_entry.host
1613 self.queue_entry = queue_entry
mbligh36768f02008-02-22 18:28:33 +00001614
jadmanski0afbb632008-06-06 21:10:57 +00001615 self.create_temp_resultsdir('.verify')
showard87ba02a2009-04-20 19:37:32 +00001616 cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
1617 ['-v'], queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001618 failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
showard170873e2009-01-07 00:22:26 +00001619 super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
1620 failure_tasks=failure_tasks)
mblighe2586682008-02-29 22:45:46 +00001621
showard170873e2009-01-07 00:22:26 +00001622 self.set_host_log_file('verify', self.host)
1623 self._set_ids(host=host, queue_entries=[queue_entry])
mblighe2586682008-02-29 22:45:46 +00001624
1625
jadmanski0afbb632008-06-06 21:10:57 +00001626 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001627 super(VerifyTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001628 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001629 if self.queue_entry:
1630 self.queue_entry.set_status('Verifying')
jadmanski0afbb632008-06-06 21:10:57 +00001631 self.host.set_status('Verifying')
mbligh36768f02008-02-22 18:28:33 +00001632
1633
jadmanski0afbb632008-06-06 21:10:57 +00001634 def epilog(self):
1635 super(VerifyTask, self).epilog()
mbligh36768f02008-02-22 18:28:33 +00001636
jadmanski0afbb632008-06-06 21:10:57 +00001637 if self.success:
showard6d7b2ff2009-06-10 00:16:47 +00001638 tasks = models.SpecialTask.objects.filter(host__id=self.host.id,
1639 is_active=True)
1640 for task in tasks:
1641 task.is_complete = True
1642 task.save()
1643
jadmanski0afbb632008-06-06 21:10:57 +00001644 self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001645
1646
showardd9205182009-04-27 20:09:55 +00001647class QueueTask(AgentTask, TaskWithJobKeyvals):
showardf1ae3542009-05-11 19:26:02 +00001648 def __init__(self, job, queue_entries, cmd, group_name=''):
jadmanski0afbb632008-06-06 21:10:57 +00001649 self.job = job
1650 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001651 self.group_name = group_name
showard170873e2009-01-07 00:22:26 +00001652 super(QueueTask, self).__init__(cmd, self._execution_tag())
1653 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001654
1655
showard73ec0442009-02-07 02:05:20 +00001656 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001657 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001658
1659
1660 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1661 keyval_contents = '\n'.join(self._format_keyval(key, value)
1662 for key, value in keyval_dict.iteritems())
1663 # always end with a newline to allow additional keyvals to be written
1664 keyval_contents += '\n'
1665 _drone_manager.attach_file_to_execution(self._execution_tag(),
1666 keyval_contents,
1667 file_path=keyval_path)
1668
1669
1670 def _write_keyvals_before_job(self, keyval_dict):
1671 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1672
1673
showard170873e2009-01-07 00:22:26 +00001674 def _write_host_keyvals(self, host):
1675 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1676 host.hostname)
1677 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001678 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1679 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
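# For illustration, the per-host keyval file written above ends up looking
# like this (platform and label names are invented):
#   platform=mylab-netbook
#   labels=mylab-netbook,bluetooth,remote-console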
showardd8e548a2008-09-09 03:04:57 +00001680
1681
showard170873e2009-01-07 00:22:26 +00001682 def _execution_tag(self):
1683 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001684
1685
jadmanski0afbb632008-06-06 21:10:57 +00001686 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001687 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001688 keyval_dict = {queued_key: queued_time}
1689 if self.group_name:
1690 keyval_dict['host_group_name'] = self.group_name
1691 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001692 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001693 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001694 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001695 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001696 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001697 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001698 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001699 assert len(self.queue_entries) == 1
1700 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001701
1702
showard35162b02009-03-03 02:17:30 +00001703 def _write_lost_process_error_file(self):
1704 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1705 _drone_manager.write_lines_to_file(error_file_path,
1706 [_LOST_PROCESS_ERROR])
1707
1708
showardd3dc1992009-04-22 21:01:40 +00001709 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001710 if not self.monitor:
1711 return
1712
showardd9205182009-04-27 20:09:55 +00001713 self._write_job_finished()
1714
showardd3dc1992009-04-22 21:01:40 +00001715 # both of these conditionals can be true, iff the process ran, wrote a
1716 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001717 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001718 gather_task = GatherLogsTask(self.job, self.queue_entries)
1719 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001720
1721 if self.monitor.lost_process:
1722 self._write_lost_process_error_file()
1723 for queue_entry in self.queue_entries:
1724 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001725
1726
showardcbd74612008-11-19 21:42:02 +00001727 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001728 _drone_manager.write_lines_to_file(
1729 os.path.join(self._execution_tag(), 'status.log'),
1730 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001731 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001732
1733
jadmanskif7fa2cc2008-10-01 14:13:23 +00001734 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001735 if not self.monitor or not self.monitor.has_process():
1736 return
1737
jadmanskif7fa2cc2008-10-01 14:13:23 +00001738 # build up sets of all the aborted_by and aborted_on values
1739 aborted_by, aborted_on = set(), set()
1740 for queue_entry in self.queue_entries:
1741 if queue_entry.aborted_by:
1742 aborted_by.add(queue_entry.aborted_by)
1743 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1744 aborted_on.add(t)
1745
1746 # extract some actual, unique aborted by value and write it out
1747 assert len(aborted_by) <= 1
1748 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001749 aborted_by_value = aborted_by.pop()
1750 aborted_on_value = max(aborted_on)
1751 else:
1752 aborted_by_value = 'autotest_system'
1753 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001754
showarda0382352009-02-11 23:36:43 +00001755 self._write_keyval_after_job("aborted_by", aborted_by_value)
1756 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001757
showardcbd74612008-11-19 21:42:02 +00001758 aborted_on_string = str(datetime.datetime.fromtimestamp(
1759 aborted_on_value))
1760 self._write_status_comment('Job aborted by %s on %s' %
1761 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001762
1763
jadmanski0afbb632008-06-06 21:10:57 +00001764 def abort(self):
1765 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001766 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001767 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001768
1769
jadmanski0afbb632008-06-06 21:10:57 +00001770 def epilog(self):
1771 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001772 self._finish_task()
1773 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001774
1775
mblighbb421852008-03-11 22:36:16 +00001776class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001777 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001778 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
showard5add1c82009-05-26 19:27:46 +00001779 self.monitor = run_monitor
1780 self.started = True
1781 # since we set started=True here, prolog() and run() shouldn't be called
mblighbb421852008-03-11 22:36:16 +00001782
1783
jadmanski0afbb632008-06-06 21:10:57 +00001784 def run(self):
showard5add1c82009-05-26 19:27:46 +00001785 raise NotImplementedError('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001786
1787
jadmanski0afbb632008-06-06 21:10:57 +00001788 def prolog(self):
showard5add1c82009-05-26 19:27:46 +00001789 raise NotImplementedError('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001790
1791
showardd3dc1992009-04-22 21:01:40 +00001792class PostJobTask(AgentTask):
1793 def __init__(self, queue_entries, pidfile_name, logfile_name,
1794 run_monitor=None):
1795 """
1796 If run_monitor != None, we're recovering a running task.
1797 """
1798 self._queue_entries = queue_entries
1799 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001800
1801 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1802 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1803 self._autoserv_monitor = PidfileRunMonitor()
1804 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1805 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1806
1807 if _testing_mode:
1808 command = 'true'
1809 else:
1810 command = self._generate_command(self._results_dir)
1811
1812 super(PostJobTask, self).__init__(cmd=command,
1813 working_directory=self._execution_tag)
showard5add1c82009-05-26 19:27:46 +00001814 # this must happen *after* the super call
1815 self.monitor = run_monitor
1816 if run_monitor:
1817 self.started = True
showardd3dc1992009-04-22 21:01:40 +00001818
1819 self.log_file = os.path.join(self._execution_tag, logfile_name)
1820 self._final_status = self._determine_final_status()
1821
1822
1823 def _generate_command(self, results_dir):
1824 raise NotImplementedError('Subclasses must override this')
1825
1826
1827 def _job_was_aborted(self):
1828 was_aborted = None
1829 for queue_entry in self._queue_entries:
1830 queue_entry.update_from_database()
1831 if was_aborted is None: # first queue entry
1832 was_aborted = bool(queue_entry.aborted)
1833 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1834 email_manager.manager.enqueue_notify_email(
1835 'Inconsistent abort state',
1836 'Queue entries have inconsistent abort state: ' +
1837 ', '.join('%s (%s)' % (entry, entry.aborted) for entry in self._queue_entries))
1838 # don't crash here, just assume true
1839 return True
1840 return was_aborted
1841
1842
1843 def _determine_final_status(self):
1844 if self._job_was_aborted():
1845 return models.HostQueueEntry.Status.ABORTED
1846
1847 # we'll use a PidfileRunMonitor to read the autoserv exit status
1848 if self._autoserv_monitor.exit_code() == 0:
1849 return models.HostQueueEntry.Status.COMPLETED
1850 return models.HostQueueEntry.Status.FAILED
1851
1852
1853 def run(self):
showard5add1c82009-05-26 19:27:46 +00001854 assert not self.monitor
showardd3dc1992009-04-22 21:01:40 +00001855
showard5add1c82009-05-26 19:27:46 +00001856 # make sure we actually have results to work with.
1857 # this should never happen in normal operation.
1858 if not self._autoserv_monitor.has_process():
1859 email_manager.manager.enqueue_notify_email(
1860 'No results in post-job task',
1861 'No results in post-job task at %s' %
1862 self._autoserv_monitor.pidfile_id)
1863 self.finished(False)
1864 return
1865
1866 super(PostJobTask, self).run(
1867 pidfile_name=self._pidfile_name,
1868 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00001869
1870
1871 def _set_all_statuses(self, status):
1872 for queue_entry in self._queue_entries:
1873 queue_entry.set_status(status)
1874
1875
1876 def abort(self):
1877 # override AgentTask.abort() to avoid killing the process and ending
1878 # the task. post-job tasks continue when the job is aborted.
1879 pass
1880
1881
1882class GatherLogsTask(PostJobTask):
1883 """
1884 Task responsible for
1885 * gathering uncollected logs (if Autoserv crashed hard or was killed)
1886 * copying logs to the results repository
1887 * spawning CleanupTasks for hosts, if necessary
1888 * spawning a FinalReparseTask for the job
1889 """
1890 def __init__(self, job, queue_entries, run_monitor=None):
1891 self._job = job
1892 super(GatherLogsTask, self).__init__(
1893 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
1894 logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
1895 self._set_ids(queue_entries=queue_entries)
1896
1897
1898 def _generate_command(self, results_dir):
1899 host_list = ','.join(queue_entry.host.hostname
1900 for queue_entry in self._queue_entries)
1901 return [_autoserv_path, '-p', '--collect-crashinfo', '-m', host_list,
1902 '-r', results_dir]
1903
1904
1905 def prolog(self):
1906 super(GatherLogsTask, self).prolog()
1907 self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)
1908
1909
1910 def _reboot_hosts(self):
1911 reboot_after = self._job.reboot_after
1912 do_reboot = False
showard6b733412009-04-27 20:09:18 +00001913 if self._final_status == models.HostQueueEntry.Status.ABORTED:
1914 do_reboot = True
1915 elif reboot_after == models.RebootAfter.ALWAYS:
showardd3dc1992009-04-22 21:01:40 +00001916 do_reboot = True
1917 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
1918 final_success = (
1919 self._final_status == models.HostQueueEntry.Status.COMPLETED)
1920 num_tests_failed = self._autoserv_monitor.num_tests_failed()
1921 do_reboot = (final_success and num_tests_failed == 0)
1922
1923 for queue_entry in self._queue_entries:
1924 if do_reboot:
1925 # don't pass the queue entry to the CleanupTask. if the cleanup
1926 # fails, the job doesn't care -- it's over.
1927 cleanup_task = CleanupTask(host=queue_entry.host)
1928 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
1929 else:
1930 queue_entry.host.set_status('Ready')
1931
1932
1933 def epilog(self):
1934 super(GatherLogsTask, self).epilog()
showardebc0fb72009-05-13 21:28:07 +00001935 if self._autoserv_monitor.has_process():
1936 self._copy_and_parse_results(self._queue_entries,
1937 use_monitor=self._autoserv_monitor)
showardd3dc1992009-04-22 21:01:40 +00001938 self._reboot_hosts()
1939
1940
showard0bbfc212009-04-29 21:06:13 +00001941 def run(self):
showard597bfd32009-05-08 18:22:50 +00001942 autoserv_exit_code = self._autoserv_monitor.exit_code()
1943 # only run if Autoserv exited due to some signal. if we have no exit
1944 # code, assume something bad (and signal-like) happened.
1945 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00001946 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00001947 else:
1948 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001949
1950
showard8fe93b52008-11-18 17:53:22 +00001951class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001952 def __init__(self, host=None, queue_entry=None):
1953 assert bool(host) ^ bool(queue_entry)
1954 if queue_entry:
1955 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001956 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001957 self.host = host
showard170873e2009-01-07 00:22:26 +00001958
1959 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001960 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1961 ['--cleanup'],
1962 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001963 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001964 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1965 failure_tasks=[repair_task])
1966
1967 self._set_ids(host=host, queue_entries=[queue_entry])
1968 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001969
mblighd5c95802008-03-05 00:33:46 +00001970
jadmanski0afbb632008-06-06 21:10:57 +00001971 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001972 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001973 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001974 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001975
mblighd5c95802008-03-05 00:33:46 +00001976
showard21baa452008-10-21 00:08:39 +00001977 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001978 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001979 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001980 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001981 self.host.update_field('dirty', 0)
1982
1983
showardd3dc1992009-04-22 21:01:40 +00001984class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001985 _num_running_parses = 0
1986
showardd3dc1992009-04-22 21:01:40 +00001987 def __init__(self, queue_entries, run_monitor=None):
1988 super(FinalReparseTask, self).__init__(queue_entries,
1989 pidfile_name=_PARSER_PID_FILE,
1990 logfile_name='.parse.log',
1991 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001992 # don't use _set_ids, since we don't want to set the host_ids
1993 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00001994 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00001995
showard97aed502008-11-04 02:01:24 +00001996
1997 @classmethod
1998 def _increment_running_parses(cls):
1999 cls._num_running_parses += 1
2000
2001
2002 @classmethod
2003 def _decrement_running_parses(cls):
2004 cls._num_running_parses -= 1
2005
2006
2007 @classmethod
2008 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002009 return (cls._num_running_parses <
2010 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002011
2012
2013 def prolog(self):
2014 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002015 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002016
2017
2018 def epilog(self):
2019 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002020 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002021
2022
showardd3dc1992009-04-22 21:01:40 +00002023 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002024 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002025 results_dir]
showard97aed502008-11-04 02:01:24 +00002026
2027
showard08a36412009-05-05 01:01:13 +00002028 def tick(self):
2029 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002030 # and we can, at which point we revert to default behavior
2031 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002032 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002033 else:
2034 self._try_starting_parse()
2035
2036
2037 def run(self):
2038 # override run() to not actually run unless we can
2039 self._try_starting_parse()
2040
2041
2042 def _try_starting_parse(self):
2043 if not self._can_run_new_parse():
2044 return
showard170873e2009-01-07 00:22:26 +00002045
showard97aed502008-11-04 02:01:24 +00002046 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002047 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002048
showard97aed502008-11-04 02:01:24 +00002049 self._increment_running_parses()
2050 self._parse_started = True
2051
2052
2053 def finished(self, success):
2054 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002055 if self._parse_started:
2056 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002057
2058
showardc9ae1782009-01-30 01:42:37 +00002059class SetEntryPendingTask(AgentTask):
2060 def __init__(self, queue_entry):
2061 super(SetEntryPendingTask, self).__init__(cmd='')
2062 self._queue_entry = queue_entry
2063 self._set_ids(queue_entries=[queue_entry])
2064
2065
2066 def run(self):
2067 agent = self._queue_entry.on_pending()
2068 if agent:
2069 self.agent.dispatcher.add_agent(agent)
2070 self.finished(True)
2071
2072
showarda3c58572009-03-12 20:36:59 +00002073class DBError(Exception):
2074 """Raised by the DBObject constructor when its select fails."""
2075
2076
mbligh36768f02008-02-22 18:28:33 +00002077class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002078 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002079
2080 # Subclasses MUST override these:
2081 _table_name = ''
2082 _fields = ()
2083
showarda3c58572009-03-12 20:36:59 +00002084 # A mapping from (type, id) to the instance of the object for that
2085 # particular id. This prevents us from creating new Job() and Host()
2086 # instances for every HostQueueEntry object that we instantiate as
2087 # multiple HQEs often share the same Job.
2088 _instances_by_type_and_id = weakref.WeakValueDictionary()
2089 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002090
showarda3c58572009-03-12 20:36:59 +00002091
2092 def __new__(cls, id=None, **kwargs):
2093 """
2094 Look to see if we already have an instance for this particular type
2095 and id. If so, use it instead of creating a duplicate instance.
2096 """
2097 if id is not None:
2098 instance = cls._instances_by_type_and_id.get((cls, id))
2099 if instance:
2100 return instance
2101 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2102
2103
2104 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002105 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002106 assert self._table_name, '_table_name must be defined in your class'
2107 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002108 if not new_record:
2109 if self._initialized and not always_query:
2110 return # We've already been initialized.
2111 if id is None:
2112 id = row[0]
2113 # Tell future constructors to use us instead of re-querying while
2114 # this instance is still around.
2115 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002116
showard6ae5ea92009-02-25 00:11:51 +00002117 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002118
jadmanski0afbb632008-06-06 21:10:57 +00002119 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002120
jadmanski0afbb632008-06-06 21:10:57 +00002121 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002122 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002123
showarda3c58572009-03-12 20:36:59 +00002124 if self._initialized:
2125 differences = self._compare_fields_in_row(row)
2126 if differences:
showard7629f142009-03-27 21:02:02 +00002127 logging.warn(
2128 'initialized %s %s instance requery is updating: %s',
2129 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002130 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002131 self._initialized = True
2132
2133
2134 @classmethod
2135 def _clear_instance_cache(cls):
2136 """Used for testing, clear the internal instance cache."""
2137 cls._instances_by_type_and_id.clear()
2138
2139
showardccbd6c52009-03-21 00:10:21 +00002140 def _fetch_row_from_db(self, row_id):
2141 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2142 rows = _db.execute(sql, (row_id,))
2143 if not rows:
showard76e29d12009-04-15 21:53:10 +00002144 raise DBError("row not found (table=%s, row id=%s)"
2145 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002146 return rows[0]
2147
2148
showarda3c58572009-03-12 20:36:59 +00002149 def _assert_row_length(self, row):
2150 assert len(row) == len(self._fields), (
2151 "table = %s, row = %s/%d, fields = %s/%d" % (
2152 self.__table, row, len(row), self._fields, len(self._fields)))
2153
2154
2155 def _compare_fields_in_row(self, row):
2156 """
2157 Given a row as returned by a SELECT query, compare it to our existing
2158 in memory fields.
2159
2160 @param row - A sequence of values corresponding to fields named in
2161 The class attribute _fields.
2162
2163 @returns A dictionary listing the differences keyed by field name
2164 containing tuples of (current_value, row_value).
2165 """
2166 self._assert_row_length(row)
2167 differences = {}
2168 for field, row_value in itertools.izip(self._fields, row):
2169 current_value = getattr(self, field)
2170 if current_value != row_value:
2171 differences[field] = (current_value, row_value)
2172 return differences
showard2bab8f42008-11-12 18:15:22 +00002173
2174
2175 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002176 """
2177 Update our field attributes using a single row returned by SELECT.
2178
2179 @param row - A sequence of values corresponding to fields named in
2180 the class fields list.
2181 """
2182 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002183
showard2bab8f42008-11-12 18:15:22 +00002184 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002185 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002186 setattr(self, field, value)
2187 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002188
showard2bab8f42008-11-12 18:15:22 +00002189 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002190
mblighe2586682008-02-29 22:45:46 +00002191
showardccbd6c52009-03-21 00:10:21 +00002192 def update_from_database(self):
2193 assert self.id is not None
2194 row = self._fetch_row_from_db(self.id)
2195 self._update_fields_from_row(row)
2196
2197
jadmanski0afbb632008-06-06 21:10:57 +00002198 def count(self, where, table = None):
2199 if not table:
2200 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002201
jadmanski0afbb632008-06-06 21:10:57 +00002202 rows = _db.execute("""
2203 SELECT count(*) FROM %s
2204 WHERE %s
2205 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002206
jadmanski0afbb632008-06-06 21:10:57 +00002207 assert len(rows) == 1
2208
2209 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002210
2211
showardd3dc1992009-04-22 21:01:40 +00002212 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002213 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002214
showard2bab8f42008-11-12 18:15:22 +00002215 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002216 return
mbligh36768f02008-02-22 18:28:33 +00002217
mblighf8c624d2008-07-03 16:58:45 +00002218 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002219 _db.execute(query, (value, self.id))
2220
showard2bab8f42008-11-12 18:15:22 +00002221 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002222
2223
jadmanski0afbb632008-06-06 21:10:57 +00002224 def save(self):
2225 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002226 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002227 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002228 values = []
2229 for key in keys:
2230 value = getattr(self, key)
2231 if value is None:
2232 values.append('NULL')
2233 else:
2234 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002235 values_str = ','.join(values)
2236 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2237 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002238 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002239 # Update our id to the one the database just assigned to us.
2240 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002241
2242
jadmanski0afbb632008-06-06 21:10:57 +00002243 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002244 self._instances_by_type_and_id.pop((type(self), self.id), None)
2245 self._initialized = False
2246 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002247 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2248 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002249
2250
showard63a34772008-08-18 19:32:50 +00002251 @staticmethod
2252 def _prefix_with(string, prefix):
2253 if string:
2254 string = prefix + string
2255 return string
2256
2257
jadmanski0afbb632008-06-06 21:10:57 +00002258 @classmethod
showard989f25d2008-10-01 11:38:11 +00002259 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002260 """
2261 Construct instances of our class based on the given database query.
2262
2263 @yields One class instance for each row fetched.
2264 """
showard63a34772008-08-18 19:32:50 +00002265 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2266 where = cls._prefix_with(where, 'WHERE ')
2267 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002268 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002269 'joins' : joins,
2270 'where' : where,
2271 'order_by' : order_by})
2272 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002273 for row in rows:
2274 yield cls(row=row)
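# Illustrative usage, mirroring how fetch() is called elsewhere in this file
# (e.g. HostQueueEntry.unblock_host() further down):
#   blocks = IneligibleHostQueue.fetch(
#       'job_id=%d and host_id=%d' % (job_id, host_id))
#   for block in blocks:
#       block.delete()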
mblighe2586682008-02-29 22:45:46 +00002275
mbligh36768f02008-02-22 18:28:33 +00002276
2277class IneligibleHostQueue(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002278 _table_name = 'ineligible_host_queues'
2279 _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002280
2281
showard89f84db2009-03-12 20:39:13 +00002282class AtomicGroup(DBObject):
2283 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002284 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2285 'invalid')
showard89f84db2009-03-12 20:39:13 +00002286
2287
showard989f25d2008-10-01 11:38:11 +00002288class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002289 _table_name = 'labels'
2290 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002291 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002292
2293
mbligh36768f02008-02-22 18:28:33 +00002294class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002295 _table_name = 'hosts'
2296 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2297 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2298
2299
jadmanski0afbb632008-06-06 21:10:57 +00002300 def current_task(self):
2301 rows = _db.execute("""
2302 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2303 """, (self.id,))
2304
2305 if len(rows) == 0:
2306 return None
2307 else:
2308 assert len(rows) == 1
2309 results = rows[0]
jadmanski0afbb632008-06-06 21:10:57 +00002310 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002311
2312
jadmanski0afbb632008-06-06 21:10:57 +00002313 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002314 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002315 if self.current_task():
2316 self.current_task().requeue()
2317
showard6ae5ea92009-02-25 00:11:51 +00002318
jadmanski0afbb632008-06-06 21:10:57 +00002319 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002320 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002321 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002322
2323
showard170873e2009-01-07 00:22:26 +00002324 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002325 """
showard170873e2009-01-07 00:22:26 +00002326 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002327 """
2328 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002329 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002330 FROM labels
2331 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002332 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002333 ORDER BY labels.name
2334 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002335 platform = None
2336 all_labels = []
2337 for label_name, is_platform in rows:
2338 if is_platform:
2339 platform = label_name
2340 all_labels.append(label_name)
2341 return platform, all_labels
2342
2343
showarda64e52a2009-06-08 23:24:08 +00002344 def reverify_tasks(self, cleanup=True):
2345 tasks = [VerifyTask(host=self)]
showard6d7b2ff2009-06-10 00:16:47 +00002346 # just to make sure this host does not get taken away
2347 self.set_status('Verifying')
showarda64e52a2009-06-08 23:24:08 +00002348 if cleanup:
2349 tasks.insert(0, CleanupTask(host=self))
showard6d7b2ff2009-06-10 00:16:47 +00002350 self.set_status('Cleaning')
showarda64e52a2009-06-08 23:24:08 +00002351 return tasks
showardd8e548a2008-09-09 03:04:57 +00002352
2353
showard54c1ea92009-05-20 00:32:58 +00002354 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2355
2356
2357 @classmethod
2358 def cmp_for_sort(cls, a, b):
2359 """
2360 A comparison function for sorting Host objects by hostname.
2361
2362 This strips any trailing numeric digits, ignores leading 0s and
2363 compares hostnames by the leading name and the trailing digits as a
2364 number. If either hostname does not match this pattern, they are simply
2365 compared as lower case strings.
2366
2367 Example of how hostnames will be sorted:
2368
2369 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2370
2371 This hopefully satisfies most people's hostname sorting needs regardless
2372 of their exact naming schemes. Nobody sane should have both a host10
2373 and host010 (but the algorithm works regardless).
2374 """
2375 lower_a = a.hostname.lower()
2376 lower_b = b.hostname.lower()
2377 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2378 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2379 if match_a and match_b:
2380 name_a, number_a_str = match_a.groups()
2381 name_b, number_b_str = match_b.groups()
2382 number_a = int(number_a_str.lstrip('0'))
2383 number_b = int(number_b_str.lstrip('0'))
2384 result = cmp((name_a, number_a), (name_b, number_b))
2385 if result == 0 and lower_a != lower_b:
2386 # If they compared equal above but the lower case names are
2387 # indeed different, don't report equality. abc012 != abc12.
2388 return cmp(lower_a, lower_b)
2389 return result
2390 else:
2391 return cmp(lower_a, lower_b)
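# Illustrative usage: with Python 2's cmp-based sorting, a list of Host
# objects can be ordered by hostname like this:
#   hosts.sort(cmp=Host.cmp_for_sort)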
2392
2393
mbligh36768f02008-02-22 18:28:33 +00002394class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002395 _table_name = 'host_queue_entries'
2396 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002397 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002398 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002399
2400
showarda3c58572009-03-12 20:36:59 +00002401 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002402 assert id or row
showarda3c58572009-03-12 20:36:59 +00002403 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002404 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002405
jadmanski0afbb632008-06-06 21:10:57 +00002406 if self.host_id:
2407 self.host = Host(self.host_id)
2408 else:
2409 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002410
showard77182562009-06-10 00:16:05 +00002411 if self.atomic_group_id:
2412 self.atomic_group = AtomicGroup(self.atomic_group_id,
2413 always_query=False)
2414 else:
2415 self.atomic_group = None
2416
showard170873e2009-01-07 00:22:26 +00002417 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002418 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002419
2420
showard89f84db2009-03-12 20:39:13 +00002421 @classmethod
2422 def clone(cls, template):
2423 """
2424 Creates a new row using the values from a template instance.
2425
2426 The new instance will not exist in the database or have a valid
2427 id attribute until its save() method is called.
2428 """
2429 assert isinstance(template, cls)
2430 new_row = [getattr(template, field) for field in cls._fields]
2431 clone = cls(row=new_row, new_record=True)
2432 clone.id = None
2433 return clone
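# Illustrative usage: duplicate an existing entry and persist the copy.
#   new_entry = HostQueueEntry.clone(existing_entry)
#   new_entry.save()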
2434
2435
showardc85c21b2008-11-24 22:17:37 +00002436 def _view_job_url(self):
2437 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2438
2439
showardf1ae3542009-05-11 19:26:02 +00002440 def get_labels(self):
2441 """
2442 Get all labels associated with this host queue entry (either via the
2443 meta_host or as a job dependency label). The labels yielded are not
2444 guaranteed to be unique.
2445
2446 @yields Label instances associated with this host_queue_entry.
2447 """
2448 if self.meta_host:
2449 yield Label(id=self.meta_host, always_query=False)
2450 labels = Label.fetch(
2451 joins="JOIN jobs_dependency_labels AS deps "
2452 "ON (labels.id = deps.label_id)",
2453 where="deps.job_id = %d" % self.job.id)
2454 for label in labels:
2455 yield label
2456
2457
jadmanski0afbb632008-06-06 21:10:57 +00002458 def set_host(self, host):
2459 if host:
2460 self.queue_log_record('Assigning host ' + host.hostname)
2461 self.update_field('host_id', host.id)
2462 self.update_field('active', True)
2463 self.block_host(host.id)
2464 else:
2465 self.queue_log_record('Releasing host')
2466 self.unblock_host(self.host.id)
2467 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002468
jadmanski0afbb632008-06-06 21:10:57 +00002469 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002470
2471
jadmanski0afbb632008-06-06 21:10:57 +00002472 def get_host(self):
2473 return self.host
mbligh36768f02008-02-22 18:28:33 +00002474
2475
jadmanski0afbb632008-06-06 21:10:57 +00002476 def queue_log_record(self, log_line):
2477 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002478 _drone_manager.write_lines_to_file(self.queue_log_path,
2479 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002480
2481
jadmanski0afbb632008-06-06 21:10:57 +00002482 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002483 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002484 row = [0, self.job.id, host_id]
2485 block = IneligibleHostQueue(row=row, new_record=True)
2486 block.save()
mblighe2586682008-02-29 22:45:46 +00002487
2488
jadmanski0afbb632008-06-06 21:10:57 +00002489 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002490 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002491 blocks = IneligibleHostQueue.fetch(
2492 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2493 for block in blocks:
2494 block.delete()
mblighe2586682008-02-29 22:45:46 +00002495
2496
showard2bab8f42008-11-12 18:15:22 +00002497 def set_execution_subdir(self, subdir=None):
2498 if subdir is None:
2499 assert self.get_host()
2500 subdir = self.get_host().hostname
2501 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002502
2503
showard6355f6b2008-12-05 18:52:13 +00002504 def _get_hostname(self):
2505 if self.host:
2506 return self.host.hostname
2507 return 'no host'
2508
2509
showard170873e2009-01-07 00:22:26 +00002510 def __str__(self):
2511 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
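    # Added note: with a host assigned this renders as
    # "<hostname>/<job id> (<entry id>)", e.g. "host1/42 (1337)" (values
    # illustrative); without a host the first part reads "no host".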
2512
2513
jadmanski0afbb632008-06-06 21:10:57 +00002514 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002515 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002516
showardb18134f2009-03-20 20:52:18 +00002517 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002518
showardc85c21b2008-11-24 22:17:37 +00002519 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002520 self.update_field('complete', False)
2521 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002522
jadmanski0afbb632008-06-06 21:10:57 +00002523 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002524 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002525 self.update_field('complete', False)
2526 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002527
showardc85c21b2008-11-24 22:17:37 +00002528 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002529 self.update_field('complete', True)
2530 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002531
2532 should_email_status = (status.lower() in _notify_email_statuses or
2533 'all' in _notify_email_statuses)
2534 if should_email_status:
2535 self._email_on_status(status)
2536
2537 self._email_on_job_complete()
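    # Added summary of the flag updates above (derived from this method):
    #   Queued, Parsing                                  -> complete=0, active=0
    #   Pending, Running, Verifying, Starting, Gathering -> complete=0, active=1
    #   Failed, Completed, Stopped, Aborted              -> complete=1, active=0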
2538
2539
2540 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002541 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002542
2543 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2544 self.job.id, self.job.name, hostname, status)
2545 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2546 self.job.id, self.job.name, hostname, status,
2547 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002548 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002549
2550
2551 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002552 if not self.job.is_finished():
2553 return
showard542e8402008-09-19 20:16:18 +00002554
showardc85c21b2008-11-24 22:17:37 +00002555 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002556 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002557 for queue_entry in hosts_queue:
2558 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002559 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002560 queue_entry.status))
2561
2562 summary_text = "\n".join(summary_text)
2563 status_counts = models.Job.objects.get_status_counts(
2564 [self.job.id])[self.job.id]
2565 status = ', '.join('%d %s' % (count, status) for status, count
2566 in status_counts.iteritems())
2567
2568 subject = 'Autotest: Job ID: %s "%s" %s' % (
2569 self.job.id, self.job.name, status)
2570 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2571 self.job.id, self.job.name, status, self._view_job_url(),
2572 summary_text)
showard170873e2009-01-07 00:22:26 +00002573 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002574
2575
showard77182562009-06-10 00:16:05 +00002576 def run_pre_job_tasks(self, assigned_host=None):
2577 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002578 assert assigned_host
2579 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002580 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002581
showardb18134f2009-03-20 20:52:18 +00002582 logging.info("%s/%s/%s scheduled on %s, status=%s",
2583 self.job.name, self.meta_host, self.atomic_group_id,
2584 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002585
showard77182562009-06-10 00:16:05 +00002586 return self._do_run_pre_job_tasks()
2587
2588
2589 def _do_run_pre_job_tasks(self):
2590 # Every host goes thru the Verifying stage (which may or may not
2591 # actually do anything as determined by get_pre_job_tasks).
2592 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2593
2594        # The pre-job tasks always end with a SetEntryPendingTask, which
2595 # will continue as appropriate through queue_entry.on_pending().
2596 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002597
showard6ae5ea92009-02-25 00:11:51 +00002598
jadmanski0afbb632008-06-06 21:10:57 +00002599 def requeue(self):
2600 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002601 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002602 # verify/cleanup failure sets the execution subdir, so reset it here
2603 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002604 if self.meta_host:
2605 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002606
2607
jadmanski0afbb632008-06-06 21:10:57 +00002608 def handle_host_failure(self):
2609 """\
2610 Called when this queue entry's host has failed verification and
2611 repair.
2612 """
2613 assert not self.meta_host
2614 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002615 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002616
2617
jadmanskif7fa2cc2008-10-01 14:13:23 +00002618 @property
2619 def aborted_by(self):
2620 self._load_abort_info()
2621 return self._aborted_by
2622
2623
2624 @property
2625 def aborted_on(self):
2626 self._load_abort_info()
2627 return self._aborted_on
2628
2629
2630 def _load_abort_info(self):
2631 """ Fetch info about who aborted the job. """
2632 if hasattr(self, "_aborted_by"):
2633 return
2634 rows = _db.execute("""
2635 SELECT users.login, aborted_host_queue_entries.aborted_on
2636 FROM aborted_host_queue_entries
2637 INNER JOIN users
2638 ON users.id = aborted_host_queue_entries.aborted_by_id
2639 WHERE aborted_host_queue_entries.queue_entry_id = %s
2640 """, (self.id,))
2641 if rows:
2642 self._aborted_by, self._aborted_on = rows[0]
2643 else:
2644 self._aborted_by = self._aborted_on = None
2645
2646
showardb2e2c322008-10-14 17:33:55 +00002647 def on_pending(self):
2648 """
2649 Called when an entry in a synchronous job has passed verify. If the
2650 job is ready to run, returns an agent to run the job. Returns None
2651 otherwise.
2652 """
2653 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002654 self.get_host().set_status('Pending')
showard77182562009-06-10 00:16:05 +00002655 return self.job.run_if_ready(queue_entry=self)
showardb2e2c322008-10-14 17:33:55 +00002656
2657
showardd3dc1992009-04-22 21:01:40 +00002658 def abort(self, dispatcher):
2659 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002660
showardd3dc1992009-04-22 21:01:40 +00002661 Status = models.HostQueueEntry.Status
2662 has_running_job_agent = (
2663 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2664 and dispatcher.get_agents_for_entry(self))
2665 if has_running_job_agent:
2666 # do nothing; post-job tasks will finish and then mark this entry
2667 # with status "Aborted" and take care of the host
2668 return
2669
2670 if self.status in (Status.STARTING, Status.PENDING):
2671 self.host.set_status(models.Host.Status.READY)
2672 elif self.status == Status.VERIFYING:
2673 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2674
2675 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002676
2677 def execution_tag(self):
2678 assert self.execution_subdir
2679 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
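    # Added note: the tag has the form "<job id>-<owner>/<execution_subdir>",
    # e.g. "42-autotest/host1" (values illustrative); it is used below when
    # attaching the control file and building the autoserv command line.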
showard1be97432008-10-17 15:30:45 +00002680
2681
mbligh36768f02008-02-22 18:28:33 +00002682class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002683 _table_name = 'jobs'
2684 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2685 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002686 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002687 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002688
showard77182562009-06-10 00:16:05 +00002689 # This does not need to be a column in the DB. The delays are likely to
2690 # be configured short. If the scheduler is stopped and restarted in
2691 # the middle of a job's delay cycle, the delay cycle will either be
2692 # repeated or skipped depending on the number of Pending machines found
2693 # when the restarted scheduler recovers to track it. Not a problem.
2694 #
2695 # A reference to the DelayedCallTask that will wake up the job should
2696 # no other HQEs change state in time. Its end_time attribute is used
2697 # by our run_with_ready_delay() method to determine if the wait is over.
2698 _delay_ready_task = None
2699
2700 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2701    # all status='Pending' atomic group HQEs in case a delay was running when the
2702 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002703
showarda3c58572009-03-12 20:36:59 +00002704 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002705 assert id or row
showarda3c58572009-03-12 20:36:59 +00002706 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002707
mblighe2586682008-02-29 22:45:46 +00002708
jadmanski0afbb632008-06-06 21:10:57 +00002709 def is_server_job(self):
2710 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002711
2712
showard170873e2009-01-07 00:22:26 +00002713 def tag(self):
2714 return "%s-%s" % (self.id, self.owner)
2715
2716
jadmanski0afbb632008-06-06 21:10:57 +00002717 def get_host_queue_entries(self):
2718 rows = _db.execute("""
2719 SELECT * FROM host_queue_entries
2720 WHERE job_id= %s
2721 """, (self.id,))
2722 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002723
jadmanski0afbb632008-06-06 21:10:57 +00002724        assert len(entries) > 0
mbligh36768f02008-02-22 18:28:33 +00002725
jadmanski0afbb632008-06-06 21:10:57 +00002726 return entries
mbligh36768f02008-02-22 18:28:33 +00002727
2728
jadmanski0afbb632008-06-06 21:10:57 +00002729 def set_status(self, status, update_queues=False):
2730        self.update_field('status', status)
2731
2732 if update_queues:
2733 for queue_entry in self.get_host_queue_entries():
2734 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002735
2736
showard77182562009-06-10 00:16:05 +00002737 def _atomic_and_has_started(self):
2738 """
2739 @returns True if any of the HostQueueEntries associated with this job
2740 have entered the Status.STARTING state or beyond.
2741 """
2742 atomic_entries = models.HostQueueEntry.objects.filter(
2743 job=self.id, atomic_group__isnull=False)
2744 if atomic_entries.count() <= 0:
2745 return False
2746
2747 non_started_statuses = (models.HostQueueEntry.Status.QUEUED,
2748 models.HostQueueEntry.Status.VERIFYING,
2749 models.HostQueueEntry.Status.PENDING)
2750 started_entries = atomic_entries.exclude(
2751 status__in=non_started_statuses)
2752 return started_entries.count() > 0
2753
2754
2755 def _pending_count(self):
2756 """The number of HostQueueEntries for this job in the Pending state."""
2757 pending_entries = models.HostQueueEntry.objects.filter(
2758 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2759 return pending_entries.count()
2760
2761
jadmanski0afbb632008-06-06 21:10:57 +00002762 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002763 # NOTE: Atomic group jobs stop reporting ready after they have been
2764 # started to avoid launching multiple copies of one atomic job.
2765        # Only possible if synch_count is less than half the number of
2766 # machines in the atomic group.
2767 return (self._pending_count() >= self.synch_count
2768 and not self._atomic_and_has_started())
mbligh36768f02008-02-22 18:28:33 +00002769
2770
jadmanski0afbb632008-06-06 21:10:57 +00002771    def num_machines(self, clause=None):
2772 sql = "job_id=%s" % self.id
2773 if clause:
2774 sql += " AND (%s)" % clause
2775 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002776
2777
jadmanski0afbb632008-06-06 21:10:57 +00002778 def num_queued(self):
2779 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002780
2781
jadmanski0afbb632008-06-06 21:10:57 +00002782 def num_active(self):
2783 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002784
2785
jadmanski0afbb632008-06-06 21:10:57 +00002786 def num_complete(self):
2787 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002788
2789
jadmanski0afbb632008-06-06 21:10:57 +00002790 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002791 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002792
mbligh36768f02008-02-22 18:28:33 +00002793
showard6bb7c292009-01-30 01:44:51 +00002794 def _not_yet_run_entries(self, include_verifying=True):
2795 statuses = [models.HostQueueEntry.Status.QUEUED,
2796 models.HostQueueEntry.Status.PENDING]
2797 if include_verifying:
2798 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2799 return models.HostQueueEntry.objects.filter(job=self.id,
2800 status__in=statuses)
2801
2802
2803 def _stop_all_entries(self):
2804 entries_to_stop = self._not_yet_run_entries(
2805 include_verifying=False)
2806 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002807 assert not child_entry.complete, (
2808 '%s status=%s, active=%s, complete=%s' %
2809 (child_entry.id, child_entry.status, child_entry.active,
2810 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002811 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2812 child_entry.host.status = models.Host.Status.READY
2813 child_entry.host.save()
2814 child_entry.status = models.HostQueueEntry.Status.STOPPED
2815 child_entry.save()
2816
showard2bab8f42008-11-12 18:15:22 +00002817 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002818 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002819 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002820 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002821
2822
jadmanski0afbb632008-06-06 21:10:57 +00002823 def write_to_machines_file(self, queue_entry):
2824 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002825 file_path = os.path.join(self.tag(), '.machines')
2826 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002827
2828
showardf1ae3542009-05-11 19:26:02 +00002829 def _next_group_name(self, group_name=''):
2830 """@returns a directory name to use for the next host group results."""
2831 if group_name:
2832 # Sanitize for use as a pathname.
2833 group_name = group_name.replace(os.path.sep, '_')
2834 if group_name.startswith('.'):
2835 group_name = '_' + group_name[1:]
2836 # Add a separator between the group name and 'group%d'.
2837 group_name += '.'
2838 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002839 query = models.HostQueueEntry.objects.filter(
2840 job=self.id).values('execution_subdir').distinct()
2841 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002842 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2843 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002844 if ids:
2845 next_id = max(ids) + 1
2846 else:
2847 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002848 return '%sgroup%d' % (group_name, next_id)
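    # Added examples (illustrative): with an empty group_name the subdirs count
    # up as "group0", "group1", ...; with group_name "rack1" they become
    # "rack1.group0", "rack1.group1", ... after sanitizing path separators and
    # a leading ".".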
showard2bab8f42008-11-12 18:15:22 +00002849
2850
showard170873e2009-01-07 00:22:26 +00002851 def _write_control_file(self, execution_tag):
2852 control_path = _drone_manager.attach_file_to_execution(
2853 execution_tag, self.control_file)
2854 return control_path
mbligh36768f02008-02-22 18:28:33 +00002855
showardb2e2c322008-10-14 17:33:55 +00002856
showard2bab8f42008-11-12 18:15:22 +00002857 def get_group_entries(self, queue_entry_from_group):
2858 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002859 return list(HostQueueEntry.fetch(
2860 where='job_id=%s AND execution_subdir=%s',
2861 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002862
2863
showardb2e2c322008-10-14 17:33:55 +00002864 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002865 assert queue_entries
2866 execution_tag = queue_entries[0].execution_tag()
2867 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002868 hostnames = ','.join([entry.get_host().hostname
2869 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002870
showard87ba02a2009-04-20 19:37:32 +00002871 params = _autoserv_command_line(
2872 hostnames, execution_tag,
2873 ['-P', execution_tag, '-n',
2874 _drone_manager.absolute_path(control_path)],
2875 job=self)
mbligh36768f02008-02-22 18:28:33 +00002876
jadmanski0afbb632008-06-06 21:10:57 +00002877 if not self.is_server_job():
2878 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002879
showardb2e2c322008-10-14 17:33:55 +00002880 return params
mblighe2586682008-02-29 22:45:46 +00002881
mbligh36768f02008-02-22 18:28:33 +00002882
showardc9ae1782009-01-30 01:42:37 +00002883 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002884 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002885 return True
showard0fc38302008-10-23 00:44:07 +00002886 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002887 return queue_entry.get_host().dirty
2888 return False
showard21baa452008-10-21 00:08:39 +00002889
showardc9ae1782009-01-30 01:42:37 +00002890
2891 def _should_run_verify(self, queue_entry):
2892 do_not_verify = (queue_entry.host.protection ==
2893 host_protections.Protection.DO_NOT_VERIFY)
2894 if do_not_verify:
2895 return False
2896 return self.run_verify
2897
2898
showard77182562009-06-10 00:16:05 +00002899 def get_pre_job_tasks(self, queue_entry):
2900 """
2901 Get a list of tasks to perform before the host_queue_entry
2902 may be used to run this Job (such as Cleanup & Verify).
2903
2904 @returns A list of tasks to be done to the given queue_entry before
2905                 it should be considered ready to run this job.  The last
2906 task in the list calls HostQueueEntry.on_pending(), which
2907 continues the flow of the job.
2908 """
showard21baa452008-10-21 00:08:39 +00002909 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002910 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002911 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002912 if self._should_run_verify(queue_entry):
2913 tasks.append(VerifyTask(queue_entry=queue_entry))
2914 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002915 return tasks
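    # Added sketch of a typical return value (derived from this method):
    #   [CleanupTask(queue_entry=queue_entry),
    #    VerifyTask(queue_entry=queue_entry),
    #    SetEntryPendingTask(queue_entry)]
    # Cleanup and Verify may each be omitted depending on reboot_before, host
    # protection and run_verify; SetEntryPendingTask is always last.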
2916
2917
showardf1ae3542009-05-11 19:26:02 +00002918 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002919 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002920 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002921 else:
showardf1ae3542009-05-11 19:26:02 +00002922 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002923 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002924 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002925 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002926
2927 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002928 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002929
2930
2931 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00002932 """
2933 @returns A tuple containing a list of HostQueueEntry instances to be
2934                  used to run this Job and a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00002935 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00002936 """
showard77182562009-06-10 00:16:05 +00002937 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00002938 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00002939 if atomic_group:
2940 num_entries_wanted = atomic_group.max_number_of_machines
2941 else:
2942 num_entries_wanted = self.synch_count
2943 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00002944
showardf1ae3542009-05-11 19:26:02 +00002945 if num_entries_wanted > 0:
2946 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00002947 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00002948 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00002949 params=(self.id, include_queue_entry.id)))
2950
2951 # Sort the chosen hosts by hostname before slicing.
2952 def cmp_queue_entries_by_hostname(entry_a, entry_b):
2953 return Host.cmp_for_sort(entry_a.host, entry_b.host)
2954 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
2955 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00002956
showardf1ae3542009-05-11 19:26:02 +00002957 # Sanity check. We'll only ever be called if this can be met.
2958 assert len(chosen_entries) >= self.synch_count
2959
2960 if atomic_group:
2961 # Look at any meta_host and dependency labels and pick the first
2962 # one that also specifies this atomic group. Use that label name
2963 # as the group name if possible (it is more specific).
2964 group_name = atomic_group.name
2965 for label in include_queue_entry.get_labels():
2966 if label.atomic_group_id:
2967 assert label.atomic_group_id == atomic_group.id
2968 group_name = label.name
2969 break
2970 else:
2971 group_name = ''
2972
2973 self._assign_new_group(chosen_entries, group_name=group_name)
2974 return chosen_entries, group_name
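    # Added note: for atomic group jobs the group name defaults to the atomic
    # group's name but is replaced by the first meta_host/dependency label
    # tied to the same atomic group, since a label name is more specific;
    # non-atomic jobs always get an empty group name.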
showard2bab8f42008-11-12 18:15:22 +00002975
2976
showard77182562009-06-10 00:16:05 +00002977 def run_if_ready(self, queue_entry):
2978 """
2979 @returns An Agent instance to ultimately run this job if enough hosts
2980 are ready for it to run.
2981        @returns None if this Job is not ready to run; any remaining
2982                 unstarted entries may be stopped via stop_if_necessary().
2983 """
showardb2e2c322008-10-14 17:33:55 +00002984 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00002985 self.stop_if_necessary()
2986 return None
mbligh36768f02008-02-22 18:28:33 +00002987
showard77182562009-06-10 00:16:05 +00002988 if queue_entry.atomic_group:
2989 return self.run_with_ready_delay(queue_entry)
2990
2991 return self.run(queue_entry)
2992
2993
2994 def run_with_ready_delay(self, queue_entry):
2995 """
2996 Start a delay to wait for more hosts to enter Pending state before
2997        launching an atomic group job.  Once set, the delay cannot be reset.
2998
2999 @param queue_entry: The HostQueueEntry object to get atomic group
3000 info from and pass to run_if_ready when the delay is up.
3001
3002 @returns An Agent to run the job as appropriate or None if a delay
3003 has already been set.
3004 """
3005 assert queue_entry.job_id == self.id
3006 assert queue_entry.atomic_group
3007 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3008 pending_threshold = queue_entry.atomic_group.max_number_of_machines
3009 over_max_threshold = (self._pending_count() >= pending_threshold)
3010 delay_expired = (self._delay_ready_task and
3011 time.time() >= self._delay_ready_task.end_time)
3012
3013 # Delay is disabled or we already have enough? Do not wait to run.
3014 if not delay or over_max_threshold or delay_expired:
3015 return self.run(queue_entry)
3016
3017 # A delay was previously scheduled.
3018 if self._delay_ready_task:
3019 return None
3020
3021 def run_job_after_delay():
3022 logging.info('Job %s done waiting for extra hosts.', self.id)
3023 return self.run(queue_entry)
3024
3025 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3026 callback=run_job_after_delay)
3027
3028 return Agent([self._delay_ready_task], num_processes=0)
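    # Added lifecycle note (derived from the code above): the first call
    # schedules a DelayedCallTask and returns an Agent that merely tracks it
    # (num_processes=0); subsequent calls return None while the delay runs,
    # and run() happens once enough entries are Pending, the configured delay
    # is zero, or the delay's end_time has passed.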
3029
3030
3031 def run(self, queue_entry):
3032 """
3033 @param queue_entry: The HostQueueEntry instance calling this method.
3034 @returns An Agent instance to run this job or None if we've already
3035 been run.
3036 """
3037 if queue_entry.atomic_group and self._atomic_and_has_started():
3038 logging.error('Job.run() called on running atomic Job %d '
3039 'with HQE %s.', self.id, queue_entry)
3040 return None
showardf1ae3542009-05-11 19:26:02 +00003041 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3042 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003043
3044
showardf1ae3542009-05-11 19:26:02 +00003045 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003046 for queue_entry in queue_entries:
3047 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003048 params = self._get_autoserv_params(queue_entries)
3049 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003050 cmd=params, group_name=group_name)
3051 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003052 if self._delay_ready_task:
3053 # Cancel any pending callback that would try to run again
3054 # as we are already running.
3055 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003056
showard170873e2009-01-07 00:22:26 +00003057 return Agent(tasks, num_processes=len(queue_entries))
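    # Added flow summary (derived from these classes): on_pending() ->
    # run_if_ready() -> run_with_ready_delay() (atomic groups only) -> run() ->
    # _finish_run(), which marks the chosen entries Starting and returns an
    # Agent wrapping a QueueTask that runs the autoserv command built above.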
showardb2e2c322008-10-14 17:33:55 +00003058
3059
mbligh36768f02008-02-22 18:28:33 +00003060if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003061 main()