#!/usr/bin/python -u

"""
Autotest scheduler
"""


import common
import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref, gc

import MySQLdb

from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import gc_stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]
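    # Illustrative example (hypothetical config value): a setting such as
    # notify_email_statuses = "Failed, Aborted" is lower-cased and split on
    # whitespace, commas, semicolons and colons, yielding
    # ['failed', 'aborted'].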

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def init():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args
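# Illustrative example (hypothetical values): for machines='host1,host2',
# extra_args=['control_file'], and a job owned by 'someuser' named 'my_job',
# the command line built above is roughly
#     [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
#      '-m', 'host1,host2', '-u', 'someuser', '-l', 'my_job', '--verbose',
#      'control_file']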


class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result
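    # Illustrative example (assumed row data): rows of (job_id, aclgroup_id)
    # pairs such as [(1, 10), (1, 11), (2, 10)] become
    # {1: set([10, 11]), 2: set([10])}; with flip=True the dict is keyed by
    # the second column instead: {10: set([1, 2]), 11: set([1])}.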


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True
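    # Illustrative example (assumed label setup): if a host carries an
    # only_if_needed label such as 'rare-hardware', the check above only
    # passes when that label is among the job's dependencies or is the
    # metahost label the entry was scheduled against.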


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise. Logs an error if more than one
        atomic group label is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()
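    # Illustrative example (assumed labels): if the host's labels have
    # atomic_group_id values (None, 3, None), this returns 3; if no label has
    # an atomic_group_id it returns None; if two different ids are present an
    # error is logged and one of them is returned arbitrarily.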


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling. Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to. We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
            global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'gc_stats_interval_mins', type=int, default=6*60))


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = HostQueueEntry.fetch(
            where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
            is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)

        raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
                             'invalid status %s: %s'
                             % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        parsing_status = models.HostQueueEntry.Status.PARSING
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status != parsing_status)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise SchedulerError(
                'While scheduling %s, host %s already has a host agent %s'
                % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask to run the given SpecialTask; the caller is
        responsible for adding it to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise SchedulerError('No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                task__in=(models.SpecialTask.Task.CLEANUP,
                          models.SpecialTask.Task.VERIFY),
                queue_entry__id=queue_entry.id,
                is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                '%d unrecovered verifying host queue entries:\n%s' %
                (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.Host.objects.add_join(
            queued_tasks, 'host_queue_entries', 'host_id',
            join_condition='host_queue_entries.active',
            force_left_join=True)
        queued_tasks = queued_tasks.extra(
            where=['(host_queue_entries.id IS NULL OR '
                   'host_queue_entries.id = special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
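    # Illustrative example (assumed queue contents): queued tasks of types
    # [VERIFY, REPAIR, CLEANUP] come back ordered [REPAIR, CLEANUP, VERIFY],
    # since the sort key is each task type's index in task_priority_order.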


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning')",
            print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                task=models.SpecialTask.Task.CLEANUP,
                host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                queue_entry.atomic_group_id is not None
                and queue_entry.host_id is None)
            if is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            elif queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
            else:
                assigned_host = self._host_scheduler.find_eligible_host(
                    queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    self._run_queue_entry(queue_entry, assigned_host)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in HostQueueEntry.fetch(where='status = "%s"' %
                                          models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry, host):
        queue_entry.schedule_pre_job_tasks(assigned_host=host)


    def _find_aborting(self):
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
            agent.task.owner_username)
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
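    # Illustrative example (assumed limits): with
    # max_processes_started_per_cycle = 50 and 48 processes already started
    # this cycle, a 3-process agent is deferred until a later cycle
    # (48 + 3 > 50), while a zero-process agent is always allowed to start.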


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.task.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                # TODO: send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001264 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1265 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001266 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001267 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001268
1269
1270 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001271 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001272 return
mblighbb421852008-03-11 22:36:16 +00001273
showard21baa452008-10-21 00:08:39 +00001274 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001275
showard170873e2009-01-07 00:22:26 +00001276 if self._state.process is None:
1277 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001278 return
mbligh90a549d2008-03-25 23:52:34 +00001279
showard21baa452008-10-21 00:08:39 +00001280 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001281 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001282 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001283 return
mbligh90a549d2008-03-25 23:52:34 +00001284
showard170873e2009-01-07 00:22:26 +00001285 # pid but no running process - maybe process *just* exited
1286 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001287 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001288 # autoserv exited without writing an exit code
1289 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001290 self._handle_pidfile_error(
1291 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001292
showard21baa452008-10-21 00:08:39 +00001293
1294 def _get_pidfile_info(self):
1295 """\
1296 After completion, self._state will contain:
1297 pid=None, exit_status=None if autoserv has not yet run
1298 pid!=None, exit_status=None if autoserv is running
1299 pid!=None, exit_status!=None if autoserv has completed
1300 """
1301 try:
1302 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001303 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001304 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001305
1306
showard170873e2009-01-07 00:22:26 +00001307 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001308 """\
1309 Called when no pidfile is found or no pid is in the pidfile.
1310 """
showard170873e2009-01-07 00:22:26 +00001311 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001312 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001313 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001314 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001315 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001316
1317
showard35162b02009-03-03 02:17:30 +00001318 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001319 """\
1320 Called when autoserv has exited without writing an exit status,
1321 or we've timed out waiting for autoserv to write a pid to the
1322 pidfile. In either case, we just return failure and the caller
1323 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001324
showard170873e2009-01-07 00:22:26 +00001325 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001326 """
1327 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001328 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001329 self._state.exit_status = 1
1330 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001331
1332
jadmanski0afbb632008-06-06 21:10:57 +00001333 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001334 self._get_pidfile_info()
1335 return self._state.exit_status
1336
1337
1338 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001339 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001340 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001341 if self._state.num_tests_failed is None:
1342 return -1
showard21baa452008-10-21 00:08:39 +00001343 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001344
1345
showardcdaeae82009-08-31 18:32:48 +00001346 def try_copy_results_on_drone(self, **kwargs):
1347 if self.has_process():
1348 # copy results logs into the normal place for job results
1349 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1350
1351
1352 def try_copy_to_results_repository(self, source, **kwargs):
1353 if self.has_process():
1354 _drone_manager.copy_to_results_repository(self.get_process(),
1355 source, **kwargs)
1356
1357
mbligh36768f02008-02-22 18:28:33 +00001358class Agent(object):
showard77182562009-06-10 00:16:05 +00001359 """
showard8cc058f2009-09-08 16:26:33 +00001360 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001361
1362 The following methods are required on all task objects:
1363 poll() - Called periodically to let the task check its status and
1364 update its internal state.
1365 is_done() - Returns True if the task is finished.
1366 abort() - Called when an abort has been requested. The task must
1367 set its aborted attribute to True if it actually aborted.
1368
1369 The following attributes are required on all task objects:
1370 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001371 success - bool, True if this task succeeded.
1372 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1373 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001374 """
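# Illustrative sketch only: a minimal object satisfying the task interface
# described above. The name NoopTask is hypothetical and not defined in this
# module; real tasks are the AgentTask subclasses below.
#
#   class NoopTask(object):
#       def __init__(self):
#           self.aborted = False
#           self.success = False
#           self.queue_entry_ids = ()
#           self.host_ids = ()
#           self.num_processes = 0
#
#       def poll(self):
#           self.success = True
#
#       def is_done(self):
#           return self.success or self.aborted
#
#       def abort(self):
#           self.aborted = True
#
#   dispatcher.add_agent(Agent(NoopTask()))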
1375
1376
showard418785b2009-11-23 20:19:59 +00001377 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001378 """
showard8cc058f2009-09-08 16:26:33 +00001379 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001380 """
showard8cc058f2009-09-08 16:26:33 +00001381 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001382
showard77182562009-06-10 00:16:05 +00001383 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001384 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001385
showard8cc058f2009-09-08 16:26:33 +00001386 self.queue_entry_ids = task.queue_entry_ids
1387 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001388
showard8cc058f2009-09-08 16:26:33 +00001389 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001390 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001391
1392
jadmanski0afbb632008-06-06 21:10:57 +00001393 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001394 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001395 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001396 self.task.poll()
1397 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001398 self.finished = True
showardec113162008-05-08 00:52:49 +00001399
1400
jadmanski0afbb632008-06-06 21:10:57 +00001401 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001402 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001403
1404
showardd3dc1992009-04-22 21:01:40 +00001405 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001406 if self.task:
1407 self.task.abort()
1408 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001409 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001410 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001411
showardd3dc1992009-04-22 21:01:40 +00001412
showard77182562009-06-10 00:16:05 +00001413class DelayedCallTask(object):
1414 """
1415 A task object like AgentTask for an Agent to run that waits for the
1416 specified amount of time to have elapsed before calling the supplied
1417 callback once and finishing. If the callback returns anything, it is
1418 assumed to be a new Agent instance and will be added to the dispatcher.
1419
1420 @attribute end_time: The absolute posix time after which this task will
1421 call its callback when it is polled and be finished.
1422
1423 Also has all attributes required by the Agent class.
1424 """
1425 def __init__(self, delay_seconds, callback, now_func=None):
1426 """
1427 @param delay_seconds: The delay in seconds from now that this task
1428 will call the supplied callback and be done.
1429 @param callback: A callable to be called by this task once after at
1430 least delay_seconds time has elapsed. It must return None
1431 or a new Agent instance.
1432 @param now_func: A time.time like function. Default: time.time.
1433 Used for testing.
1434 """
1435 assert delay_seconds > 0
1436 assert callable(callback)
1437 if not now_func:
1438 now_func = time.time
1439 self._now_func = now_func
1440 self._callback = callback
1441
1442 self.end_time = self._now_func() + delay_seconds
1443
1444 # These attributes are required by Agent.
1445 self.aborted = False
showard77182562009-06-10 00:16:05 +00001446 self.host_ids = ()
1447 self.success = False
1448 self.queue_entry_ids = ()
showard418785b2009-11-23 20:19:59 +00001449 self.num_processes = 0
showard77182562009-06-10 00:16:05 +00001450
1451
1452 def poll(self):
showard8cc058f2009-09-08 16:26:33 +00001453 if not self.is_done() and self._now_func() >= self.end_time:
1454 self._callback()
showard77182562009-06-10 00:16:05 +00001455 self.success = True
1456
1457
1458 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001459 return self.success or self.aborted
showard77182562009-06-10 00:16:05 +00001460
1461
1462 def abort(self):
1463 self.aborted = True
showard77182562009-06-10 00:16:05 +00001464
1465
mbligh36768f02008-02-22 18:28:33 +00001466class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001467 class _NullMonitor(object):
1468 pidfile_id = None
1469
1470 def has_process(self):
1471 return True
1472
1473
1474 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001475 """
showardd1195652009-12-08 22:21:02 +00001476 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001477 """
jadmanski0afbb632008-06-06 21:10:57 +00001478 self.done = False
showardd1195652009-12-08 22:21:02 +00001479 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001480 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001481 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001482 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001483 self.queue_entry_ids = []
1484 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001485 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001486
1487
1488 def _set_ids(self, host=None, queue_entries=None):
1489 if queue_entries and queue_entries != [None]:
1490 self.host_ids = [entry.host.id for entry in queue_entries]
1491 self.queue_entry_ids = [entry.id for entry in queue_entries]
1492 else:
1493 assert host
1494 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001495
1496
jadmanski0afbb632008-06-06 21:10:57 +00001497 def poll(self):
showard08a36412009-05-05 01:01:13 +00001498 if not self.started:
1499 self.start()
showardd1195652009-12-08 22:21:02 +00001500 if not self.done:
1501 self.tick()
showard08a36412009-05-05 01:01:13 +00001502
1503
1504 def tick(self):
showardd1195652009-12-08 22:21:02 +00001505 assert self.monitor
1506 exit_code = self.monitor.exit_code()
1507 if exit_code is None:
1508 return
mbligh36768f02008-02-22 18:28:33 +00001509
showardd1195652009-12-08 22:21:02 +00001510 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001511 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001512
1513
jadmanski0afbb632008-06-06 21:10:57 +00001514 def is_done(self):
1515 return self.done
mbligh36768f02008-02-22 18:28:33 +00001516
1517
jadmanski0afbb632008-06-06 21:10:57 +00001518 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001519 if self.done:
showardd1195652009-12-08 22:21:02 +00001520 assert self.started
showard08a36412009-05-05 01:01:13 +00001521 return
showardd1195652009-12-08 22:21:02 +00001522 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001523 self.done = True
1524 self.success = success
1525 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001526
1527
jadmanski0afbb632008-06-06 21:10:57 +00001528 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001529 """
1530 To be overridden.
1531 """
showarded2afea2009-07-07 20:54:07 +00001532 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001533 self.register_necessary_pidfiles()
1534
1535
1536 def _log_file(self):
1537 if not self._log_file_name:
1538 return None
1539 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001540
mbligh36768f02008-02-22 18:28:33 +00001541
jadmanski0afbb632008-06-06 21:10:57 +00001542 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001543 log_file = self._log_file()
1544 if self.monitor and log_file:
1545 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001546
1547
jadmanski0afbb632008-06-06 21:10:57 +00001548 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001549 """
1550 To be overridden.
1551 """
jadmanski0afbb632008-06-06 21:10:57 +00001552 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001553 logging.info("%s finished with success=%s", type(self).__name__,
1554 self.success)
1555
mbligh36768f02008-02-22 18:28:33 +00001556
1557
jadmanski0afbb632008-06-06 21:10:57 +00001558 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001559 if not self.started:
1560 self.prolog()
1561 self.run()
1562
1563 self.started = True
1564
1565
1566 def abort(self):
1567 if self.monitor:
1568 self.monitor.kill()
1569 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001570 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001571 self.cleanup()
1572
1573
showarded2afea2009-07-07 20:54:07 +00001574 def _get_consistent_execution_path(self, execution_entries):
1575 first_execution_path = execution_entries[0].execution_path()
1576 for execution_entry in execution_entries[1:]:
1577 assert execution_entry.execution_path() == first_execution_path, (
1578 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1579 execution_entry,
1580 first_execution_path,
1581 execution_entries[0]))
1582 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001583
1584
showarded2afea2009-07-07 20:54:07 +00001585 def _copy_results(self, execution_entries, use_monitor=None):
1586 """
1587 @param execution_entries: list of objects with execution_path() method
1588 """
showard6d1c1432009-08-20 23:30:39 +00001589 if use_monitor is not None and not use_monitor.has_process():
1590 return
1591
showarded2afea2009-07-07 20:54:07 +00001592 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001593 if use_monitor is None:
1594 assert self.monitor
1595 use_monitor = self.monitor
1596 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001597 execution_path = self._get_consistent_execution_path(execution_entries)
1598 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001599 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001600
showarda1e74b32009-05-12 17:32:04 +00001601
1602 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001603 for queue_entry in queue_entries:
1604 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001605
1606
showarda1e74b32009-05-12 17:32:04 +00001607 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1608 self._copy_results(queue_entries, use_monitor)
1609 self._parse_results(queue_entries)
1610
1611
showardd1195652009-12-08 22:21:02 +00001612 def _command_line(self):
1613 """
1614 Return the command line to run. Must be overridden.
1615 """
1616 raise NotImplementedError
1617
1618
1619 @property
1620 def num_processes(self):
1621 """
1622 Return the number of processes forked by this AgentTask's process. It
1623 may only be approximate. To be overridden if necessary.
1624 """
1625 return 1
1626
1627
1628 def _paired_with_monitor(self):
1629 """
1630 If this AgentTask's process must run on the same machine as some
1631 previous process, this method should be overridden to return a
1632 PidfileRunMonitor for that process.
1633 """
1634 return self._NullMonitor()
1635
1636
1637 @property
1638 def owner_username(self):
1639 """
1640 Return login of user responsible for this task. May be None. Must be
1641 overridden.
1642 """
1643 raise NotImplementedError
1644
1645
1646 def _working_directory(self):
1647 """
1648 Return the directory where this AgentTask's process executes. Must be
1649 overridden.
1650 """
1651 raise NotImplementedError
1652
1653
1654 def _pidfile_name(self):
1655 """
1656 Return the name of the pidfile this AgentTask's process uses. To be
1657 overridden if necessary.
1658 """
1659 return _AUTOSERV_PID_FILE
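# Illustrative sketch of the minimum a concrete AgentTask must override
# (class name, command and path below are hypothetical; real subclasses such
# as SpecialAgentTask and the queue tasks follow later in this file):
#
#   class EchoTask(AgentTask):
#       def _command_line(self):
#           return ['echo', 'hello']
#
#       def _working_directory(self):
#           return 'hostless/echo'
#
#       @property
#       def owner_username(self):
#           return 'autotest_system'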
1660
1661
1662 def _check_paired_results_exist(self):
1663 if not self._paired_with_monitor().has_process():
1664 email_manager.manager.enqueue_notify_email(
1665 'No paired results in task',
1666 'No paired results in task %s at %s'
1667 % (self, self._paired_with_monitor().pidfile_id))
1668 self.finished(False)
1669 return False
1670 return True
1671
1672
1673 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001674 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001675 self.monitor = PidfileRunMonitor()
1676
1677
1678 def run(self):
1679 if not self._check_paired_results_exist():
1680 return
1681
1682 self._create_monitor()
1683 self.monitor.run(
1684 self._command_line(), self._working_directory(),
1685 num_processes=self.num_processes,
1686 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1687 pidfile_name=self._pidfile_name(),
1688 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
1689 username=self.owner_username)
1690
1691
1692 def register_necessary_pidfiles(self):
1693 pidfile_id = _drone_manager.get_pidfile_id_from(
1694 self._working_directory(), self._pidfile_name())
1695 _drone_manager.register_pidfile(pidfile_id)
1696
1697 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1698 if paired_pidfile_id:
1699 _drone_manager.register_pidfile(paired_pidfile_id)
1700
1701
1702 def recover(self):
1703 if not self._check_paired_results_exist():
1704 return
1705
1706 self._create_monitor()
1707 self.monitor.attach_to_existing_process(
1708 self._working_directory(), pidfile_name=self._pidfile_name(),
1709 num_processes=self.num_processes)
1710 if not self.monitor.has_process():
1711 # no process to recover; wait to be started normally
1712 self.monitor = None
1713 return
1714
1715 self.started = True
1716 logging.info('Recovering process %s for %s at %s'
1717 % (self.monitor.get_process(), type(self).__name__,
1718 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001719
1720
showardd9205182009-04-27 20:09:55 +00001721class TaskWithJobKeyvals(object):
1722 """AgentTask mixin providing functionality to help with job keyval files."""
1723 _KEYVAL_FILE = 'keyval'
1724 def _format_keyval(self, key, value):
1725 return '%s=%s' % (key, value)
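# For reference, a keyval file written by this mixin is plain text containing
# newline-separated key=value pairs, e.g. (illustrative values):
#   job_queued=1260000000
#   job_finished=1260000600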
1726
1727
1728 def _keyval_path(self):
1729 """Subclasses must override this"""
1730 raise NotImplementedError
1731
1732
1733 def _write_keyval_after_job(self, field, value):
1734 assert self.monitor
1735 if not self.monitor.has_process():
1736 return
1737 _drone_manager.write_lines_to_file(
1738 self._keyval_path(), [self._format_keyval(field, value)],
1739 paired_with_process=self.monitor.get_process())
1740
1741
1742 def _job_queued_keyval(self, job):
1743 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1744
1745
1746 def _write_job_finished(self):
1747 self._write_keyval_after_job("job_finished", int(time.time()))
1748
1749
showarddb502762009-09-09 15:31:20 +00001750 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1751 keyval_contents = '\n'.join(self._format_keyval(key, value)
1752 for key, value in keyval_dict.iteritems())
1753 # always end with a newline to allow additional keyvals to be written
1754 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001755 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001756 keyval_contents,
1757 file_path=keyval_path)
1758
1759
1760 def _write_keyvals_before_job(self, keyval_dict):
1761 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1762
1763
1764 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001765 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001766 host.hostname)
1767 platform, all_labels = host.platform_and_labels()
1768 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1769 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1770
1771
showard8cc058f2009-09-08 16:26:33 +00001772class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001773 """
1774 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1775 """
1776
1777 TASK_TYPE = None
1778 host = None
1779 queue_entry = None
1780
showardd1195652009-12-08 22:21:02 +00001781 def __init__(self, task, extra_command_args):
1782 super(SpecialAgentTask, self).__init__()
1783
showarded2afea2009-07-07 20:54:07 +00001784 assert self.TASK_TYPE is not None, (
1785 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001786
1787 self.host = Host(id=task.host.id)
1788 self.queue_entry = None
1789 if task.queue_entry:
1790 self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
1791
showarded2afea2009-07-07 20:54:07 +00001792 self.task = task
1793 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001794
1795
showard8cc058f2009-09-08 16:26:33 +00001796 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001797 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1798
1799
1800 def _command_line(self):
1801 return _autoserv_command_line(self.host.hostname,
1802 self._extra_command_args,
1803 queue_entry=self.queue_entry)
1804
1805
1806 def _working_directory(self):
1807 return self.task.execution_path()
1808
1809
1810 @property
1811 def owner_username(self):
1812 if self.task.requested_by:
1813 return self.task.requested_by.login
1814 return None
showard8cc058f2009-09-08 16:26:33 +00001815
1816
showarded2afea2009-07-07 20:54:07 +00001817 def prolog(self):
1818 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001819 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001820 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001821
1822
showardde634ee2009-01-30 01:44:24 +00001823 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001824 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001825
showard2fe3f1d2009-07-06 20:19:11 +00001826 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001827 return # don't fail metahost entries, they'll be reassigned
1828
showard2fe3f1d2009-07-06 20:19:11 +00001829 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001830 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001831 return # entry has been aborted
1832
showard2fe3f1d2009-07-06 20:19:11 +00001833 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001834 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001835 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001836 self._write_keyval_after_job(queued_key, queued_time)
1837 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001838
showard8cc058f2009-09-08 16:26:33 +00001839 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001840 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001841 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001842 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001843
showard2fe3f1d2009-07-06 20:19:11 +00001844 self._copy_results([self.queue_entry])
showardd1195652009-12-08 22:21:02 +00001845
1846 if not self.queue_entry.job.parse_failed_repair:
1847 self.queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
1848 return
showard8cc058f2009-09-08 16:26:33 +00001849
1850 pidfile_id = _drone_manager.get_pidfile_id_from(
1851 self.queue_entry.execution_path(),
1852 pidfile_name=_AUTOSERV_PID_FILE)
1853 _drone_manager.register_pidfile(pidfile_id)
showardd1195652009-12-08 22:21:02 +00001854 self._parse_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001855
1856
1857 def cleanup(self):
1858 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001859
1860 # We will consider an aborted task to be "Failed"
1861 self.task.finish(bool(self.success))
1862
showardf85a0b72009-10-07 20:48:45 +00001863 if self.monitor:
1864 if self.monitor.has_process():
1865 self._copy_results([self.task])
1866 if self.monitor.pidfile_id is not None:
1867 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001868
1869
1870class RepairTask(SpecialAgentTask):
1871 TASK_TYPE = models.SpecialTask.Task.REPAIR
1872
1873
showardd1195652009-12-08 22:21:02 +00001874 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001875 """\
1876 @param task: a models.SpecialTask for this repair; its queue entry, if any, is marked failed when the repair fails.
1877 """
1878 protection = host_protections.Protection.get_string(
1879 task.host.protection)
1880 # normalize the protection name
1881 protection = host_protections.Protection.get_attr_name(protection)
1882
1883 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001884 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001885
1886 # *don't* include the queue entry in IDs -- if the queue entry is
1887 # aborted, we want to leave the repair task running
1888 self._set_ids(host=self.host)
1889
1890
1891 def prolog(self):
1892 super(RepairTask, self).prolog()
1893 logging.info("repair_task starting")
1894 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001895
1896
jadmanski0afbb632008-06-06 21:10:57 +00001897 def epilog(self):
1898 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001899
jadmanski0afbb632008-06-06 21:10:57 +00001900 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001901 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001902 else:
showard8cc058f2009-09-08 16:26:33 +00001903 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001904 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001905 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001906
1907
showarded2afea2009-07-07 20:54:07 +00001908class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001909 def _copy_to_results_repository(self):
1910 if not self.queue_entry or self.queue_entry.meta_host:
1911 return
1912
1913 self.queue_entry.set_execution_subdir()
1914 log_name = os.path.basename(self.task.execution_path())
1915 source = os.path.join(self.task.execution_path(), 'debug',
1916 'autoserv.DEBUG')
1917 destination = os.path.join(
1918 self.queue_entry.execution_path(), log_name)
1919
1920 self.monitor.try_copy_to_results_repository(
1921 source, destination_path=destination)
1922
1923
showard170873e2009-01-07 00:22:26 +00001924 def epilog(self):
1925 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001926
showard775300b2009-09-09 15:30:50 +00001927 if self.success:
1928 return
showard8fe93b52008-11-18 17:53:22 +00001929
showard775300b2009-09-09 15:30:50 +00001930 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001931
showard775300b2009-09-09 15:30:50 +00001932 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001933 # effectively ignore failure for these hosts
1934 self.success = True
showard775300b2009-09-09 15:30:50 +00001935 return
1936
1937 if self.queue_entry:
1938 self.queue_entry.requeue()
1939
1940 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001941 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001942 queue_entry__id=self.queue_entry.id):
1943 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1944 self._fail_queue_entry()
1945 return
1946
showard9bb960b2009-11-19 01:02:11 +00001947 queue_entry = models.HostQueueEntry.objects.get(
1948 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001949 else:
1950 queue_entry = None
1951
1952 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001953 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001954 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001955 queue_entry=queue_entry,
1956 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001957
showard8fe93b52008-11-18 17:53:22 +00001958
1959class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001960 TASK_TYPE = models.SpecialTask.Task.VERIFY
1961
1962
showardd1195652009-12-08 22:21:02 +00001963 def __init__(self, task):
1964 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001965 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001966
1967
jadmanski0afbb632008-06-06 21:10:57 +00001968 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001969 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001970
showardb18134f2009-03-20 20:52:18 +00001971 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001972 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001973 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1974 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001975
showarded2afea2009-07-07 20:54:07 +00001976 # Delete any other queued verifies for this host. One verify will do
1977 # and there's no need to keep records of other requests.
1978 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001979 host__id=self.host.id,
1980 task=models.SpecialTask.Task.VERIFY,
showarded2afea2009-07-07 20:54:07 +00001981 is_active=False, is_complete=False)
1982 queued_verifies = queued_verifies.exclude(id=self.task.id)
1983 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001984
mbligh36768f02008-02-22 18:28:33 +00001985
jadmanski0afbb632008-06-06 21:10:57 +00001986 def epilog(self):
1987 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001988 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001989 if self.queue_entry:
1990 self.queue_entry.on_pending()
1991 else:
1992 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001993
1994
showarda9545c02009-12-18 22:44:26 +00001995class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1996 """
1997 Common functionality for QueueTask and HostlessQueueTask
1998 """
1999 def __init__(self, queue_entries):
2000 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00002001 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00002002 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002003
2004
showard73ec0442009-02-07 02:05:20 +00002005 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00002006 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002007
2008
showardd1195652009-12-08 22:21:02 +00002009 def _command_line(self):
2010 return self.job.get_autoserv_params(self.queue_entries)
2011
2012
2013 @property
2014 def num_processes(self):
2015 return len(self.queue_entries)
2016
2017
2018 @property
2019 def owner_username(self):
2020 return self.job.owner
2021
2022
2023 def _working_directory(self):
2024 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002025
2026
jadmanski0afbb632008-06-06 21:10:57 +00002027 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002028 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00002029 keyval_dict = {queued_key: queued_time}
showardd1195652009-12-08 22:21:02 +00002030 group_name = self.queue_entries[0].get_group_name()
2031 if group_name:
2032 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002033 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002034 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002035 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002036 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002037
2038
showard35162b02009-03-03 02:17:30 +00002039 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002040 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002041 _drone_manager.write_lines_to_file(error_file_path,
2042 [_LOST_PROCESS_ERROR])
2043
2044
showardd3dc1992009-04-22 21:01:40 +00002045 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002046 if not self.monitor:
2047 return
2048
showardd9205182009-04-27 20:09:55 +00002049 self._write_job_finished()
2050
showard35162b02009-03-03 02:17:30 +00002051 if self.monitor.lost_process:
2052 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002053
jadmanskif7fa2cc2008-10-01 14:13:23 +00002054
showardcbd74612008-11-19 21:42:02 +00002055 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002056 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002057 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002058 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002059 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002060
2061
jadmanskif7fa2cc2008-10-01 14:13:23 +00002062 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002063 if not self.monitor or not self.monitor.has_process():
2064 return
2065
jadmanskif7fa2cc2008-10-01 14:13:23 +00002066 # build up sets of all the aborted_by and aborted_on values
2067 aborted_by, aborted_on = set(), set()
2068 for queue_entry in self.queue_entries:
2069 if queue_entry.aborted_by:
2070 aborted_by.add(queue_entry.aborted_by)
2071 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2072 aborted_on.add(t)
2073
2074 # extract some actual, unique aborted by value and write it out
2075 assert len(aborted_by) <= 1
2076 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002077 aborted_by_value = aborted_by.pop()
2078 aborted_on_value = max(aborted_on)
2079 else:
2080 aborted_by_value = 'autotest_system'
2081 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002082
showarda0382352009-02-11 23:36:43 +00002083 self._write_keyval_after_job("aborted_by", aborted_by_value)
2084 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002085
showardcbd74612008-11-19 21:42:02 +00002086 aborted_on_string = str(datetime.datetime.fromtimestamp(
2087 aborted_on_value))
2088 self._write_status_comment('Job aborted by %s on %s' %
2089 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002090
2091
jadmanski0afbb632008-06-06 21:10:57 +00002092 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002093 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002094 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002095 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002096
2097
jadmanski0afbb632008-06-06 21:10:57 +00002098 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002099 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002100 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002101
2102
2103class QueueTask(AbstractQueueTask):
2104 def __init__(self, queue_entries):
2105 super(QueueTask, self).__init__(queue_entries)
2106 self._set_ids(queue_entries=queue_entries)
2107
2108
2109 def prolog(self):
2110 for entry in self.queue_entries:
2111 if entry.status not in (models.HostQueueEntry.Status.STARTING,
2112 models.HostQueueEntry.Status.RUNNING):
2113 raise SchedulerError('Queue task attempting to start '
2114 'entry with invalid status %s: %s'
2115 % (entry.status, entry))
2116 if entry.host.status not in (models.Host.Status.PENDING,
2117 models.Host.Status.RUNNING):
2118 raise SchedulerError('Queue task attempting to start on queue '
2119 'entry with invalid host status %s: %s'
2120 % (entry.host.status, entry))
2121
2122 super(QueueTask, self).prolog()
2123
2124 for queue_entry in self.queue_entries:
2125 self._write_host_keyvals(queue_entry.host)
2126 queue_entry.host.set_status(models.Host.Status.RUNNING)
2127 queue_entry.host.update_field('dirty', 1)
2128 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
2129 # TODO(gps): Remove this if nothing needs it anymore.
2130 # A potential user is: tko/parser
2131 self.job.write_to_machines_file(self.queue_entries[0])
2132
2133
2134 def _finish_task(self):
2135 super(QueueTask, self)._finish_task()
2136
2137 for queue_entry in self.queue_entries:
2138 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002139
2140
showardd3dc1992009-04-22 21:01:40 +00002141class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002142 def __init__(self, queue_entries, log_file_name):
2143 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002144
showardd1195652009-12-08 22:21:02 +00002145 self.queue_entries = queue_entries
2146
showardd3dc1992009-04-22 21:01:40 +00002147 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002148 self._autoserv_monitor.attach_to_existing_process(
2149 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002150
showardd1195652009-12-08 22:21:02 +00002151
2152 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002153 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002154 return 'true'
2155 return self._generate_command(
2156 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002157
2158
2159 def _generate_command(self, results_dir):
2160 raise NotImplementedError('Subclasses must override this')
2161
2162
showardd1195652009-12-08 22:21:02 +00002163 @property
2164 def owner_username(self):
2165 return self.queue_entries[0].job.owner
2166
2167
2168 def _working_directory(self):
2169 return self._get_consistent_execution_path(self.queue_entries)
2170
2171
2172 def _paired_with_monitor(self):
2173 return self._autoserv_monitor
2174
2175
showardd3dc1992009-04-22 21:01:40 +00002176 def _job_was_aborted(self):
2177 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002178 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002179 queue_entry.update_from_database()
2180 if was_aborted is None: # first queue entry
2181 was_aborted = bool(queue_entry.aborted)
2182 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2183 email_manager.manager.enqueue_notify_email(
2184 'Inconsistent abort state',
2185 'Queue entries have inconsistent abort state: ' +
2186 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2187 # don't crash here, just assume true
2188 return True
2189 return was_aborted
2190
2191
showardd1195652009-12-08 22:21:02 +00002192 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002193 if self._job_was_aborted():
2194 return models.HostQueueEntry.Status.ABORTED
2195
2196 # we'll use a PidfileRunMonitor to read the autoserv exit status
2197 if self._autoserv_monitor.exit_code() == 0:
2198 return models.HostQueueEntry.Status.COMPLETED
2199 return models.HostQueueEntry.Status.FAILED
2200
2201
showardd3dc1992009-04-22 21:01:40 +00002202 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002203 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002204 queue_entry.set_status(status)
2205
2206
2207 def abort(self):
2208 # override AgentTask.abort() to avoid killing the process and ending
2209 # the task. post-job tasks continue when the job is aborted.
2210 pass
2211
2212
showard9bb960b2009-11-19 01:02:11 +00002213class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002214 """
2215 Task responsible for
2216 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2217 * copying logs to the results repository
2218 * spawning CleanupTasks for hosts, if necessary
2219 * spawning a FinalReparseTask for the job
2220 """
showardd1195652009-12-08 22:21:02 +00002221 def __init__(self, queue_entries, recover_run_monitor=None):
2222 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002223 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002224 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002225 self._set_ids(queue_entries=queue_entries)
2226
2227
2228 def _generate_command(self, results_dir):
2229 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002230 for queue_entry in self.queue_entries)
showardd3dc1992009-04-22 21:01:40 +00002231 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2232 '-r', results_dir]
2233
2234
showardd1195652009-12-08 22:21:02 +00002235 @property
2236 def num_processes(self):
2237 return len(self.queue_entries)
2238
2239
2240 def _pidfile_name(self):
2241 return _CRASHINFO_PID_FILE
2242
2243
showardd3dc1992009-04-22 21:01:40 +00002244 def prolog(self):
showardd1195652009-12-08 22:21:02 +00002245 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002246 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2247 raise SchedulerError('Gather task attempting to start on '
2248 'non-gathering entry: %s' % queue_entry)
2249 if queue_entry.host.status != models.Host.Status.RUNNING:
2250 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002251 'entry with non-running host status %s: %s'
2252 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002253
showardd3dc1992009-04-22 21:01:40 +00002254 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002255
2256
showardd3dc1992009-04-22 21:01:40 +00002257 def epilog(self):
2258 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002259
showardd1195652009-12-08 22:21:02 +00002260 self._copy_and_parse_results(self.queue_entries,
showard6d1c1432009-08-20 23:30:39 +00002261 use_monitor=self._autoserv_monitor)
showard9bb960b2009-11-19 01:02:11 +00002262 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002263
showard9bb960b2009-11-19 01:02:11 +00002264
2265 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002266 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002267 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002268 models.HostQueueEntry.Status.COMPLETED)
2269 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2270 else:
2271 final_success = False
2272 num_tests_failed = 0
2273
showard9bb960b2009-11-19 01:02:11 +00002274 reboot_after = self._job.reboot_after
2275 do_reboot = (
2276 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002277 self._final_status() == models.HostQueueEntry.Status.ABORTED
showard9bb960b2009-11-19 01:02:11 +00002278 or reboot_after == models.RebootAfter.ALWAYS
2279 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
2280 and final_success and num_tests_failed == 0))
2281
showardd1195652009-12-08 22:21:02 +00002282 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002283 if do_reboot:
2284 # don't pass the queue entry to the CleanupTask. if the cleanup
2285 # fails, the job doesn't care -- it's over.
2286 models.SpecialTask.objects.create(
2287 host=models.Host.objects.get(id=queue_entry.host.id),
2288 task=models.SpecialTask.Task.CLEANUP,
2289 requested_by=self._job.owner_model())
2290 else:
2291 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002292
2293
showard0bbfc212009-04-29 21:06:13 +00002294 def run(self):
showard597bfd32009-05-08 18:22:50 +00002295 autoserv_exit_code = self._autoserv_monitor.exit_code()
2296 # only run if Autoserv exited due to some signal. if we have no exit
2297 # code, assume something bad (and signal-like) happened.
2298 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002299 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002300 else:
2301 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002302
2303
showard8fe93b52008-11-18 17:53:22 +00002304class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002305 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2306
2307
showard8cc058f2009-09-08 16:26:33 +00002308 def __init__(self, task, recover_run_monitor=None):
showardd1195652009-12-08 22:21:02 +00002309 super(CleanupTask, self).__init__(task, ['--cleanup'])
showard8cc058f2009-09-08 16:26:33 +00002310 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002311
mblighd5c95802008-03-05 00:33:46 +00002312
jadmanski0afbb632008-06-06 21:10:57 +00002313 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002314 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002315 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002316 self.host.set_status(models.Host.Status.CLEANING)
2317 if self.queue_entry:
2318 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2319
2320
showard775300b2009-09-09 15:30:50 +00002321 def _finish_epilog(self):
showard7b2d7cb2009-10-28 19:53:03 +00002322 if not self.queue_entry or not self.success:
showard775300b2009-09-09 15:30:50 +00002323 return
2324
showard7b2d7cb2009-10-28 19:53:03 +00002325 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2326 should_run_verify = (
2327 self.queue_entry.job.run_verify
2328 and self.host.protection != do_not_verify_protection)
2329 if should_run_verify:
showard9bb960b2009-11-19 01:02:11 +00002330 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
showard7b2d7cb2009-10-28 19:53:03 +00002331 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002332 host=models.Host.objects.get(id=self.host.id),
showard7b2d7cb2009-10-28 19:53:03 +00002333 queue_entry=entry,
2334 task=models.SpecialTask.Task.VERIFY)
2335 else:
showard775300b2009-09-09 15:30:50 +00002336 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002337
mblighd5c95802008-03-05 00:33:46 +00002338
showard21baa452008-10-21 00:08:39 +00002339 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002340 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002341
showard21baa452008-10-21 00:08:39 +00002342 if self.success:
2343 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002344 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002345
showard775300b2009-09-09 15:30:50 +00002346 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002347
showard21baa452008-10-21 00:08:39 +00002348
showardd3dc1992009-04-22 21:01:40 +00002349class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002350 _num_running_parses = 0
2351
showardd1195652009-12-08 22:21:02 +00002352 def __init__(self, queue_entries):
2353 super(FinalReparseTask, self).__init__(queue_entries,
2354 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002355 # don't use _set_ids, since we don't want to set the host_ids
2356 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002357
2358
2359 def _generate_command(self, results_dir):
2360 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
2361 results_dir]
2362
2363
2364 @property
2365 def num_processes(self):
2366 return 0 # don't include parser processes in accounting
2367
2368
2369 def _pidfile_name(self):
2370 return _PARSER_PID_FILE
2371
2372
2373 def _parse_started(self):
2374 return bool(self.monitor)
showard97aed502008-11-04 02:01:24 +00002375
showard97aed502008-11-04 02:01:24 +00002376
2377 @classmethod
2378 def _increment_running_parses(cls):
2379 cls._num_running_parses += 1
2380
2381
2382 @classmethod
2383 def _decrement_running_parses(cls):
2384 cls._num_running_parses -= 1
2385
2386
2387 @classmethod
2388 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002389 return (cls._num_running_parses <
2390 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002391
2392
2393 def prolog(self):
showardd1195652009-12-08 22:21:02 +00002394 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002395 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2396 raise SchedulerError('Parse task attempting to start on '
2397 'non-parsing entry: %s' % queue_entry)
2398
showard97aed502008-11-04 02:01:24 +00002399 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002400
2401
2402 def epilog(self):
2403 super(FinalReparseTask, self).epilog()
showardd1195652009-12-08 22:21:02 +00002404 self._set_all_statuses(self._final_status())
showard97aed502008-11-04 02:01:24 +00002405
2406
showard08a36412009-05-05 01:01:13 +00002407 def tick(self):
2408 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002409 # and we can, at which point we revert to default behavior
showardd1195652009-12-08 22:21:02 +00002410 if self._parse_started():
showard08a36412009-05-05 01:01:13 +00002411 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002412 else:
2413 self._try_starting_parse()
2414
2415
2416 def run(self):
2417 # override run() to not actually run unless we can
2418 self._try_starting_parse()
2419
2420
2421 def _try_starting_parse(self):
2422 if not self._can_run_new_parse():
2423 return
showard170873e2009-01-07 00:22:26 +00002424
showard97aed502008-11-04 02:01:24 +00002425 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002426 super(FinalReparseTask, self).run()
showard97aed502008-11-04 02:01:24 +00002427 self._increment_running_parses()
showard97aed502008-11-04 02:01:24 +00002428
2429
2430 def finished(self, success):
2431 super(FinalReparseTask, self).finished(success)
showardd1195652009-12-08 22:21:02 +00002432 if self._parse_started():
showard678df4f2009-02-04 21:36:39 +00002433 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002434
2435
showarda9545c02009-12-18 22:44:26 +00002436class HostlessQueueTask(AbstractQueueTask):
2437 def __init__(self, queue_entry):
2438 super(HostlessQueueTask, self).__init__([queue_entry])
2439 self.queue_entry_ids = [queue_entry.id]
2440
2441
2442 def prolog(self):
2443 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2444 super(HostlessQueueTask, self).prolog()
2445
2446
2447 def _final_status(self):
2448 if self.queue_entries[0].aborted:
2449 return models.HostQueueEntry.Status.ABORTED
2450 if self.monitor.exit_code() == 0:
2451 return models.HostQueueEntry.Status.COMPLETED
2452 return models.HostQueueEntry.Status.FAILED
2453
2454
2455 def _finish_task(self):
2456 super(HostlessQueueTask, self)._finish_task()
2457 self.queue_entries[0].set_status(self._final_status())
2458
2459
showarda3c58572009-03-12 20:36:59 +00002460class DBError(Exception):
2461 """Raised by the DBObject constructor when its select fails."""
2462
2463
mbligh36768f02008-02-22 18:28:33 +00002464class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002465 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002466
2467 # Subclasses MUST override these:
2468 _table_name = ''
2469 _fields = ()
2470
showarda3c58572009-03-12 20:36:59 +00002471 # A mapping from (type, id) to the instance of the object for that
2472 # particular id. This prevents us from creating new Job() and Host()
2473 # instances for every HostQueueEntry object that we instantiate as
2474 # multiple HQEs often share the same Job.
2475 _instances_by_type_and_id = weakref.WeakValueDictionary()
2476 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002477
showarda3c58572009-03-12 20:36:59 +00002478
2479 def __new__(cls, id=None, **kwargs):
2480 """
2481 Look to see if we already have an instance for this particular type
2482 and id. If so, use it instead of creating a duplicate instance.
2483 """
2484 if id is not None:
2485 instance = cls._instances_by_type_and_id.get((cls, id))
2486 if instance:
2487 return instance
2488 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2489
2490
2491 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002492 assert bool(id) or bool(row)
2493 if id is not None and row is not None:
2494 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002495 assert self._table_name, '_table_name must be defined in your class'
2496 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002497 if not new_record:
2498 if self._initialized and not always_query:
2499 return # We've already been initialized.
2500 if id is None:
2501 id = row[0]
2502 # Tell future constructors to use us instead of re-querying while
2503 # this instance is still around.
2504 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002505
showard6ae5ea92009-02-25 00:11:51 +00002506 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002507
jadmanski0afbb632008-06-06 21:10:57 +00002508 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002509
jadmanski0afbb632008-06-06 21:10:57 +00002510 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002511 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002512
showarda3c58572009-03-12 20:36:59 +00002513 if self._initialized:
2514 differences = self._compare_fields_in_row(row)
2515 if differences:
showard7629f142009-03-27 21:02:02 +00002516 logging.warn(
2517 'initialized %s %s instance requery is updating: %s',
2518 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002519 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002520 self._initialized = True
2521
2522
2523 @classmethod
2524 def _clear_instance_cache(cls):
2525 """Used for testing, clear the internal instance cache."""
2526 cls._instances_by_type_and_id.clear()
2527
2528
showardccbd6c52009-03-21 00:10:21 +00002529 def _fetch_row_from_db(self, row_id):
2530 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2531 rows = _db.execute(sql, (row_id,))
2532 if not rows:
showard76e29d12009-04-15 21:53:10 +00002533 raise DBError("row not found (table=%s, row id=%s)"
2534 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002535 return rows[0]
2536
2537
showarda3c58572009-03-12 20:36:59 +00002538 def _assert_row_length(self, row):
2539 assert len(row) == len(self._fields), (
2540 "table = %s, row = %s/%d, fields = %s/%d" % (
2541 self.__table, row, len(row), self._fields, len(self._fields)))
2542
2543
2544 def _compare_fields_in_row(self, row):
2545 """
showarddae680a2009-10-12 20:26:43 +00002546 Given a row as returned by a SELECT query, compare it to our existing
2547 in-memory fields. Fractional seconds are stripped from datetime values
2548 before comparison.
showarda3c58572009-03-12 20:36:59 +00002549
2550 @param row - A sequence of values corresponding to fields named in
2551 the class attribute _fields.
2552
2553 @returns A dictionary listing the differences keyed by field name
2554 containing tuples of (current_value, row_value).
2555 """
2556 self._assert_row_length(row)
2557 differences = {}
showarddae680a2009-10-12 20:26:43 +00002558 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002559 for field, row_value in itertools.izip(self._fields, row):
2560 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002561 if (isinstance(current_value, datetime.datetime)
2562 and isinstance(row_value, datetime.datetime)):
2563 current_value = current_value.strftime(datetime_cmp_fmt)
2564 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002565 if current_value != row_value:
2566 differences[field] = (current_value, row_value)
2567 return differences
showard2bab8f42008-11-12 18:15:22 +00002568
2569
2570 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002571 """
2572 Update our field attributes using a single row returned by SELECT.
2573
2574 @param row - A sequence of values corresponding to fields named in
2575 the class fields list.
2576 """
2577 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002578
showard2bab8f42008-11-12 18:15:22 +00002579 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002580 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002581 setattr(self, field, value)
2582 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002583
showard2bab8f42008-11-12 18:15:22 +00002584 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002585
mblighe2586682008-02-29 22:45:46 +00002586
showardccbd6c52009-03-21 00:10:21 +00002587 def update_from_database(self):
2588 assert self.id is not None
2589 row = self._fetch_row_from_db(self.id)
2590 self._update_fields_from_row(row)
2591
2592
jadmanski0afbb632008-06-06 21:10:57 +00002593 def count(self, where, table = None):
2594 if not table:
2595 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002596
jadmanski0afbb632008-06-06 21:10:57 +00002597 rows = _db.execute("""
2598 SELECT count(*) FROM %s
2599 WHERE %s
2600 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002601
jadmanski0afbb632008-06-06 21:10:57 +00002602 assert len(rows) == 1
2603
2604 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002605
2606
showardd3dc1992009-04-22 21:01:40 +00002607 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002608 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002609
showard2bab8f42008-11-12 18:15:22 +00002610 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002611 return
mbligh36768f02008-02-22 18:28:33 +00002612
mblighf8c624d2008-07-03 16:58:45 +00002613 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002614 _db.execute(query, (value, self.id))
2615
showard2bab8f42008-11-12 18:15:22 +00002616 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002617
2618
jadmanski0afbb632008-06-06 21:10:57 +00002619 def save(self):
2620 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002621 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002622 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002623 values = []
2624 for key in keys:
2625 value = getattr(self, key)
2626 if value is None:
2627 values.append('NULL')
2628 else:
2629 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002630 values_str = ','.join(values)
2631 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2632 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002633 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002634 # Update our id to the one the database just assigned to us.
2635 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002636
2637
jadmanski0afbb632008-06-06 21:10:57 +00002638 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002639 self._instances_by_type_and_id.pop((type(self), self.id), None)
2640 self._initialized = False
2641 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002642 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2643 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002644
2645
showard63a34772008-08-18 19:32:50 +00002646 @staticmethod
2647 def _prefix_with(string, prefix):
2648 if string:
2649 string = prefix + string
2650 return string
2651
2652
jadmanski0afbb632008-06-06 21:10:57 +00002653 @classmethod
showard989f25d2008-10-01 11:38:11 +00002654 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002655 """
2656 Construct instances of our class based on the given database query.
2657
2658 @yields One class instance for each row fetched.
2659 """
showard63a34772008-08-18 19:32:50 +00002660 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2661 where = cls._prefix_with(where, 'WHERE ')
2662 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002663 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002664 'joins' : joins,
2665 'where' : where,
2666 'order_by' : order_by})
2667 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002668 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002669
mbligh36768f02008-02-22 18:28:33 +00002670
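# Example use of the DBObject helpers above (illustrative only; the WHERE
# clause and status value here are hypothetical):
#
#     for host in Host.fetch(where='status = %s AND locked = 0',
#                            params=('Ready',)):
#         host.set_status('Verifying')
#
# fetch() builds a SELECT against the subclass's _table_name and returns one
# fully populated instance per row; update_field() and save() write changes
# back through the module-level _db connection.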
2671class IneligibleHostQueue(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002672 _table_name = 'ineligible_host_queues'
2673 _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002674
2675
showard89f84db2009-03-12 20:39:13 +00002676class AtomicGroup(DBObject):
2677 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002678 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2679 'invalid')
showard89f84db2009-03-12 20:39:13 +00002680
2681
showard989f25d2008-10-01 11:38:11 +00002682class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002683 _table_name = 'labels'
2684 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002685 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002686
2687
showard6157c632009-07-06 20:19:31 +00002688 def __repr__(self):
2689 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2690 self.name, self.id, self.atomic_group_id)
2691
2692
mbligh36768f02008-02-22 18:28:33 +00002693class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002694 _table_name = 'hosts'
2695 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2696 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2697
2698
jadmanski0afbb632008-06-06 21:10:57 +00002699 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002700 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002701 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002702
2703
showard170873e2009-01-07 00:22:26 +00002704 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002705 """
showard170873e2009-01-07 00:22:26 +00002706 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002707 """
2708 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002709 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002710 FROM labels
2711 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002712 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002713 ORDER BY labels.name
2714 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002715 platform = None
2716 all_labels = []
2717 for label_name, is_platform in rows:
2718 if is_platform:
2719 platform = label_name
2720 all_labels.append(label_name)
2721 return platform, all_labels
2722
2723
showard54c1ea92009-05-20 00:32:58 +00002724 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2725
2726
2727 @classmethod
2728 def cmp_for_sort(cls, a, b):
2729 """
2730 A comparison function for sorting Host objects by hostname.
2731
2732 This strips any trailing numeric digits, ignores leading 0s and
2733 compares hostnames by the leading name and the trailing digits as a
2734 number. If either hostname does not match this pattern, they are simply
2735 compared as lower case strings.
2736
2737 Example of how hostnames will be sorted:
2738
2739 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2740
2741 This hopefully satisfies most people's hostname sorting needs regardless
2742 of their exact naming schemes. Nobody sane should have both a host10
2743 and host010 (but the algorithm works regardless).
2744 """
2745 lower_a = a.hostname.lower()
2746 lower_b = b.hostname.lower()
2747 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2748 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2749 if match_a and match_b:
2750 name_a, number_a_str = match_a.groups()
2751 name_b, number_b_str = match_b.groups()
2752 number_a = int(number_a_str.lstrip('0'))
2753 number_b = int(number_b_str.lstrip('0'))
2754 result = cmp((name_a, number_a), (name_b, number_b))
2755 if result == 0 and lower_a != lower_b:
2756 # If they compared equal above but the lower case names are
2757 # indeed different, don't report equality. abc012 != abc12.
2758 return cmp(lower_a, lower_b)
2759 return result
2760 else:
2761 return cmp(lower_a, lower_b)
2762
2763
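# Example of sorting Host objects with the comparator above (illustrative
# only; 'hosts' is a hypothetical list of Host instances):
#
#     hosts.sort(cmp=Host.cmp_for_sort)
#     # hostnames now ordered as in the docstring example:
#     # alice, host1, host2, host09, host010, host10, host11, yolkfolk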
mbligh36768f02008-02-22 18:28:33 +00002764class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002765 _table_name = 'host_queue_entries'
2766 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002767 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002768 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002769
2770
showarda3c58572009-03-12 20:36:59 +00002771 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002772 assert id or row
showarda3c58572009-03-12 20:36:59 +00002773 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002774 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002775
jadmanski0afbb632008-06-06 21:10:57 +00002776 if self.host_id:
2777 self.host = Host(self.host_id)
2778 else:
2779 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002780
showard77182562009-06-10 00:16:05 +00002781 if self.atomic_group_id:
2782 self.atomic_group = AtomicGroup(self.atomic_group_id,
2783 always_query=False)
2784 else:
2785 self.atomic_group = None
2786
showard170873e2009-01-07 00:22:26 +00002787 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002788 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002789
2790
showard89f84db2009-03-12 20:39:13 +00002791 @classmethod
2792 def clone(cls, template):
2793 """
2794 Creates a new row using the values from a template instance.
2795
2796 The new instance will not exist in the database or have a valid
2797 id attribute until its save() method is called.
2798 """
2799 assert isinstance(template, cls)
2800 new_row = [getattr(template, field) for field in cls._fields]
2801 clone = cls(row=new_row, new_record=True)
2802 clone.id = None
2803 return clone
2804
2805
showardc85c21b2008-11-24 22:17:37 +00002806 def _view_job_url(self):
2807 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2808
2809
showardf1ae3542009-05-11 19:26:02 +00002810 def get_labels(self):
2811 """
2812 Get all labels associated with this host queue entry (either via the
2813 meta_host or as a job dependency label). The labels yielded are not
2814 guaranteed to be unique.
2815
2816 @yields Label instances associated with this host_queue_entry.
2817 """
2818 if self.meta_host:
2819 yield Label(id=self.meta_host, always_query=False)
2820 labels = Label.fetch(
2821 joins="JOIN jobs_dependency_labels AS deps "
2822 "ON (labels.id = deps.label_id)",
2823 where="deps.job_id = %d" % self.job.id)
2824 for label in labels:
2825 yield label
2826
2827
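# Example use of get_labels() above (illustrative only; 'entry' is a
# hypothetical HostQueueEntry).  The generator yields the meta_host label
# first, if any, then the job's dependency labels, possibly with duplicates:
#
#     label_names = set(label.name for label in entry.get_labels())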
jadmanski0afbb632008-06-06 21:10:57 +00002828 def set_host(self, host):
2829 if host:
2830 self.queue_log_record('Assigning host ' + host.hostname)
2831 self.update_field('host_id', host.id)
2832 self.update_field('active', True)
2833 self.block_host(host.id)
2834 else:
2835 self.queue_log_record('Releasing host')
2836 self.unblock_host(self.host.id)
2837 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002838
jadmanski0afbb632008-06-06 21:10:57 +00002839 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002840
2841
jadmanski0afbb632008-06-06 21:10:57 +00002842 def queue_log_record(self, log_line):
2843 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002844 _drone_manager.write_lines_to_file(self.queue_log_path,
2845 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002846
2847
jadmanski0afbb632008-06-06 21:10:57 +00002848 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002849 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002850 row = [0, self.job.id, host_id]
2851 block = IneligibleHostQueue(row=row, new_record=True)
2852 block.save()
mblighe2586682008-02-29 22:45:46 +00002853
2854
jadmanski0afbb632008-06-06 21:10:57 +00002855 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002856 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002857 blocks = IneligibleHostQueue.fetch(
2858 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2859 for block in blocks:
2860 block.delete()
mblighe2586682008-02-29 22:45:46 +00002861
2862
showard2bab8f42008-11-12 18:15:22 +00002863 def set_execution_subdir(self, subdir=None):
2864 if subdir is None:
showarda9545c02009-12-18 22:44:26 +00002865 assert self.host
2866 subdir = self.host.hostname
showard2bab8f42008-11-12 18:15:22 +00002867 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002868
2869
showard6355f6b2008-12-05 18:52:13 +00002870 def _get_hostname(self):
2871 if self.host:
2872 return self.host.hostname
2873 return 'no host'
2874
2875
showard170873e2009-01-07 00:22:26 +00002876 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002877 flags = []
2878 if self.active:
2879 flags.append('active')
2880 if self.complete:
2881 flags.append('complete')
2882 if self.deleted:
2883 flags.append('deleted')
2884 if self.aborted:
2885 flags.append('aborted')
2886 flags_str = ','.join(flags)
2887 if flags_str:
2888 flags_str = ' [%s]' % flags_str
2889 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2890 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002891
2892
jadmanski0afbb632008-06-06 21:10:57 +00002893 def set_status(self, status):
showard56824072009-10-12 20:30:21 +00002894 logging.info("%s -> %s", self, status)
mblighf8c624d2008-07-03 16:58:45 +00002895
showard56824072009-10-12 20:30:21 +00002896 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002897
showard8cc058f2009-09-08 16:26:33 +00002898 if status in (models.HostQueueEntry.Status.QUEUED,
2899 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002900 self.update_field('complete', False)
2901 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002902
showard8cc058f2009-09-08 16:26:33 +00002903 if status in (models.HostQueueEntry.Status.PENDING,
2904 models.HostQueueEntry.Status.RUNNING,
2905 models.HostQueueEntry.Status.VERIFYING,
2906 models.HostQueueEntry.Status.STARTING,
2907 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002908 self.update_field('complete', False)
2909 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002910
showard8cc058f2009-09-08 16:26:33 +00002911 if status in (models.HostQueueEntry.Status.FAILED,
2912 models.HostQueueEntry.Status.COMPLETED,
2913 models.HostQueueEntry.Status.STOPPED,
2914 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002915 self.update_field('complete', True)
2916 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002917 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002918
2919 should_email_status = (status.lower() in _notify_email_statuses or
2920 'all' in _notify_email_statuses)
2921 if should_email_status:
2922 self._email_on_status(status)
2923
2924 self._email_on_job_complete()
2925
2926
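# Summary of the (active, complete) flags maintained by set_status() above:
#
#     Queued, Parsing                                  -> active=0, complete=0
#     Verifying, Pending, Starting, Running, Gathering -> active=1, complete=0
#     Failed, Completed, Stopped, Aborted              -> active=0, complete=1
#
# Entries reaching a complete status also run _on_complete() and may send the
# optional status and job-complete notification emails.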
showardf85a0b72009-10-07 20:48:45 +00002927 def _on_complete(self):
showardd1195652009-12-08 22:21:02 +00002928 self.job.stop_if_necessary()
showardf85a0b72009-10-07 20:48:45 +00002929 if not self.execution_subdir:
2930 return
2931 # unregister any possible pidfiles associated with this queue entry
2932 for pidfile_name in _ALL_PIDFILE_NAMES:
2933 pidfile_id = _drone_manager.get_pidfile_id_from(
2934 self.execution_path(), pidfile_name=pidfile_name)
2935 _drone_manager.unregister_pidfile(pidfile_id)
2936
2937
showardc85c21b2008-11-24 22:17:37 +00002938 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002939 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002940
2941 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2942 self.job.id, self.job.name, hostname, status)
2943 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2944 self.job.id, self.job.name, hostname, status,
2945 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002946 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002947
2948
2949 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002950 if not self.job.is_finished():
2951 return
showard542e8402008-09-19 20:16:18 +00002952
showardc85c21b2008-11-24 22:17:37 +00002953 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002954 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002955 for queue_entry in hosts_queue:
2956 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002957 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002958 queue_entry.status))
2959
2960 summary_text = "\n".join(summary_text)
2961 status_counts = models.Job.objects.get_status_counts(
2962 [self.job.id])[self.job.id]
2963 status = ', '.join('%d %s' % (count, status) for status, count
2964 in status_counts.iteritems())
2965
2966 subject = 'Autotest: Job ID: %s "%s" %s' % (
2967 self.job.id, self.job.name, status)
2968 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2969 self.job.id, self.job.name, status, self._view_job_url(),
2970 summary_text)
showard170873e2009-01-07 00:22:26 +00002971 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002972
2973
showard8cc058f2009-09-08 16:26:33 +00002974 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002975 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002976 assert assigned_host
2977 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002978 if self.host_id is None:
2979 self.set_host(assigned_host)
2980 else:
2981 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002982
showard2ca64c92009-12-10 21:41:02 +00002983 logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002984 self.job.name, self.meta_host, self.atomic_group_id,
showard2ca64c92009-12-10 21:41:02 +00002985 self.job.id, self.id, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002986
showard8cc058f2009-09-08 16:26:33 +00002987 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002988
2989
showard8cc058f2009-09-08 16:26:33 +00002990 def _do_schedule_pre_job_tasks(self):
showard77182562009-06-10 00:16:05 +00002991 # Every host goes thru the Verifying stage (which may or may not
2992 # actually do anything as determined by get_pre_job_tasks).
2993 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00002994 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002995
showard6ae5ea92009-02-25 00:11:51 +00002996
jadmanski0afbb632008-06-06 21:10:57 +00002997 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002998 assert self.host
showard8cc058f2009-09-08 16:26:33 +00002999 self.set_status(models.HostQueueEntry.Status.QUEUED)
showard12f3e322009-05-13 21:27:42 +00003000 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00003001 # verify/cleanup failure sets the execution subdir, so reset it here
3002 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00003003 if self.meta_host:
3004 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00003005
3006
jadmanskif7fa2cc2008-10-01 14:13:23 +00003007 @property
3008 def aborted_by(self):
3009 self._load_abort_info()
3010 return self._aborted_by
3011
3012
3013 @property
3014 def aborted_on(self):
3015 self._load_abort_info()
3016 return self._aborted_on
3017
3018
3019 def _load_abort_info(self):
3020 """ Fetch info about who aborted the job. """
3021 if hasattr(self, "_aborted_by"):
3022 return
3023 rows = _db.execute("""
3024 SELECT users.login, aborted_host_queue_entries.aborted_on
3025 FROM aborted_host_queue_entries
3026 INNER JOIN users
3027 ON users.id = aborted_host_queue_entries.aborted_by_id
3028 WHERE aborted_host_queue_entries.queue_entry_id = %s
3029 """, (self.id,))
3030 if rows:
3031 self._aborted_by, self._aborted_on = rows[0]
3032 else:
3033 self._aborted_by = self._aborted_on = None
3034
3035
showardb2e2c322008-10-14 17:33:55 +00003036 def on_pending(self):
3037 """
3038 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00003039 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
3040 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00003041 """
showard8cc058f2009-09-08 16:26:33 +00003042 self.set_status(models.HostQueueEntry.Status.PENDING)
3043 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00003044
3045 # Some debug code here: sends an email if an asynchronous job does not
3046 # immediately enter Starting.
3047 # TODO: Remove this once we figure out why asynchronous jobs are getting
3048 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00003049 self.job.run_if_ready(queue_entry=self)
3050 if (self.job.synch_count == 1 and
3051 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00003052 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
3053 message = 'Asynchronous job stuck in Pending'
3054 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00003055
3056
showardd3dc1992009-04-22 21:01:40 +00003057 def abort(self, dispatcher):
3058 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00003059
showardd3dc1992009-04-22 21:01:40 +00003060 Status = models.HostQueueEntry.Status
showard8cc058f2009-09-08 16:26:33 +00003061 if self.status in (Status.GATHERING, Status.PARSING):
showardd3dc1992009-04-22 21:01:40 +00003062 # do nothing; post-job tasks will finish and then mark this entry
3063 # with status "Aborted" and take care of the host
3064 return
3065
showard8cc058f2009-09-08 16:26:33 +00003066 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
3067 assert not dispatcher.get_agents_for_entry(self)
showardd3dc1992009-04-22 21:01:40 +00003068 self.host.set_status(models.Host.Status.READY)
3069 elif self.status == Status.VERIFYING:
showard8cc058f2009-09-08 16:26:33 +00003070 models.SpecialTask.objects.create(
3071 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00003072 host=models.Host.objects.get(id=self.host.id),
3073 requested_by=self.job.owner_model())
showardd3dc1992009-04-22 21:01:40 +00003074
3075 self.set_status(Status.ABORTED)
showardd2014822009-10-12 20:26:58 +00003076 self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00003077
showard8cc058f2009-09-08 16:26:33 +00003078
3079 def get_group_name(self):
3080 atomic_group = self.atomic_group
3081 if not atomic_group:
3082 return ''
3083
3084 # Look at any meta_host and dependency labels and pick the first
3085 # one that also specifies this atomic group. Use that label name
3086 # as the group name if possible (it is more specific).
3087 for label in self.get_labels():
3088 if label.atomic_group_id:
3089 assert label.atomic_group_id == atomic_group.id
3090 return label.name
3091 return atomic_group.name
3092
3093
showard170873e2009-01-07 00:22:26 +00003094 def execution_tag(self):
3095 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00003096 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00003097
3098
showarded2afea2009-07-07 20:54:07 +00003099 def execution_path(self):
3100 return self.execution_tag()
3101
3102
showarda9545c02009-12-18 22:44:26 +00003103 def set_started_on_now(self):
3104 self.update_field('started_on', datetime.datetime.now())
3105
3106
3107 def is_hostless(self):
3108 return (self.host_id is None
3109 and self.meta_host is None
3110 and self.atomic_group_id is None)
3111
3112
mbligh36768f02008-02-22 18:28:33 +00003113class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00003114 _table_name = 'jobs'
3115 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
3116 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00003117 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00003118 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00003119
showard77182562009-06-10 00:16:05 +00003120 # This does not need to be a column in the DB. The delays are likely to
3121 # be configured short. If the scheduler is stopped and restarted in
3122 # the middle of a job's delay cycle, the delay cycle will either be
3123 # repeated or skipped depending on the number of Pending machines found
3124 # when the restarted scheduler recovers to track it. Not a problem.
3125 #
3126 # A reference to the DelayedCallTask that will wake up the job should
3127 # no other HQEs change state in time. Its end_time attribute is used
3128 # by our run_with_ready_delay() method to determine if the wait is over.
3129 _delay_ready_task = None
3130
3131 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
3132 # all status='Pending' atomic group HQEs incase a delay was running when the
3133 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00003134
showarda3c58572009-03-12 20:36:59 +00003135 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00003136 assert id or row
showarda3c58572009-03-12 20:36:59 +00003137 super(Job, self).__init__(id=id, row=row, **kwargs)
showard9bb960b2009-11-19 01:02:11 +00003138 self._owner_model = None # caches model instance of owner
3139
3140
3141 def owner_model(self):
3142 # work around the fact that the Job owner field is a string, not a
3143 # foreign key
3144 if not self._owner_model:
3145 self._owner_model = models.User.objects.get(login=self.owner)
3146 return self._owner_model
mbligh36768f02008-02-22 18:28:33 +00003147
mblighe2586682008-02-29 22:45:46 +00003148
jadmanski0afbb632008-06-06 21:10:57 +00003149 def is_server_job(self):
3150 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00003151
3152
showard170873e2009-01-07 00:22:26 +00003153 def tag(self):
3154 return "%s-%s" % (self.id, self.owner)
3155
3156
jadmanski0afbb632008-06-06 21:10:57 +00003157 def get_host_queue_entries(self):
3158 rows = _db.execute("""
3159 SELECT * FROM host_queue_entries
3160 WHERE job_id= %s
3161 """, (self.id,))
3162 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00003163
jadmanski0afbb632008-06-06 21:10:57 +00003164 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00003165
jadmanski0afbb632008-06-06 21:10:57 +00003166 return entries
mbligh36768f02008-02-22 18:28:33 +00003167
3168
jadmanski0afbb632008-06-06 21:10:57 +00003169 def set_status(self, status, update_queues=False):
3170 self.update_field('status',status)
3171
3172 if update_queues:
3173 for queue_entry in self.get_host_queue_entries():
3174 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00003175
3176
showard77182562009-06-10 00:16:05 +00003177 def _atomic_and_has_started(self):
3178 """
3179 @returns True if any of the HostQueueEntries associated with this job
3180 have entered the Status.STARTING state or beyond.
3181 """
3182 atomic_entries = models.HostQueueEntry.objects.filter(
3183 job=self.id, atomic_group__isnull=False)
3184 if atomic_entries.count() <= 0:
3185 return False
3186
showardaf8b4ca2009-06-16 18:47:26 +00003187 # These states may *only* be reached if Job.run() has been called.
3188 started_statuses = (models.HostQueueEntry.Status.STARTING,
3189 models.HostQueueEntry.Status.RUNNING,
3190 models.HostQueueEntry.Status.COMPLETED)
3191
3192 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003193 return started_entries.count() > 0
3194
3195
showard708b3522009-08-20 23:26:15 +00003196 def _hosts_assigned_count(self):
3197 """The number of HostQueueEntries assigned a Host for this job."""
3198 entries = models.HostQueueEntry.objects.filter(job=self.id,
3199 host__isnull=False)
3200 return entries.count()
3201
3202
showard77182562009-06-10 00:16:05 +00003203 def _pending_count(self):
3204 """The number of HostQueueEntries for this job in the Pending state."""
3205 pending_entries = models.HostQueueEntry.objects.filter(
3206 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3207 return pending_entries.count()
3208
3209
showardd07a5f32009-12-07 19:36:20 +00003210 def _max_hosts_needed_to_run(self, atomic_group):
showardd2014822009-10-12 20:26:58 +00003211 """
3212 @param atomic_group: The AtomicGroup associated with this job that we
showardd07a5f32009-12-07 19:36:20 +00003213 are using to set an upper bound on the threshold.
3214 @returns The maximum number of HostQueueEntries assigned a Host before
showardd2014822009-10-12 20:26:58 +00003215 this job can run.
3216 """
3217 return min(self._hosts_assigned_count(),
3218 atomic_group.max_number_of_machines)
3219
3220
showardd07a5f32009-12-07 19:36:20 +00003221 def _min_hosts_needed_to_run(self):
3222 """Return the minumum number of hsots needed to run this job."""
3223 return self.synch_count
3224
3225
jadmanski0afbb632008-06-06 21:10:57 +00003226 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003227 # NOTE: Atomic group jobs stop reporting ready after they have been
3228 # started to avoid launching multiple copies of one atomic job.
3229 # Only possible if synch_count is less than than half the number of
3230 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003231 pending_count = self._pending_count()
3232 atomic_and_has_started = self._atomic_and_has_started()
3233 ready = (pending_count >= self.synch_count
showardd2014822009-10-12 20:26:58 +00003234 and not atomic_and_has_started)
showardb000a8d2009-07-28 20:02:07 +00003235
3236 if not ready:
3237 logging.info(
3238 'Job %s not ready: %s pending, %s required '
3239 '(Atomic and started: %s)',
3240 self, pending_count, self.synch_count,
3241 atomic_and_has_started)
3242
3243 return ready
mbligh36768f02008-02-22 18:28:33 +00003244
3245
jadmanski0afbb632008-06-06 21:10:57 +00003246 def num_machines(self, clause = None):
3247 sql = "job_id=%s" % self.id
3248 if clause:
3249 sql += " AND (%s)" % clause
3250 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003251
3252
jadmanski0afbb632008-06-06 21:10:57 +00003253 def num_queued(self):
3254 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003255
3256
jadmanski0afbb632008-06-06 21:10:57 +00003257 def num_active(self):
3258 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003259
3260
jadmanski0afbb632008-06-06 21:10:57 +00003261 def num_complete(self):
3262 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003263
3264
jadmanski0afbb632008-06-06 21:10:57 +00003265 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003266 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003267
mbligh36768f02008-02-22 18:28:33 +00003268
showard6bb7c292009-01-30 01:44:51 +00003269 def _not_yet_run_entries(self, include_verifying=True):
3270 statuses = [models.HostQueueEntry.Status.QUEUED,
3271 models.HostQueueEntry.Status.PENDING]
3272 if include_verifying:
3273 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3274 return models.HostQueueEntry.objects.filter(job=self.id,
3275 status__in=statuses)
3276
3277
3278 def _stop_all_entries(self):
3279 entries_to_stop = self._not_yet_run_entries(
3280 include_verifying=False)
3281 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003282 assert not child_entry.complete, (
3283 '%s status=%s, active=%s, complete=%s' %
3284 (child_entry.id, child_entry.status, child_entry.active,
3285 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003286 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3287 child_entry.host.status = models.Host.Status.READY
3288 child_entry.host.save()
3289 child_entry.status = models.HostQueueEntry.Status.STOPPED
3290 child_entry.save()
3291
showard2bab8f42008-11-12 18:15:22 +00003292 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003293 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003294 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003295 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003296
3297
jadmanski0afbb632008-06-06 21:10:57 +00003298 def write_to_machines_file(self, queue_entry):
showarda9545c02009-12-18 22:44:26 +00003299 hostname = queue_entry.host.hostname
showard170873e2009-01-07 00:22:26 +00003300 file_path = os.path.join(self.tag(), '.machines')
3301 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003302
3303
showardf1ae3542009-05-11 19:26:02 +00003304 def _next_group_name(self, group_name=''):
3305 """@returns a directory name to use for the next host group results."""
3306 if group_name:
3307 # Sanitize for use as a pathname.
3308 group_name = group_name.replace(os.path.sep, '_')
3309 if group_name.startswith('.'):
3310 group_name = '_' + group_name[1:]
3311 # Add a separator between the group name and 'group%d'.
3312 group_name += '.'
3313 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003314 query = models.HostQueueEntry.objects.filter(
3315 job=self.id).values('execution_subdir').distinct()
3316 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003317 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3318 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003319 if ids:
3320 next_id = max(ids) + 1
3321 else:
3322 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003323 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003324
3325
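# Examples of the execution_subdir names produced by _next_group_name() above
# (illustrative only; 'rack/1' is a hypothetical group name):
#
#     self._next_group_name()          # -> 'group0', later 'group1', ...
#     self._next_group_name('rack/1')  # -> 'rack_1.group0'; the path
#                                      #    separator is sanitized to '_' and
#                                      #    a leading '.' becomes '_'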
showarddb502762009-09-09 15:31:20 +00003326 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003327 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003328 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003329 return control_path
mbligh36768f02008-02-22 18:28:33 +00003330
showardb2e2c322008-10-14 17:33:55 +00003331
showard2bab8f42008-11-12 18:15:22 +00003332 def get_group_entries(self, queue_entry_from_group):
showard8375ce02009-10-12 20:35:13 +00003333 """
3334 @param queue_entry_from_group: A HostQueueEntry instance to find other
3335 group entries on this job for.
3336
3337 @returns A list of HostQueueEntry objects all executing this job as
3338 part of the same group as the one supplied (having the same
3339 execution_subdir).
3340 """
showard2bab8f42008-11-12 18:15:22 +00003341 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003342 return list(HostQueueEntry.fetch(
3343 where='job_id=%s AND execution_subdir=%s',
3344 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003345
3346
showard8cc058f2009-09-08 16:26:33 +00003347 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003348 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003349 execution_path = queue_entries[0].execution_path()
3350 control_path = self._write_control_file(execution_path)
showarda9545c02009-12-18 22:44:26 +00003351 hostnames = ','.join(entry.host.hostname
3352 for entry in queue_entries
3353 if not entry.is_hostless())
mbligh36768f02008-02-22 18:28:33 +00003354
showarddb502762009-09-09 15:31:20 +00003355 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003356 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003357 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003358 ['-P', execution_tag, '-n',
3359 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003360 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003361
jadmanski0afbb632008-06-06 21:10:57 +00003362 if not self.is_server_job():
3363 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003364
showardb2e2c322008-10-14 17:33:55 +00003365 return params
mblighe2586682008-02-29 22:45:46 +00003366
mbligh36768f02008-02-22 18:28:33 +00003367
showardc9ae1782009-01-30 01:42:37 +00003368 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003369 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003370 return True
showard0fc38302008-10-23 00:44:07 +00003371 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showarda9545c02009-12-18 22:44:26 +00003372 return queue_entry.host.dirty
showardc9ae1782009-01-30 01:42:37 +00003373 return False
showard21baa452008-10-21 00:08:39 +00003374
showardc9ae1782009-01-30 01:42:37 +00003375
showard8cc058f2009-09-08 16:26:33 +00003376 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003377 do_not_verify = (queue_entry.host.protection ==
3378 host_protections.Protection.DO_NOT_VERIFY)
3379 if do_not_verify:
3380 return False
3381 return self.run_verify
3382
3383
showard8cc058f2009-09-08 16:26:33 +00003384 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003385 """
3386 Get a list of tasks to perform before the host_queue_entry
3387 may be used to run this Job (such as Cleanup & Verify).
3388
3389 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003390 it should be considered be ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003391 task in the list calls HostQueueEntry.on_pending(), which
3392 continues the flow of the job.
3393 """
showardc9ae1782009-01-30 01:42:37 +00003394 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003395 task = models.SpecialTask.Task.CLEANUP
3396 elif self._should_run_verify(queue_entry):
3397 task = models.SpecialTask.Task.VERIFY
3398 else:
3399 queue_entry.on_pending()
3400 return
3401
showard9bb960b2009-11-19 01:02:11 +00003402 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00003403 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00003404 host=models.Host.objects.get(id=queue_entry.host_id),
3405 queue_entry=queue_entry, task=task)
showard21baa452008-10-21 00:08:39 +00003406
3407
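# Pre-job task selection performed above, summarized:
#
#     reboot_before == ALWAYS                            -> Cleanup SpecialTask
#     reboot_before == IF_DIRTY and the host is dirty    -> Cleanup SpecialTask
#     else, run_verify and host is not DO_NOT_VERIFY     -> Verify SpecialTask
#     otherwise                                          -> on_pending() now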
showardf1ae3542009-05-11 19:26:02 +00003408 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003409 if len(queue_entries) == 1:
showarda9545c02009-12-18 22:44:26 +00003410 group_subdir_name = queue_entries[0].host.hostname
showard2bab8f42008-11-12 18:15:22 +00003411 else:
showardf1ae3542009-05-11 19:26:02 +00003412 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003413 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003414 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003415 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003416
3417 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003418 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003419
3420
3421 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003422 """
3423 @returns A list of HostQueueEntry instances to be used to run this
3424 Job, or an empty list if too few Pending entries are available
showard54c1ea92009-05-20 00:32:58 +00003425 to satisfy synch_count.
showardf1ae3542009-05-11 19:26:02 +00003426 """
showard77182562009-06-10 00:16:05 +00003427 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003428 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003429 if atomic_group:
3430 num_entries_wanted = atomic_group.max_number_of_machines
3431 else:
3432 num_entries_wanted = self.synch_count
3433 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003434
showardf1ae3542009-05-11 19:26:02 +00003435 if num_entries_wanted > 0:
3436 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003437 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003438 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003439 params=(self.id, include_queue_entry.id)))
3440
3441 # Sort the chosen hosts by hostname before slicing.
3442 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3443 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3444 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3445 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003446
showardf1ae3542009-05-11 19:26:02 +00003447 # Sanity check. We'll only ever be called if this can be met.
showard828fc4c2009-09-14 20:31:00 +00003448 if len(chosen_entries) < self.synch_count:
3449 message = ('job %s got less than %s chosen entries: %s' % (
3450 self.id, self.synch_count, chosen_entries))
3451 logging.error(message)
3452 email_manager.manager.enqueue_notify_email(
3453 'Job not started, too few chosen entries', message)
3454 return []
showardf1ae3542009-05-11 19:26:02 +00003455
showard8cc058f2009-09-08 16:26:33 +00003456 group_name = include_queue_entry.get_group_name()
showardf1ae3542009-05-11 19:26:02 +00003457
3458 self._assign_new_group(chosen_entries, group_name=group_name)
showard8cc058f2009-09-08 16:26:33 +00003459 return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003460
3461
showard77182562009-06-10 00:16:05 +00003462 def run_if_ready(self, queue_entry):
3463 """
showard8375ce02009-10-12 20:35:13 +00003464 Run this job by kicking its HQEs into status='Starting' if enough
3465 hosts are ready for it to run.
3466
3467 Cleans up by kicking HQEs into status='Stopped' if this Job is not
3468 ready to run.
showard77182562009-06-10 00:16:05 +00003469 """
showardb2e2c322008-10-14 17:33:55 +00003470 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003471 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003472 elif queue_entry.atomic_group:
3473 self.run_with_ready_delay(queue_entry)
3474 else:
3475 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003476
3477
3478 def run_with_ready_delay(self, queue_entry):
3479 """
3480 Start a delay to wait for more hosts to enter Pending state before
3481 launching an atomic group job. Once set, the delay cannot be reset.
3482
3483 @param queue_entry: The HostQueueEntry object to get atomic group
3484 info from and pass to run_if_ready when the delay is up.
3485
3486 Runs the job immediately if the delay is disabled, already expired,
3487 or enough hosts are Pending; otherwise the entry is set to Waiting.
3488 """
3489 assert queue_entry.job_id == self.id
3490 assert queue_entry.atomic_group
3491 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showardd2014822009-10-12 20:26:58 +00003492 over_max_threshold = (self._pending_count() >=
showardd07a5f32009-12-07 19:36:20 +00003493 self._max_hosts_needed_to_run(queue_entry.atomic_group))
showard77182562009-06-10 00:16:05 +00003494 delay_expired = (self._delay_ready_task and
3495 time.time() >= self._delay_ready_task.end_time)
3496
3497 # Delay is disabled or we already have enough? Do not wait to run.
3498 if not delay or over_max_threshold or delay_expired:
showard8cc058f2009-09-08 16:26:33 +00003499 self.run(queue_entry)
3500 else:
3501 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003502
showard8cc058f2009-09-08 16:26:33 +00003503
showardd07a5f32009-12-07 19:36:20 +00003504 def request_abort(self):
3505 """Request that this Job be aborted on the next scheduler cycle."""
3506 queue_entries = HostQueueEntry.fetch(where="job_id=%s" % self.id)
3507 for hqe in queue_entries:
3508 hqe.update_field('aborted', True)
3509
3510
showard8cc058f2009-09-08 16:26:33 +00003511 def schedule_delayed_callback_task(self, queue_entry):
3512 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3513
showard77182562009-06-10 00:16:05 +00003514 if self._delay_ready_task:
3515 return None
3516
showard8cc058f2009-09-08 16:26:33 +00003517 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3518
showard77182562009-06-10 00:16:05 +00003519 def run_job_after_delay():
showardd2014822009-10-12 20:26:58 +00003520 logging.info('Job %s done waiting for extra hosts.', self)
3521 # Check to see if the job is still relevant. It could have aborted
3522 # while we were waiting or hosts could have disappeared, etc.
showardd07a5f32009-12-07 19:36:20 +00003523 if self._pending_count() < self._min_hosts_needed_to_run():
showardd2014822009-10-12 20:26:58 +00003524 logging.info('Job %s had too few Pending hosts after waiting '
3525 'for extras. Not running.', self)
showardd07a5f32009-12-07 19:36:20 +00003526 self.request_abort()
showardd2014822009-10-12 20:26:58 +00003527 return
showard77182562009-06-10 00:16:05 +00003528 return self.run(queue_entry)
3529
showard708b3522009-08-20 23:26:15 +00003530 logging.info('Job %s waiting up to %s seconds for more hosts.',
3531 self.id, delay)
showard77182562009-06-10 00:16:05 +00003532 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3533 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003534 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003535
3536
3537 def run(self, queue_entry):
3538 """
3539 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003540 """
3541 if queue_entry.atomic_group and self._atomic_and_has_started():
3542 logging.error('Job.run() called on running atomic Job %d '
3543 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003544 return
3545 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003546 if queue_entries:
3547 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003548
3549
showard8cc058f2009-09-08 16:26:33 +00003550 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003551 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003552 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003553 self.abort_delay_ready_task()
3554
3555
3556 def abort_delay_ready_task(self):
3557 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003558 if self._delay_ready_task:
3559 # Cancel any pending callback that would try to run again
3560 # as we are already running.
3561 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003562
showardd2014822009-10-12 20:26:58 +00003563
showardb000a8d2009-07-28 20:02:07 +00003564 def __str__(self):
3565 return '%s-%s' % (self.id, self.owner)
3566
3567
mbligh36768f02008-02-22 18:28:33 +00003568if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003569 main()