blob: 6330b893997d13c2254d232b052f9961ac83be04 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
# Pid-file name prefixes identifying the scheduler process and its babysitter.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results directory; overwritten from the command line in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# $AUTOTEST_DIR in the environment overrides the path derived from __file__.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Pidfile basenames written into execution directories by the various
# processes the scheduler launches.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server.  Full results may not be available.  Sorry.
"""

# Module-level mutable state, initialized in init() / main():
_db = None            # database connection, set in init()
_shutdown = False     # set True by handle_sigint() to stop the dispatch loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None      # AFE base URL, set in main_without_exception_handling()
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    timeout_mins = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * timeout_mins
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    """Top-level entry point.

    Runs the scheduler, logging any exception that escapes (SystemExit is
    propagated untouched so normal exits stay silent), and always removes
    our pid file on the way out.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # normal termination (e.g. scheduler disabled); propagate as-is
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000085
86
87def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000088 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000089
showard136e6dc2009-06-10 19:38:49 +000090 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000091 parser = optparse.OptionParser(usage)
92 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
93 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +000094 parser.add_option('--test', help='Indicate that scheduler is under ' +
95 'test and should use dummy autoserv and no parsing',
96 action='store_true')
97 (options, args) = parser.parse_args()
98 if len(args) != 1:
99 parser.print_usage()
100 return
mbligh36768f02008-02-22 18:28:33 +0000101
showard5613c662009-06-08 23:30:33 +0000102 scheduler_enabled = global_config.global_config.get_config_value(
103 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
104
105 if not scheduler_enabled:
106 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
107 "global_config's SCHEDULER section to enabled it. Exiting.")
mbligh6fbdb802009-08-03 16:42:55 +0000108 logging.error(msg)
showard5613c662009-06-08 23:30:33 +0000109 sys.exit(1)
110
jadmanski0afbb632008-06-06 21:10:57 +0000111 global RESULTS_DIR
112 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000113
mbligh83c1e9e2009-05-01 23:10:41 +0000114 site_init = utils.import_site_function(__file__,
115 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
116 _site_init_monitor_db_dummy)
117 site_init()
118
showardcca334f2009-03-12 20:38:34 +0000119 # Change the cwd while running to avoid issues incase we were launched from
120 # somewhere odd (such as a random NFS home directory of the person running
121 # sudo to launch us as the appropriate user).
122 os.chdir(RESULTS_DIR)
123
jadmanski0afbb632008-06-06 21:10:57 +0000124 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000125 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
126 "notify_email_statuses",
127 default='')
showardc85c21b2008-11-24 22:17:37 +0000128 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000129 _notify_email_statuses = [status for status in
130 re.split(r'[\s,;:]', notify_statuses_list.lower())
131 if status]
showardc85c21b2008-11-24 22:17:37 +0000132
jadmanski0afbb632008-06-06 21:10:57 +0000133 if options.test:
134 global _autoserv_path
135 _autoserv_path = 'autoserv_dummy'
136 global _testing_mode
137 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000138
mbligh37eceaa2008-12-15 22:56:37 +0000139 # AUTOTEST_WEB.base_url is still a supported config option as some people
140 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000141 global _base_url
showard170873e2009-01-07 00:22:26 +0000142 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
143 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000144 if config_base_url:
145 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000146 else:
mbligh37eceaa2008-12-15 22:56:37 +0000147 # For the common case of everything running on a single server you
148 # can just set the hostname in a single place in the config file.
149 server_name = c.get_config_value('SERVER', 'hostname')
150 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000151 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000152 sys.exit(1)
153 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000154
showardc5afc462009-01-13 00:09:39 +0000155 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000156 server.start()
157
jadmanski0afbb632008-06-06 21:10:57 +0000158 try:
showard136e6dc2009-06-10 19:38:49 +0000159 init()
showardc5afc462009-01-13 00:09:39 +0000160 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000161 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000162
jadmanski0afbb632008-06-06 21:10:57 +0000163 while not _shutdown:
164 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000165 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000166 except:
showard170873e2009-01-07 00:22:26 +0000167 email_manager.manager.log_stacktrace(
168 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000169
showard170873e2009-01-07 00:22:26 +0000170 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000171 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000172 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000173 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
def setup_logging():
    """Configure scheduler logging; log directory and file name may be
    overridden via the AUTOTEST_SCHEDULER_LOG_DIR / AUTOTEST_SCHEDULER_LOG_NAME
    environment variables."""
    env = os.environ
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(),
        log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
        logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
182
183
def handle_sigint(signum, frame):
    """SIGINT handler: flag the dispatch loop to exit cleanly."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000188
189
def init():
    """One-time scheduler startup: pidfile, DB connection, signal handler
    and drone manager initialization.

    Exits via sys.exit(1) if another monitor_db instance is already alive
    (per its pidfile).
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Renamed from `drones`: the old local shadowed the module-level
    # `drones` import from autotest_lib.scheduler.
    drone_hosts = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_hosts.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Pass --verbose to autoserv when True.
    """
    command = [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        command.extend(['-m', machines])
    if job or queue_entry:
        if job is None:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
250
251
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
    pass
254
255
class HostScheduler(object):
    """Matches pending host queue entries against ready hosts.

    refresh() snapshots scheduling state from the database (ready hosts,
    job/host ACLs, label memberships, job dependencies); the
    find_eligible_* methods then match queue entries against that in-memory
    snapshot, popping hosts out of the available pool as they are claimed
    so each host is handed out at most once per scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id sequence as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run `query` (containing a single %s for the id list) and fold the
        two-column result into a dict of sets via _process_many2many_dict."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Turn (left, right) rows into {left: set(rights)}; with flip=True,
        {right: set(lefts)}."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job owner's ACLs."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} of hosts barred from each job."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of each job's dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)} for the given hosts."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) membership dicts."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Rebuild the in-memory scheduling snapshot for this cycle,
        scoped to the jobs referenced by `pending_queue_entries`."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host carries every label the job depends on."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label the job neither
        depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Logs an error if more than
        one atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        # arbitrary member of the (normally singleton) set
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if ACLs, dependencies, only_if_needed and atomic group
        constraints all allow scheduling queue_entry on this host."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True if the host is still in the available pool but marked
        invalid (i.e. a one-time host)."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the specific host requested by queue_entry, or None if it
        is ineligible or no longer available."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and valid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the metahost label, or None."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a claimed Host for this (non-atomic-group) queue entry,
        or None if nothing suitable is available."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
619
620
showard170873e2009-01-07 00:22:26 +0000621class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000622 def __init__(self):
623 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000624 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000625 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000626 user_cleanup_time = scheduler_config.config.clean_interval
627 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
628 _db, user_cleanup_time)
629 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000630 self._host_agents = {}
631 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000632
mbligh36768f02008-02-22 18:28:33 +0000633
showard915958d2009-04-22 21:00:58 +0000634 def initialize(self, recover_hosts=True):
635 self._periodic_cleanup.initialize()
636 self._24hr_upkeep.initialize()
637
jadmanski0afbb632008-06-06 21:10:57 +0000638 # always recover processes
639 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000640
jadmanski0afbb632008-06-06 21:10:57 +0000641 if recover_hosts:
642 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000643
644
jadmanski0afbb632008-06-06 21:10:57 +0000645 def tick(self):
showard170873e2009-01-07 00:22:26 +0000646 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000647 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000648 self._find_aborting()
showard29f7cd22009-04-29 21:16:24 +0000649 self._process_recurring_runs()
showard8cc058f2009-09-08 16:26:33 +0000650 self._schedule_delay_tasks()
showard8cc058f2009-09-08 16:26:33 +0000651 self._schedule_running_host_queue_entries()
652 self._schedule_special_tasks()
showard65db3932009-10-28 19:54:35 +0000653 self._schedule_new_jobs()
jadmanski0afbb632008-06-06 21:10:57 +0000654 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000655 _drone_manager.execute_actions()
656 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000657
showard97aed502008-11-04 02:01:24 +0000658
mblighf3294cc2009-04-08 21:17:38 +0000659 def _run_cleanup(self):
660 self._periodic_cleanup.run_cleanup_maybe()
661 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000662
mbligh36768f02008-02-22 18:28:33 +0000663
showard170873e2009-01-07 00:22:26 +0000664 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
665 for object_id in object_ids:
666 agent_dict.setdefault(object_id, set()).add(agent)
667
668
669 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
670 for object_id in object_ids:
671 assert object_id in agent_dict
672 agent_dict[object_id].remove(agent)
673
674
showardd1195652009-12-08 22:21:02 +0000675 def add_agent_task(self, agent_task):
676 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000677 self._agents.append(agent)
678 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000679 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
680 self._register_agent_for_ids(self._queue_entry_agents,
681 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000682
showard170873e2009-01-07 00:22:26 +0000683
684 def get_agents_for_entry(self, queue_entry):
685 """
686 Find agents corresponding to the specified queue_entry.
687 """
showardd3dc1992009-04-22 21:01:40 +0000688 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000689
690
691 def host_has_agent(self, host):
692 """
693 Determine if there is currently an Agent present using this host.
694 """
695 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000696
697
jadmanski0afbb632008-06-06 21:10:57 +0000698 def remove_agent(self, agent):
699 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000700 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
701 agent)
702 self._unregister_agent_for_ids(self._queue_entry_agents,
703 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000704
705
showard8cc058f2009-09-08 16:26:33 +0000706 def _host_has_scheduled_special_task(self, host):
707 return bool(models.SpecialTask.objects.filter(host__id=host.id,
708 is_active=False,
709 is_complete=False))
710
711
jadmanski0afbb632008-06-06 21:10:57 +0000712 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000713 agent_tasks = self._create_recovery_agent_tasks()
714 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000715 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000716 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000717 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000718 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000719 self._reverify_remaining_hosts()
720 # reinitialize drones after killing orphaned processes, since they can
721 # leave around files when they die
722 _drone_manager.execute_actions()
723 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000724
showard170873e2009-01-07 00:22:26 +0000725
showardd1195652009-12-08 22:21:02 +0000726 def _create_recovery_agent_tasks(self):
727 return (self._get_queue_entry_agent_tasks()
728 + self._get_special_task_agent_tasks(is_active=True))
729
730
731 def _get_queue_entry_agent_tasks(self):
732 # host queue entry statuses handled directly by AgentTasks (Verifying is
733 # handled through SpecialTasks, so is not listed here)
734 statuses = (models.HostQueueEntry.Status.STARTING,
735 models.HostQueueEntry.Status.RUNNING,
736 models.HostQueueEntry.Status.GATHERING,
737 models.HostQueueEntry.Status.PARSING)
738 status_list = ','.join("'%s'" % status for status in statuses)
showard170873e2009-01-07 00:22:26 +0000739 queue_entries = HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000740 where='status IN (%s)' % status_list)
741
742 agent_tasks = []
743 used_queue_entries = set()
744 for entry in queue_entries:
745 if self.get_agents_for_entry(entry):
746 # already being handled
747 continue
748 if entry in used_queue_entries:
749 # already picked up by a synchronous job
750 continue
751 agent_task = self._get_agent_task_for_queue_entry(entry)
752 agent_tasks.append(agent_task)
753 used_queue_entries.update(agent_task.queue_entries)
754 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000755
756
showardd1195652009-12-08 22:21:02 +0000757 def _get_special_task_agent_tasks(self, is_active=False):
758 special_tasks = models.SpecialTask.objects.filter(
759 is_active=is_active, is_complete=False)
760 return [self._get_agent_task_for_special_task(task)
761 for task in special_tasks]
762
763
764 def _get_agent_task_for_queue_entry(self, queue_entry):
765 """
766 Construct an AgentTask instance for the given active HostQueueEntry,
767 if one can currently run it.
768 @param queue_entry: a HostQueueEntry
769 @returns an AgentTask to run the queue entry
770 """
771 task_entries = queue_entry.job.get_group_entries(queue_entry)
772 self._check_for_duplicate_host_entries(task_entries)
773
774 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
775 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000776 if queue_entry.is_hostless():
777 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000778 return QueueTask(queue_entries=task_entries)
779 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
780 return GatherLogsTask(queue_entries=task_entries)
781 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
782 return FinalReparseTask(queue_entries=task_entries)
783
784 raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
785 'invalid status %s: %s' % (entry.status, entry))
786
787
788 def _check_for_duplicate_host_entries(self, task_entries):
showarda9545c02009-12-18 22:44:26 +0000789 parsing_status = models.HostQueueEntry.Status.PARSING
showardd1195652009-12-08 22:21:02 +0000790 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000791 using_host = (task_entry.host is not None
792 and task_entry.status != parsing_status)
793 if using_host:
showardd1195652009-12-08 22:21:02 +0000794 self._assert_host_has_no_agent(task_entry)
795
796
797 def _assert_host_has_no_agent(self, entry):
798 """
799 @param entry: a HostQueueEntry or a SpecialTask
800 """
801 if self.host_has_agent(entry.host):
802 agent = tuple(self._host_agents.get(entry.host.id))[0]
803 raise SchedulerError(
804 'While scheduling %s, host %s already has a host agent %s'
805 % (entry, entry.host, agent.task))
806
807
808 def _get_agent_task_for_special_task(self, special_task):
809 """
810 Construct an AgentTask class to run the given SpecialTask and add it
811 to this dispatcher.
812 @param special_task: a models.SpecialTask instance
813 @returns an AgentTask to run this SpecialTask
814 """
815 self._assert_host_has_no_agent(special_task)
816
817 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
818 for agent_task_class in special_agent_task_classes:
819 if agent_task_class.TASK_TYPE == special_task.task:
820 return agent_task_class(task=special_task)
821
822 raise SchedulerError('No AgentTask class for task', str(special_task))
823
824
825 def _register_pidfiles(self, agent_tasks):
826 for agent_task in agent_tasks:
827 agent_task.register_necessary_pidfiles()
828
829
830 def _recover_tasks(self, agent_tasks):
831 orphans = _drone_manager.get_orphaned_autoserv_processes()
832
833 for agent_task in agent_tasks:
834 agent_task.recover()
835 if agent_task.monitor and agent_task.monitor.has_process():
836 orphans.discard(agent_task.monitor.get_process())
837 self.add_agent_task(agent_task)
838
839 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000840
841
showard8cc058f2009-09-08 16:26:33 +0000842 def _get_unassigned_entries(self, status):
843 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
showard0db3d432009-10-12 20:29:15 +0000844 if entry.status == status and not self.get_agents_for_entry(entry):
845 # The status can change during iteration, e.g., if job.run()
846 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000847 yield entry
848
849
showard6878e8b2009-07-20 22:37:45 +0000850 def _check_for_remaining_orphan_processes(self, orphans):
851 if not orphans:
852 return
853 subject = 'Unrecovered orphan autoserv processes remain'
854 message = '\n'.join(str(process) for process in orphans)
855 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000856
857 die_on_orphans = global_config.global_config.get_config_value(
858 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
859
860 if die_on_orphans:
861 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000862
showard170873e2009-01-07 00:22:26 +0000863
showard8cc058f2009-09-08 16:26:33 +0000864 def _recover_pending_entries(self):
865 for entry in self._get_unassigned_entries(
866 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000867 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000868 entry.on_pending()
869
870
showardb8900452009-10-12 20:31:01 +0000871 def _check_for_unrecovered_verifying_entries(self):
showard170873e2009-01-07 00:22:26 +0000872 queue_entries = HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000873 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
874 unrecovered_hqes = []
875 for queue_entry in queue_entries:
876 special_tasks = models.SpecialTask.objects.filter(
877 task__in=(models.SpecialTask.Task.CLEANUP,
878 models.SpecialTask.Task.VERIFY),
879 queue_entry__id=queue_entry.id,
880 is_complete=False)
881 if special_tasks.count() == 0:
882 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000883
showardb8900452009-10-12 20:31:01 +0000884 if unrecovered_hqes:
885 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000886 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000887 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000888 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000889
890
showard65db3932009-10-28 19:54:35 +0000891 def _get_prioritized_special_tasks(self):
892 """
893 Returns all queued SpecialTasks prioritized for repair first, then
894 cleanup, then verify.
895 """
896 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
897 is_complete=False,
898 host__locked=False)
899 # exclude hosts with active queue entries unless the SpecialTask is for
900 # that queue entry
901 queued_tasks = models.Host.objects.add_join(
902 queued_tasks, 'host_queue_entries', 'host_id',
903 join_condition='host_queue_entries.active',
904 force_left_join=True)
905 queued_tasks = queued_tasks.extra(
906 where=['(host_queue_entries.id IS NULL OR '
907 'host_queue_entries.id = special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000908
showard65db3932009-10-28 19:54:35 +0000909 # reorder tasks by priority
910 task_priority_order = [models.SpecialTask.Task.REPAIR,
911 models.SpecialTask.Task.CLEANUP,
912 models.SpecialTask.Task.VERIFY]
913 def task_priority_key(task):
914 return task_priority_order.index(task.task)
915 return sorted(queued_tasks, key=task_priority_key)
916
917
showard65db3932009-10-28 19:54:35 +0000918 def _schedule_special_tasks(self):
919 """
920 Execute queued SpecialTasks that are ready to run on idle hosts.
921 """
922 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000923 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000924 continue
showardd1195652009-12-08 22:21:02 +0000925 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000926
927
showard170873e2009-01-07 00:22:26 +0000928 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000929 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000930 # should never happen
showarded2afea2009-07-07 20:54:07 +0000931 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000932 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000933 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000934 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000935 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000936
937
jadmanski0afbb632008-06-06 21:10:57 +0000938 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000939 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000940 full_where='locked = 0 AND invalid = 0 AND ' + where
941 for host in Host.fetch(where=full_where):
942 if self.host_has_agent(host):
943 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000944 continue
showard8cc058f2009-09-08 16:26:33 +0000945 if self._host_has_scheduled_special_task(host):
946 # host will have a special task scheduled on the next cycle
947 continue
showard170873e2009-01-07 00:22:26 +0000948 if print_message:
showardb18134f2009-03-20 20:52:18 +0000949 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000950 models.SpecialTask.objects.create(
951 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000952 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000953
954
jadmanski0afbb632008-06-06 21:10:57 +0000955 def _recover_hosts(self):
956 # recover "Repair Failed" hosts
957 message = 'Reverifying dead host %s'
958 self._reverify_hosts_where("status = 'Repair Failed'",
959 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000960
961
showard04c82c52008-05-29 19:38:12 +0000962
showardb95b1bd2008-08-15 18:11:04 +0000963 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000964 # prioritize by job priority, then non-metahost over metahost, then FIFO
965 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000966 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000967 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000968 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000969
970
showard89f84db2009-03-12 20:39:13 +0000971 def _refresh_pending_queue_entries(self):
972 """
973 Lookup the pending HostQueueEntries and call our HostScheduler
974 refresh() method given that list. Return the list.
975
976 @returns A list of pending HostQueueEntries sorted in priority order.
977 """
showard63a34772008-08-18 19:32:50 +0000978 queue_entries = self._get_pending_queue_entries()
979 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000980 return []
showardb95b1bd2008-08-15 18:11:04 +0000981
showard63a34772008-08-18 19:32:50 +0000982 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000983
showard89f84db2009-03-12 20:39:13 +0000984 return queue_entries
985
986
987 def _schedule_atomic_group(self, queue_entry):
988 """
989 Schedule the given queue_entry on an atomic group of hosts.
990
991 Returns immediately if there are insufficient available hosts.
992
993 Creates new HostQueueEntries based off of queue_entry for the
994 scheduled hosts and starts them all running.
995 """
996 # This is a virtual host queue entry representing an entire
997 # atomic group, find a group and schedule their hosts.
998 group_hosts = self._host_scheduler.find_eligible_atomic_group(
999 queue_entry)
1000 if not group_hosts:
1001 return
showardcbe6f942009-06-17 19:33:49 +00001002
1003 logging.info('Expanding atomic group entry %s with hosts %s',
1004 queue_entry,
1005 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001006 # The first assigned host uses the original HostQueueEntry
1007 group_queue_entries = [queue_entry]
1008 for assigned_host in group_hosts[1:]:
1009 # Create a new HQE for every additional assigned_host.
1010 new_hqe = HostQueueEntry.clone(queue_entry)
1011 new_hqe.save()
1012 group_queue_entries.append(new_hqe)
1013 assert len(group_queue_entries) == len(group_hosts)
1014 for queue_entry, host in itertools.izip(group_queue_entries,
1015 group_hosts):
1016 self._run_queue_entry(queue_entry, host)
1017
1018
showarda9545c02009-12-18 22:44:26 +00001019 def _schedule_hostless_job(self, queue_entry):
1020 self.add_agent_task(HostlessQueueTask(queue_entry))
1021
1022
showard89f84db2009-03-12 20:39:13 +00001023 def _schedule_new_jobs(self):
1024 queue_entries = self._refresh_pending_queue_entries()
1025 if not queue_entries:
1026 return
1027
showard63a34772008-08-18 19:32:50 +00001028 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001029 is_unassigned_atomic_group = (
1030 queue_entry.atomic_group_id is not None
1031 and queue_entry.host_id is None)
1032 if is_unassigned_atomic_group:
1033 self._schedule_atomic_group(queue_entry)
showarda9545c02009-12-18 22:44:26 +00001034 elif queue_entry.is_hostless():
1035 self._schedule_hostless_job(queue_entry)
showarde55955f2009-10-07 20:48:58 +00001036 else:
showard89f84db2009-03-12 20:39:13 +00001037 assigned_host = self._host_scheduler.find_eligible_host(
1038 queue_entry)
showard65db3932009-10-28 19:54:35 +00001039 if assigned_host and not self.host_has_agent(assigned_host):
showard89f84db2009-03-12 20:39:13 +00001040 self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001041
1042
showard8cc058f2009-09-08 16:26:33 +00001043 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +00001044 for agent_task in self._get_queue_entry_agent_tasks():
1045 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +00001046
1047
1048 def _schedule_delay_tasks(self):
showardd2014822009-10-12 20:26:58 +00001049 for entry in HostQueueEntry.fetch(where='status = "%s"' %
1050 models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001051 task = entry.job.schedule_delayed_callback_task(entry)
1052 if task:
showardd1195652009-12-08 22:21:02 +00001053 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +00001054
1055
showardb95b1bd2008-08-15 18:11:04 +00001056 def _run_queue_entry(self, queue_entry, host):
showard8cc058f2009-09-08 16:26:33 +00001057 queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001058
1059
jadmanski0afbb632008-06-06 21:10:57 +00001060 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001061 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001062 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001063 for agent in self.get_agents_for_entry(entry):
1064 agent.abort()
1065 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001066
1067
showard324bf812009-01-20 23:23:38 +00001068 def _can_start_agent(self, agent, num_started_this_cycle,
1069 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001070 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001071 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001072 return True
1073 # don't allow any nonzero-process agents to run after we've reached a
1074 # limit (this avoids starvation of many-process agents)
1075 if have_reached_limit:
1076 return False
1077 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001078 max_runnable_processes = _drone_manager.max_runnable_processes(
showardd1195652009-12-08 22:21:02 +00001079 agent.task.owner_username)
1080 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001081 return False
1082 # if a single agent exceeds the per-cycle throttling, still allow it to
1083 # run when it's the first agent in the cycle
1084 if num_started_this_cycle == 0:
1085 return True
1086 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001087 if (num_started_this_cycle + agent.task.num_processes >
1088 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001089 return False
1090 return True
1091
1092
jadmanski0afbb632008-06-06 21:10:57 +00001093 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001094 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001095 have_reached_limit = False
1096 # iterate over copy, so we can remove agents during iteration
1097 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001098 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001099 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001100 have_reached_limit):
1101 have_reached_limit = True
1102 continue
showardd1195652009-12-08 22:21:02 +00001103 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001104 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001105 if agent.is_done():
1106 logging.info("agent finished")
1107 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001108 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001109 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001110
1111
showard29f7cd22009-04-29 21:16:24 +00001112 def _process_recurring_runs(self):
1113 recurring_runs = models.RecurringRun.objects.filter(
1114 start_date__lte=datetime.datetime.now())
1115 for rrun in recurring_runs:
1116 # Create job from template
1117 job = rrun.job
1118 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001119 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001120
1121 host_objects = info['hosts']
1122 one_time_hosts = info['one_time_hosts']
1123 metahost_objects = info['meta_hosts']
1124 dependencies = info['dependencies']
1125 atomic_group = info['atomic_group']
1126
1127 for host in one_time_hosts or []:
1128 this_host = models.Host.create_one_time_host(host.hostname)
1129 host_objects.append(this_host)
1130
1131 try:
1132 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001133 options=options,
showard29f7cd22009-04-29 21:16:24 +00001134 host_objects=host_objects,
1135 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001136 atomic_group=atomic_group)
1137
1138 except Exception, ex:
1139 logging.exception(ex)
1140 #TODO send email
1141
1142 if rrun.loop_count == 1:
1143 rrun.delete()
1144 else:
1145 if rrun.loop_count != 0: # if not infinite loop
1146 # calculate new start_date
1147 difference = datetime.timedelta(seconds=rrun.loop_period)
1148 rrun.start_date = rrun.start_date + difference
1149 rrun.loop_count -= 1
1150 rrun.save()
1151
1152
showard170873e2009-01-07 00:22:26 +00001153class PidfileRunMonitor(object):
1154 """
1155 Client must call either run() to start a new process or
1156 attach_to_existing_process().
1157 """
mbligh36768f02008-02-22 18:28:33 +00001158
showard170873e2009-01-07 00:22:26 +00001159 class _PidfileException(Exception):
1160 """
1161 Raised when there's some unexpected behavior with the pid file, but only
1162 used internally (never allowed to escape this class).
1163 """
mbligh36768f02008-02-22 18:28:33 +00001164
1165
showard170873e2009-01-07 00:22:26 +00001166 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001167 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001168 self._start_time = None
1169 self.pidfile_id = None
1170 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001171
1172
showard170873e2009-01-07 00:22:26 +00001173 def _add_nice_command(self, command, nice_level):
1174 if not nice_level:
1175 return command
1176 return ['nice', '-n', str(nice_level)] + command
1177
1178
1179 def _set_start_time(self):
1180 self._start_time = time.time()
1181
1182
showard418785b2009-11-23 20:19:59 +00001183 def run(self, command, working_directory, num_processes, nice_level=None,
1184 log_file=None, pidfile_name=None, paired_with_pidfile=None,
1185 username=None):
showard170873e2009-01-07 00:22:26 +00001186 assert command is not None
1187 if nice_level is not None:
1188 command = ['nice', '-n', str(nice_level)] + command
1189 self._set_start_time()
1190 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001191 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001192 num_processes=num_processes, log_file=log_file,
1193 paired_with_pidfile=paired_with_pidfile, username=username)
showard170873e2009-01-07 00:22:26 +00001194
1195
showarded2afea2009-07-07 20:54:07 +00001196 def attach_to_existing_process(self, execution_path,
showardd1195652009-12-08 22:21:02 +00001197 pidfile_name=_AUTOSERV_PID_FILE,
1198 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001199 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001200 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001201 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001202 if num_processes is not None:
1203 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001204
1205
jadmanski0afbb632008-06-06 21:10:57 +00001206 def kill(self):
showard170873e2009-01-07 00:22:26 +00001207 if self.has_process():
1208 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001209
mbligh36768f02008-02-22 18:28:33 +00001210
showard170873e2009-01-07 00:22:26 +00001211 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001212 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001213 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001214
1215
showard170873e2009-01-07 00:22:26 +00001216 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001217 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001218 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001219 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001220
1221
showard170873e2009-01-07 00:22:26 +00001222 def _read_pidfile(self, use_second_read=False):
1223 assert self.pidfile_id is not None, (
1224 'You must call run() or attach_to_existing_process()')
1225 contents = _drone_manager.get_pidfile_contents(
1226 self.pidfile_id, use_second_read=use_second_read)
1227 if contents.is_invalid():
1228 self._state = drone_manager.PidfileContents()
1229 raise self._PidfileException(contents)
1230 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001231
1232
showard21baa452008-10-21 00:08:39 +00001233 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001234 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1235 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001236 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001237 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001238
1239
1240 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001241 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001242 return
mblighbb421852008-03-11 22:36:16 +00001243
showard21baa452008-10-21 00:08:39 +00001244 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001245
showard170873e2009-01-07 00:22:26 +00001246 if self._state.process is None:
1247 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001248 return
mbligh90a549d2008-03-25 23:52:34 +00001249
showard21baa452008-10-21 00:08:39 +00001250 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001251 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001252 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001253 return
mbligh90a549d2008-03-25 23:52:34 +00001254
showard170873e2009-01-07 00:22:26 +00001255 # pid but no running process - maybe process *just* exited
1256 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001257 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001258 # autoserv exited without writing an exit code
1259 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001260 self._handle_pidfile_error(
1261 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001262
showard21baa452008-10-21 00:08:39 +00001263
1264 def _get_pidfile_info(self):
1265 """\
1266 After completion, self._state will contain:
1267 pid=None, exit_status=None if autoserv has not yet run
1268 pid!=None, exit_status=None if autoserv is running
1269 pid!=None, exit_status!=None if autoserv has completed
1270 """
1271 try:
1272 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001273 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001274 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001275
1276
showard170873e2009-01-07 00:22:26 +00001277 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001278 """\
1279 Called when no pidfile is found or no pid is in the pidfile.
1280 """
showard170873e2009-01-07 00:22:26 +00001281 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001282 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001283 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001284 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001285 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001286
1287
showard35162b02009-03-03 02:17:30 +00001288 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001289 """\
1290 Called when autoserv has exited without writing an exit status,
1291 or we've timed out waiting for autoserv to write a pid to the
1292 pidfile. In either case, we just return failure and the caller
1293 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001294
showard170873e2009-01-07 00:22:26 +00001295 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001296 """
1297 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001298 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001299 self._state.exit_status = 1
1300 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001301
1302
jadmanski0afbb632008-06-06 21:10:57 +00001303 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001304 self._get_pidfile_info()
1305 return self._state.exit_status
1306
1307
1308 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001309 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001310 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001311 if self._state.num_tests_failed is None:
1312 return -1
showard21baa452008-10-21 00:08:39 +00001313 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001314
1315
showardcdaeae82009-08-31 18:32:48 +00001316 def try_copy_results_on_drone(self, **kwargs):
1317 if self.has_process():
1318 # copy results logs into the normal place for job results
1319 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1320
1321
1322 def try_copy_to_results_repository(self, source, **kwargs):
1323 if self.has_process():
1324 _drone_manager.copy_to_results_repository(self.get_process(),
1325 source, **kwargs)
1326
1327
class Agent(object):
    """
    Wraps a single task for execution by the Dispatcher.

    Task objects must provide these methods:
        poll() - invoked periodically so the task can update its internal
                state.
        is_done() - True once the task has finished.
        abort() - request an abort; the task sets its own 'aborted'
                attribute to True only if it really aborted.

    Task objects must provide these attributes:
        aborted - bool, True if the task was aborted.
        success - bool, True if the task succeeded.
        queue_entry_ids - sequence of HostQueueEntry ids the task handles.
        host_ids - sequence of Host ids the task represents.
    """


    def __init__(self, task):
        """
        @param task: a task object as described in the class docstring.
        """
        self.task = task

        # Dispatcher.add_agent() fills this in.
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the underlying task, noting when it completes."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the wrapped task has finished."""
        return self.finished


    def abort(self):
        """Ask the task to abort.  Tasks can choose to ignore aborts, in
        which case this agent keeps running."""
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001381
showardd3dc1992009-04-22 21:01:40 +00001382
class DelayedCallTask(object):
    """
    A task, suitable for running inside an Agent, that does nothing until
    a given amount of time has elapsed and then invokes a callback exactly
    once before declaring itself finished.

    @attribute end_time: absolute posix time after which a poll() will fire
            the callback and mark this task done.

    Also carries every attribute the Agent class requires of a task.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: seconds from now after which the callback is
                invoked; must be positive.
        @param callback: a callable invoked once when the delay elapses.
        @param now_func: a time.time-like function, injectable for testing.
                Default: time.time.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent class expects on every task.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback once the deadline has passed (at most once)."""
        if self.is_done():
            return
        if self._now_func() >= self.end_time:
            self._callback()
            self.success = True


    def is_done(self):
        """@returns True after the callback has fired or an abort."""
        return self.success or self.aborted


    def abort(self):
        """Aborting marks the task done without firing the callback."""
        self.aborted = True
showard77182562009-06-10 00:16:05 +00001434
1435
class AgentTask(object):
    """
    Base class for units of work run by an Agent.

    Subclasses describe one external process (normally autoserv): they
    supply _command_line(), _working_directory() and owner_username, and
    this class drives the process through a PidfileRunMonitor while
    tracking started/done/success/aborted state.
    """

    class _NullMonitor(object):
        # Stand-in monitor used when a task has no paired predecessor
        # process; it always reports that a process exists so
        # paired-results checks pass trivially.
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        # PidfileRunMonitor for the task's process; set by run()/recover().
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        # Record the host/queue-entry ids this task touches.  Real queue
        # entries take precedence; otherwise a bare host is required.
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Periodic entry point: lazily start, then advance via tick()."""
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        """Check the monitored process; once it has exited, finish with
        success iff its exit code was zero."""
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            # Process still running (or status not yet known).
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        """@returns True once finished() or abort() has run."""
        return self.done


    def finished(self, success):
        """Mark the task complete and run epilog().  Idempotent: a task
        already marked done stays done with its original result."""
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.

        Runs before the process is launched; registers the pidfiles this
        task will need with the drone manager.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        # Full path of the log file within the working directory, or None
        # when no log file name was configured.
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        """Copy the task's log file to the results repository, if any."""
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.

        Runs once when the task finishes (success or failure).
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        """Run prolog() and launch the process exactly once."""
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the process (if any) and mark this task aborted/done."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        # All entries handled by one task must share a single execution
        # path; assert that and return it.
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        Copy the execution directory to the results repository.

        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: (optional) monitor whose process to copy from;
                defaults to this task's own monitor.
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        # Trailing slash: copy the directory contents.
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        # Hand the entries over to the parsing stage.
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        """Convenience wrapper: copy results, then mark entries PARSING."""
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this AgentTask's process.  It
        may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this AgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this AgentTask's process executes.  Must be
        overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this AgentTask's process uses.  To be
        overridden if necessary.
        """
        return _AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        # If this task was paired with a prior process whose results have
        # vanished, notify and finish as a failure rather than running.
        if not self._paired_with_monitor().has_process():
            email_manager.manager.enqueue_notify_email(
                'No paired results in task',
                'No paired results in task %s at %s'
                % (self, self._paired_with_monitor().pidfile_id))
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = PidfileRunMonitor()


    def run(self):
        """Launch the task's process on a drone via the monitor."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
            self._command_line(), self._working_directory(),
            num_processes=self.num_processes,
            nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
            pidfile_name=self._pidfile_name(),
            paired_with_pidfile=self._paired_with_monitor().pidfile_id,
            username=self.owner_username)


    def register_necessary_pidfiles(self):
        """Register this task's pidfile (and its paired pidfile, if any)
        with the drone manager so their contents get polled."""
        pidfile_id = _drone_manager.get_pidfile_id_from(
            self._working_directory(), self._pidfile_name())
        _drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            _drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        """Attempt to reattach to an already-running process after a
        scheduler restart; if none is found, leave the task unstarted so
        it will be launched normally."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
            self._working_directory(), pidfile_name=self._pidfile_name(),
            num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s'
                     % (self.monitor.get_process(), type(self).__name__,
                        self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001689
1690
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render a single keyval as a 'key=value' line (no newline)."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Path of this task's keyval file.  Subclasses must override.

        @raises NotImplementedError: always, in this base implementation.
        """
        # Bug fix: this previously did "raise NotImplemented", which is not
        # an exception class and therefore raises a confusing TypeError
        # instead of signalling the missing override.
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval to the keyval file, paired with the task's
        process.  No-op when the monitor never found a process."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """@returns the ('job_queued', <timestamp>) keyval pair for job,
        derived from the job's created_on time."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job_finished keyval with the current time."""
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict, formatted as keyval lines, to the execution
        at keyval_path (written before the job's process starts)."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._working_directory(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        """Write keyval_dict into this task's main keyval file."""
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write the host's platform and labels to
        host_keyvals/<hostname> under the working directory."""
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1740
1741
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    # Subclasses must set this to the models.SpecialTask.Task value that
    # they implement.
    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        """
        @param task: the models.SpecialTask this AgentTask executes.
        @param extra_command_args: extra arguments for the autoserv
                command line.
        """
        super(SpecialAgentTask, self).__init__()

        # Bug fix: this was previously written as "assert (cond, msg)" --
        # a two-element tuple, which is always true, so the check never
        # fired.
        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = HostQueueEntry(id=task.queue_entry.id)

        self.task = task
        self._extra_command_args = extra_command_args


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return _autoserv_command_line(self.host.hostname,
                                      self._extra_command_args,
                                      queue_entry=self.queue_entry)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        # requested_by may be NULL for system-initiated tasks.
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        # Mark the SpecialTask row active and record host info.
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Mark this task's queue entry as failed, copying our logs into
        the entry's results directory so the failure is visible there."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return  # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
            source_path=self._working_directory() + '/',
            destination_path=self.queue_entry.execution_path() + '/')

        self._copy_results([self.queue_entry])

        # Depending on the job's setting, either fail the entry outright
        # or push it through the parser first.
        if not self.queue_entry.job.parse_failed_repair:
            self.queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
            return

        pidfile_id = _drone_manager.get_pidfile_id_from(
            self.queue_entry.execution_path(),
            pidfile_name=_AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)
        self._parse_results([self.queue_entry])


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001838
1839
class RepairTask(SpecialAgentTask):
    """Runs 'autoserv -R' against a host; marks the associated queue entry
    failed when the repair itself fails."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # Translate the host's protection level into the attribute-name
        # form expected by autoserv's --host-protection option.
        protection_string = host_protections.Protection.get_string(
            task.host.protection)
        protection_name = host_protections.Protection.get_attr_name(
            protection_string)

        extra_args = ['-R', '--host-protection', protection_name]
        super(RepairTask, self).__init__(task, extra_args)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001876
1877
class PreJobTask(SpecialAgentTask):
    """
    Base for special tasks that run on a host before a job (VerifyTask is
    the subclass visible here).  Adds common failure handling: the task's
    log is copied next to the job results, the queue entry is requeued,
    and a repair of the host is scheduled -- unless the host is protected
    with DO_NOT_VERIFY, in which case failure is ignored.
    """
    def _copy_to_results_repository(self):
        # Copy this task's autoserv debug log into the queue entry's
        # results directory so the pre-job failure is visible alongside
        # the job.  Metahost entries are skipped (they get reassigned).
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
            self.queue_entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
            source, destination_path=destination)


    def epilog(self):
        super(PreJobTask, self).epilog()

        if self.success:
            return

        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # effectively ignore failure for these hosts
            self.success = True
            return

        if self.queue_entry:
            self.queue_entry.requeue()

            # A repair task already tied to this queue entry means repair
            # was tried for it before; give up on the entry instead of
            # scheduling yet another repair.
            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry.objects.get(
                id=self.queue_entry.id)
        else:
            queue_entry = None

        # Schedule a repair of the host, tied to the (possibly None)
        # queue entry and attributed to the original requester.
        models.SpecialTask.objects.create(
            host=models.Host.objects.get(id=self.host.id),
            task=models.SpecialTask.Task.REPAIR,
            queue_entry=queue_entry,
            requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001927
showard8fe93b52008-11-18 17:53:22 +00001928
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to verify a host before its queue entry runs."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # One verify per host is enough; drop any other verifies still
        # queued for this host rather than keeping records of the
        # redundant requests.
        redundant_verifies = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=models.SpecialTask.Task.VERIFY,
            is_active=False, is_complete=False).exclude(id=self.task.id)
        redundant_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001963
1964
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask
    """
    def __init__(self, queue_entries):
        super(AbstractQueueTask, self).__init__()
        # All entries belong to the same job; take it from the first.
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        # The job knows how to build the autoserv invocation for its
        # entries.
        return self.job.get_autoserv_params(self.queue_entries)


    @property
    def num_processes(self):
        # One autoserv subprocess per queue entry (approximate count used
        # for drone capacity accounting).
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write pre-job keyvals and move all entries to RUNNING."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        # Leave a marker file in the results so the lost process is
        # visible to anyone inspecting the job.
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Common completion work: record job_finished and note a lost
        process, if any.  No-op when the task never got a monitor."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        # Append an INFO line to the job's status.log, paired with the
        # job's process so it lands on the right drone.
        _drone_manager.write_lines_to_file(
            os.path.join(self._working_directory(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            # No recorded aborter; attribute the abort to the system.
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002071
2072
class QueueTask(AbstractQueueTask):
    """An AbstractQueueTask that runs a job's autoserv against real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def _check_entry_startable(self, entry):
        # Sanity-check one queue entry (and its host) before we start.
        startable_entry_statuses = (models.HostQueueEntry.Status.STARTING,
                                    models.HostQueueEntry.Status.RUNNING)
        if entry.status not in startable_entry_statuses:
            raise SchedulerError('Queue task attempting to start '
                                 'entry with invalid status %s: %s'
                                 % (entry.status, entry))
        startable_host_statuses = (models.Host.Status.PENDING,
                                   models.Host.Status.RUNNING)
        if entry.host.status not in startable_host_statuses:
            raise SchedulerError('Queue task attempting to start on queue '
                                 'entry with invalid host status %s: %s'
                                 % (entry.host.status, entry))


    def prolog(self):
        for entry in self.queue_entries:
            self._check_entry_startable(entry)

        super(QueueTask, self).prolog()

        # Mark every host as running (and dirty) now that the job starts.
        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)
        if len(self.queue_entries) == 1 and self.job.synch_count == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # Hand the entries off to log gathering once the job proper is done.
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002109
2110
showardd3dc1992009-04-22 21:01:40 +00002111class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002112 def __init__(self, queue_entries, log_file_name):
2113 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002114
showardd1195652009-12-08 22:21:02 +00002115 self.queue_entries = queue_entries
2116
showardd3dc1992009-04-22 21:01:40 +00002117 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002118 self._autoserv_monitor.attach_to_existing_process(
2119 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002120
showardd1195652009-12-08 22:21:02 +00002121
2122 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002123 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002124 return 'true'
2125 return self._generate_command(
2126 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002127
2128
2129 def _generate_command(self, results_dir):
2130 raise NotImplementedError('Subclasses must override this')
2131
2132
showardd1195652009-12-08 22:21:02 +00002133 @property
2134 def owner_username(self):
2135 return self.queue_entries[0].job.owner
2136
2137
2138 def _working_directory(self):
2139 return self._get_consistent_execution_path(self.queue_entries)
2140
2141
2142 def _paired_with_monitor(self):
2143 return self._autoserv_monitor
2144
2145
showardd3dc1992009-04-22 21:01:40 +00002146 def _job_was_aborted(self):
2147 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002148 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002149 queue_entry.update_from_database()
2150 if was_aborted is None: # first queue entry
2151 was_aborted = bool(queue_entry.aborted)
2152 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2153 email_manager.manager.enqueue_notify_email(
2154 'Inconsistent abort state',
2155 'Queue entries have inconsistent abort state: ' +
2156 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2157 # don't crash here, just assume true
2158 return True
2159 return was_aborted
2160
2161
showardd1195652009-12-08 22:21:02 +00002162 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002163 if self._job_was_aborted():
2164 return models.HostQueueEntry.Status.ABORTED
2165
2166 # we'll use a PidfileRunMonitor to read the autoserv exit status
2167 if self._autoserv_monitor.exit_code() == 0:
2168 return models.HostQueueEntry.Status.COMPLETED
2169 return models.HostQueueEntry.Status.FAILED
2170
2171
showardd3dc1992009-04-22 21:01:40 +00002172 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002173 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002174 queue_entry.set_status(status)
2175
2176
2177 def abort(self):
2178 # override AgentTask.abort() to avoid killing the process and ending
2179 # the task. post-job tasks continue when the job is aborted.
2180 pass
2181
2182
showard9bb960b2009-11-19 01:02:11 +00002183class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002184 """
2185 Task responsible for
2186 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2187 * copying logs to the results repository
2188 * spawning CleanupTasks for hosts, if necessary
2189 * spawning a FinalReparseTask for the job
2190 """
showardd1195652009-12-08 22:21:02 +00002191 def __init__(self, queue_entries, recover_run_monitor=None):
2192 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002193 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002194 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002195 self._set_ids(queue_entries=queue_entries)
2196
2197
2198 def _generate_command(self, results_dir):
2199 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002200 for queue_entry in self.queue_entries)
showardd3dc1992009-04-22 21:01:40 +00002201 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2202 '-r', results_dir]
2203
2204
showardd1195652009-12-08 22:21:02 +00002205 @property
2206 def num_processes(self):
2207 return len(self.queue_entries)
2208
2209
2210 def _pidfile_name(self):
2211 return _CRASHINFO_PID_FILE
2212
2213
showardd3dc1992009-04-22 21:01:40 +00002214 def prolog(self):
showardd1195652009-12-08 22:21:02 +00002215 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002216 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2217 raise SchedulerError('Gather task attempting to start on '
2218 'non-gathering entry: %s' % queue_entry)
2219 if queue_entry.host.status != models.Host.Status.RUNNING:
2220 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002221 'entry with non-running host status %s: %s'
2222 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002223
showardd3dc1992009-04-22 21:01:40 +00002224 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002225
2226
showardd3dc1992009-04-22 21:01:40 +00002227 def epilog(self):
2228 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002229
showardd1195652009-12-08 22:21:02 +00002230 self._copy_and_parse_results(self.queue_entries,
showard6d1c1432009-08-20 23:30:39 +00002231 use_monitor=self._autoserv_monitor)
showard9bb960b2009-11-19 01:02:11 +00002232 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002233
showard9bb960b2009-11-19 01:02:11 +00002234
2235 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002236 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002237 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002238 models.HostQueueEntry.Status.COMPLETED)
2239 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2240 else:
2241 final_success = False
2242 num_tests_failed = 0
2243
showard9bb960b2009-11-19 01:02:11 +00002244 reboot_after = self._job.reboot_after
2245 do_reboot = (
2246 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002247 self._final_status() == models.HostQueueEntry.Status.ABORTED
showard9bb960b2009-11-19 01:02:11 +00002248 or reboot_after == models.RebootAfter.ALWAYS
2249 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
2250 and final_success and num_tests_failed == 0))
2251
showardd1195652009-12-08 22:21:02 +00002252 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002253 if do_reboot:
2254 # don't pass the queue entry to the CleanupTask. if the cleanup
2255 # fails, the job doesn't care -- it's over.
2256 models.SpecialTask.objects.create(
2257 host=models.Host.objects.get(id=queue_entry.host.id),
2258 task=models.SpecialTask.Task.CLEANUP,
2259 requested_by=self._job.owner_model())
2260 else:
2261 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002262
2263
showard0bbfc212009-04-29 21:06:13 +00002264 def run(self):
showard597bfd32009-05-08 18:22:50 +00002265 autoserv_exit_code = self._autoserv_monitor.exit_code()
2266 # only run if Autoserv exited due to some signal. if we have no exit
2267 # code, assume something bad (and signal-like) happened.
2268 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002269 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002270 else:
2271 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002272
2273
showard8fe93b52008-11-18 17:53:22 +00002274class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002275 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2276
2277
showard8cc058f2009-09-08 16:26:33 +00002278 def __init__(self, task, recover_run_monitor=None):
showardd1195652009-12-08 22:21:02 +00002279 super(CleanupTask, self).__init__(task, ['--cleanup'])
showard8cc058f2009-09-08 16:26:33 +00002280 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002281
mblighd5c95802008-03-05 00:33:46 +00002282
jadmanski0afbb632008-06-06 21:10:57 +00002283 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002284 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002285 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002286 self.host.set_status(models.Host.Status.CLEANING)
2287 if self.queue_entry:
2288 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2289
2290
showard775300b2009-09-09 15:30:50 +00002291 def _finish_epilog(self):
showard7b2d7cb2009-10-28 19:53:03 +00002292 if not self.queue_entry or not self.success:
showard775300b2009-09-09 15:30:50 +00002293 return
2294
showard7b2d7cb2009-10-28 19:53:03 +00002295 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2296 should_run_verify = (
2297 self.queue_entry.job.run_verify
2298 and self.host.protection != do_not_verify_protection)
2299 if should_run_verify:
showard9bb960b2009-11-19 01:02:11 +00002300 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
showard7b2d7cb2009-10-28 19:53:03 +00002301 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002302 host=models.Host.objects.get(id=self.host.id),
showard7b2d7cb2009-10-28 19:53:03 +00002303 queue_entry=entry,
2304 task=models.SpecialTask.Task.VERIFY)
2305 else:
showard775300b2009-09-09 15:30:50 +00002306 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002307
mblighd5c95802008-03-05 00:33:46 +00002308
showard21baa452008-10-21 00:08:39 +00002309 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002310 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002311
showard21baa452008-10-21 00:08:39 +00002312 if self.success:
2313 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002314 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002315
showard775300b2009-09-09 15:30:50 +00002316 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002317
showard21baa452008-10-21 00:08:39 +00002318
showardd3dc1992009-04-22 21:01:40 +00002319class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002320 _num_running_parses = 0
2321
showardd1195652009-12-08 22:21:02 +00002322 def __init__(self, queue_entries):
2323 super(FinalReparseTask, self).__init__(queue_entries,
2324 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002325 # don't use _set_ids, since we don't want to set the host_ids
2326 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002327
2328
2329 def _generate_command(self, results_dir):
2330 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
2331 results_dir]
2332
2333
2334 @property
2335 def num_processes(self):
2336 return 0 # don't include parser processes in accounting
2337
2338
2339 def _pidfile_name(self):
2340 return _PARSER_PID_FILE
2341
2342
2343 def _parse_started(self):
2344 return bool(self.monitor)
showard97aed502008-11-04 02:01:24 +00002345
showard97aed502008-11-04 02:01:24 +00002346
2347 @classmethod
2348 def _increment_running_parses(cls):
2349 cls._num_running_parses += 1
2350
2351
2352 @classmethod
2353 def _decrement_running_parses(cls):
2354 cls._num_running_parses -= 1
2355
2356
2357 @classmethod
2358 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002359 return (cls._num_running_parses <
2360 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002361
2362
2363 def prolog(self):
showardd1195652009-12-08 22:21:02 +00002364 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002365 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2366 raise SchedulerError('Parse task attempting to start on '
2367 'non-parsing entry: %s' % queue_entry)
2368
showard97aed502008-11-04 02:01:24 +00002369 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002370
2371
2372 def epilog(self):
2373 super(FinalReparseTask, self).epilog()
showardd1195652009-12-08 22:21:02 +00002374 self._set_all_statuses(self._final_status())
showard97aed502008-11-04 02:01:24 +00002375
2376
showard08a36412009-05-05 01:01:13 +00002377 def tick(self):
2378 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002379 # and we can, at which point we revert to default behavior
showardd1195652009-12-08 22:21:02 +00002380 if self._parse_started():
showard08a36412009-05-05 01:01:13 +00002381 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002382 else:
2383 self._try_starting_parse()
2384
2385
2386 def run(self):
2387 # override run() to not actually run unless we can
2388 self._try_starting_parse()
2389
2390
2391 def _try_starting_parse(self):
2392 if not self._can_run_new_parse():
2393 return
showard170873e2009-01-07 00:22:26 +00002394
showard97aed502008-11-04 02:01:24 +00002395 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002396 super(FinalReparseTask, self).run()
showard97aed502008-11-04 02:01:24 +00002397 self._increment_running_parses()
showard97aed502008-11-04 02:01:24 +00002398
2399
2400 def finished(self, success):
2401 super(FinalReparseTask, self).finished(success)
showardd1195652009-12-08 22:21:02 +00002402 if self._parse_started():
showard678df4f2009-02-04 21:36:39 +00002403 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002404
2405
showarda9545c02009-12-18 22:44:26 +00002406class HostlessQueueTask(AbstractQueueTask):
2407 def __init__(self, queue_entry):
2408 super(HostlessQueueTask, self).__init__([queue_entry])
2409 self.queue_entry_ids = [queue_entry.id]
2410
2411
2412 def prolog(self):
2413 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2414 super(HostlessQueueTask, self).prolog()
2415
2416
2417 def _final_status(self):
2418 if self.queue_entries[0].aborted:
2419 return models.HostQueueEntry.Status.ABORTED
2420 if self.monitor.exit_code() == 0:
2421 return models.HostQueueEntry.Status.COMPLETED
2422 return models.HostQueueEntry.Status.FAILED
2423
2424
2425 def _finish_task(self):
2426 super(HostlessQueueTask, self)._finish_task()
2427 self.queue_entries[0].set_status(self._final_status())
2428
2429
showarda3c58572009-03-12 20:36:59 +00002430class DBError(Exception):
2431 """Raised by the DBObject constructor when its select fails."""
2432
2433
mbligh36768f02008-02-22 18:28:33 +00002434class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002435 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002436
2437 # Subclasses MUST override these:
2438 _table_name = ''
2439 _fields = ()
2440
showarda3c58572009-03-12 20:36:59 +00002441 # A mapping from (type, id) to the instance of the object for that
2442 # particular id. This prevents us from creating new Job() and Host()
2443 # instances for every HostQueueEntry object that we instantiate as
2444 # multiple HQEs often share the same Job.
2445 _instances_by_type_and_id = weakref.WeakValueDictionary()
2446 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002447
showarda3c58572009-03-12 20:36:59 +00002448
2449 def __new__(cls, id=None, **kwargs):
2450 """
2451 Look to see if we already have an instance for this particular type
2452 and id. If so, use it instead of creating a duplicate instance.
2453 """
2454 if id is not None:
2455 instance = cls._instances_by_type_and_id.get((cls, id))
2456 if instance:
2457 return instance
2458 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2459
2460
2461 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002462 assert bool(id) or bool(row)
2463 if id is not None and row is not None:
2464 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002465 assert self._table_name, '_table_name must be defined in your class'
2466 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002467 if not new_record:
2468 if self._initialized and not always_query:
2469 return # We've already been initialized.
2470 if id is None:
2471 id = row[0]
2472 # Tell future constructors to use us instead of re-querying while
2473 # this instance is still around.
2474 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002475
showard6ae5ea92009-02-25 00:11:51 +00002476 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002477
jadmanski0afbb632008-06-06 21:10:57 +00002478 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002479
jadmanski0afbb632008-06-06 21:10:57 +00002480 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002481 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002482
showarda3c58572009-03-12 20:36:59 +00002483 if self._initialized:
2484 differences = self._compare_fields_in_row(row)
2485 if differences:
showard7629f142009-03-27 21:02:02 +00002486 logging.warn(
2487 'initialized %s %s instance requery is updating: %s',
2488 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002489 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002490 self._initialized = True
2491
2492
2493 @classmethod
2494 def _clear_instance_cache(cls):
2495 """Used for testing, clear the internal instance cache."""
2496 cls._instances_by_type_and_id.clear()
2497
2498
showardccbd6c52009-03-21 00:10:21 +00002499 def _fetch_row_from_db(self, row_id):
2500 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2501 rows = _db.execute(sql, (row_id,))
2502 if not rows:
showard76e29d12009-04-15 21:53:10 +00002503 raise DBError("row not found (table=%s, row id=%s)"
2504 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002505 return rows[0]
2506
2507
showarda3c58572009-03-12 20:36:59 +00002508 def _assert_row_length(self, row):
2509 assert len(row) == len(self._fields), (
2510 "table = %s, row = %s/%d, fields = %s/%d" % (
2511 self.__table, row, len(row), self._fields, len(self._fields)))
2512
2513
2514 def _compare_fields_in_row(self, row):
2515 """
showarddae680a2009-10-12 20:26:43 +00002516 Given a row as returned by a SELECT query, compare it to our existing in
2517 memory fields. Fractional seconds are stripped from datetime values
2518 before comparison.
showarda3c58572009-03-12 20:36:59 +00002519
2520 @param row - A sequence of values corresponding to fields named in
2521 The class attribute _fields.
2522
2523 @returns A dictionary listing the differences keyed by field name
2524 containing tuples of (current_value, row_value).
2525 """
2526 self._assert_row_length(row)
2527 differences = {}
showarddae680a2009-10-12 20:26:43 +00002528 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002529 for field, row_value in itertools.izip(self._fields, row):
2530 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002531 if (isinstance(current_value, datetime.datetime)
2532 and isinstance(row_value, datetime.datetime)):
2533 current_value = current_value.strftime(datetime_cmp_fmt)
2534 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002535 if current_value != row_value:
2536 differences[field] = (current_value, row_value)
2537 return differences
showard2bab8f42008-11-12 18:15:22 +00002538
2539
2540 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002541 """
2542 Update our field attributes using a single row returned by SELECT.
2543
2544 @param row - A sequence of values corresponding to fields named in
2545 the class fields list.
2546 """
2547 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002548
showard2bab8f42008-11-12 18:15:22 +00002549 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002550 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002551 setattr(self, field, value)
2552 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002553
showard2bab8f42008-11-12 18:15:22 +00002554 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002555
mblighe2586682008-02-29 22:45:46 +00002556
showardccbd6c52009-03-21 00:10:21 +00002557 def update_from_database(self):
2558 assert self.id is not None
2559 row = self._fetch_row_from_db(self.id)
2560 self._update_fields_from_row(row)
2561
2562
jadmanski0afbb632008-06-06 21:10:57 +00002563 def count(self, where, table = None):
2564 if not table:
2565 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002566
jadmanski0afbb632008-06-06 21:10:57 +00002567 rows = _db.execute("""
2568 SELECT count(*) FROM %s
2569 WHERE %s
2570 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002571
jadmanski0afbb632008-06-06 21:10:57 +00002572 assert len(rows) == 1
2573
2574 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002575
2576
showardd3dc1992009-04-22 21:01:40 +00002577 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002578 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002579
showard2bab8f42008-11-12 18:15:22 +00002580 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002581 return
mbligh36768f02008-02-22 18:28:33 +00002582
mblighf8c624d2008-07-03 16:58:45 +00002583 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002584 _db.execute(query, (value, self.id))
2585
showard2bab8f42008-11-12 18:15:22 +00002586 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002587
2588
jadmanski0afbb632008-06-06 21:10:57 +00002589 def save(self):
2590 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002591 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002592 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002593 values = []
2594 for key in keys:
2595 value = getattr(self, key)
2596 if value is None:
2597 values.append('NULL')
2598 else:
2599 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002600 values_str = ','.join(values)
2601 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2602 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002603 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002604 # Update our id to the one the database just assigned to us.
2605 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002606
2607
jadmanski0afbb632008-06-06 21:10:57 +00002608 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002609 self._instances_by_type_and_id.pop((type(self), id), None)
2610 self._initialized = False
2611 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002612 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2613 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002614
2615
showard63a34772008-08-18 19:32:50 +00002616 @staticmethod
2617 def _prefix_with(string, prefix):
2618 if string:
2619 string = prefix + string
2620 return string
2621
2622
jadmanski0afbb632008-06-06 21:10:57 +00002623 @classmethod
showard989f25d2008-10-01 11:38:11 +00002624 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002625 """
2626 Construct instances of our class based on the given database query.
2627
2628 @yields One class instance for each row fetched.
2629 """
showard63a34772008-08-18 19:32:50 +00002630 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2631 where = cls._prefix_with(where, 'WHERE ')
2632 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002633 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002634 'joins' : joins,
2635 'where' : where,
2636 'order_by' : order_by})
2637 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002638 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002639
mbligh36768f02008-02-22 18:28:33 +00002640
class IneligibleHostQueue(DBObject):
    """Row in ineligible_host_queues; created by HostQueueEntry.block_host()
    to record that a host is blocked from a given job."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002644
2645
showard89f84db2009-03-12 20:39:13 +00002646class AtomicGroup(DBObject):
2647 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002648 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2649 'invalid')
showard89f84db2009-03-12 20:39:13 +00002650
2651
showard989f25d2008-10-01 11:38:11 +00002652class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002653 _table_name = 'labels'
2654 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002655 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002656
2657
showard6157c632009-07-06 20:19:31 +00002658 def __repr__(self):
2659 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2660 self.name, self.id, self.atomic_group_id)
2661
2662
mbligh36768f02008-02-22 18:28:33 +00002663class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002664 _table_name = 'hosts'
2665 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2666 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2667
2668
jadmanski0afbb632008-06-06 21:10:57 +00002669 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002670 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002671 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002672
2673
showard170873e2009-01-07 00:22:26 +00002674 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002675 """
showard170873e2009-01-07 00:22:26 +00002676 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002677 """
2678 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002679 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002680 FROM labels
2681 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002682 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002683 ORDER BY labels.name
2684 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002685 platform = None
2686 all_labels = []
2687 for label_name, is_platform in rows:
2688 if is_platform:
2689 platform = label_name
2690 all_labels.append(label_name)
2691 return platform, all_labels
2692
2693
showard54c1ea92009-05-20 00:32:58 +00002694 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2695
2696
2697 @classmethod
2698 def cmp_for_sort(cls, a, b):
2699 """
2700 A comparison function for sorting Host objects by hostname.
2701
2702 This strips any trailing numeric digits, ignores leading 0s and
2703 compares hostnames by the leading name and the trailing digits as a
2704 number. If both hostnames do not match this pattern, they are simply
2705 compared as lower case strings.
2706
2707 Example of how hostnames will be sorted:
2708
2709 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2710
2711 This hopefully satisfy most people's hostname sorting needs regardless
2712 of their exact naming schemes. Nobody sane should have both a host10
2713 and host010 (but the algorithm works regardless).
2714 """
2715 lower_a = a.hostname.lower()
2716 lower_b = b.hostname.lower()
2717 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2718 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2719 if match_a and match_b:
2720 name_a, number_a_str = match_a.groups()
2721 name_b, number_b_str = match_b.groups()
2722 number_a = int(number_a_str.lstrip('0'))
2723 number_b = int(number_b_str.lstrip('0'))
2724 result = cmp((name_a, number_a), (name_b, number_b))
2725 if result == 0 and lower_a != lower_b:
2726 # If they compared equal above but the lower case names are
2727 # indeed different, don't report equality. abc012 != abc12.
2728 return cmp(lower_a, lower_b)
2729 return result
2730 else:
2731 return cmp(lower_a, lower_b)
2732
2733
class HostQueueEntry(DBObject):
    """One row of host_queue_entries: a single host's slot in a job.

    Tracks the assigned Host (if any), the entry's scheduling status, and
    the results subdirectory used for this entry's portion of the job.
    """
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the row and eagerly construct related Job/Host objects."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        # Per-entry scheduler log file, written via the drone manager.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        # Frontend URL for viewing this entry's job.
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label). The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN jobs_dependency_labels AS deps "
                      "ON (labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign (or, for host=None, release) this entry's host.

        Assigning also marks the entry active and blocks the host from
        taking other entries of the same job; releasing unblocks it.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Make host_id ineligible for other entries of this job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any job/host ineligibility blocks for host_id."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
                'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        # Display helper; entries without a host render as 'no host'.
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        """Human-readable summary: host/job (entry) status [flags]."""
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    def set_status(self, status):
        """Set this entry's status, keeping active/complete flags in sync.

        Terminal statuses also trigger _on_complete() and any configured
        status notification emails.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        if status in (models.HostQueueEntry.Status.QUEUED,
                      models.HostQueueEntry.Status.PARSING):
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in (models.HostQueueEntry.Status.PENDING,
                      models.HostQueueEntry.Status.RUNNING,
                      models.HostQueueEntry.Status.VERIFYING,
                      models.HostQueueEntry.Status.STARTING,
                      models.HostQueueEntry.Status.GATHERING):
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in (models.HostQueueEntry.Status.FAILED,
                      models.HostQueueEntry.Status.COMPLETED,
                      models.HostQueueEntry.Status.STOPPED,
                      models.HostQueueEntry.Status.ABORTED):
            self.update_field('complete', True)
            self.update_field('active', False)
            self._on_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _on_complete(self):
        """Housekeeping once this entry reaches a terminal status."""
        self.job.stop_if_necessary()
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in _ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Email a per-host summary once the whole job has finished."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self, assigned_host=None):
        """Assign a host if needed, then schedule pre-job special tasks.

        @param assigned_host: required for meta-host/atomic-group entries,
                which have no host assigned yet.
        """
        if self.meta_host is not None or self.atomic_group:
            assert assigned_host
            # ensure results dir exists for the queue log
            if self.host_id is None:
                self.set_host(assigned_host)
            else:
                assert assigned_host.id == self.host_id

        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Return this entry to Queued, releasing any meta-host assignment."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort request, or None.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """Abort this entry, cleaning up its host as appropriate for the
        entry's current status."""
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # The host may be mid-verify; schedule a cleanup to leave it sane.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the group name for this entry's results, or '' if the
        entry has no atomic group."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        # '<job id>-<owner>/<subdir>'; only valid once a subdir is set.
        assert self.execution_subdir
        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        # The results path is identical to the execution tag.
        return self.execution_tag()


    def set_started_on_now(self):
        """Stamp started_on with the current time."""
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        """True if this entry requires no host at all."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
3081
3082
mbligh36768f02008-02-22 18:28:33 +00003083class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00003084 _table_name = 'jobs'
3085 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
3086 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00003087 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00003088 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00003089
showard77182562009-06-10 00:16:05 +00003090 # This does not need to be a column in the DB. The delays are likely to
3091 # be configured short. If the scheduler is stopped and restarted in
3092 # the middle of a job's delay cycle, the delay cycle will either be
3093 # repeated or skipped depending on the number of Pending machines found
3094 # when the restarted scheduler recovers to track it. Not a problem.
3095 #
3096 # A reference to the DelayedCallTask that will wake up the job should
3097 # no other HQEs change state in time. Its end_time attribute is used
3098 # by our run_with_ready_delay() method to determine if the wait is over.
3099 _delay_ready_task = None
3100
3101 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
3102 # all status='Pending' atomic group HQEs incase a delay was running when the
3103 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00003104
    def __init__(self, id=None, row=None, **kwargs):
        """Load a jobs-table row; requires either an id or a raw row."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
3109
3110
3111 def owner_model(self):
3112 # work around the fact that the Job owner field is a string, not a
3113 # foreign key
3114 if not self._owner_model:
3115 self._owner_model = models.User.objects.get(login=self.owner)
3116 return self._owner_model
mbligh36768f02008-02-22 18:28:33 +00003117
mblighe2586682008-02-29 22:45:46 +00003118
jadmanski0afbb632008-06-06 21:10:57 +00003119 def is_server_job(self):
3120 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00003121
3122
showard170873e2009-01-07 00:22:26 +00003123 def tag(self):
3124 return "%s-%s" % (self.id, self.owner)
3125
3126
    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects belonging to this job.

        Asserts that the job has at least one entry.
        """
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries)>0

        return entries
mbligh36768f02008-02-22 18:28:33 +00003137
3138
jadmanski0afbb632008-06-06 21:10:57 +00003139 def set_status(self, status, update_queues=False):
3140 self.update_field('status',status)
3141
3142 if update_queues:
3143 for queue_entry in self.get_host_queue_entries():
3144 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00003145
3146
    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        # Not an atomic group job at all -> trivially "not started".
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0
3164
3165
showard708b3522009-08-20 23:26:15 +00003166 def _hosts_assigned_count(self):
3167 """The number of HostQueueEntries assigned a Host for this job."""
3168 entries = models.HostQueueEntry.objects.filter(job=self.id,
3169 host__isnull=False)
3170 return entries.count()
3171
3172
showard77182562009-06-10 00:16:05 +00003173 def _pending_count(self):
3174 """The number of HostQueueEntries for this job in the Pending state."""
3175 pending_entries = models.HostQueueEntry.objects.filter(
3176 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3177 return pending_entries.count()
3178
3179
showardd07a5f32009-12-07 19:36:20 +00003180 def _max_hosts_needed_to_run(self, atomic_group):
showardd2014822009-10-12 20:26:58 +00003181 """
3182 @param atomic_group: The AtomicGroup associated with this job that we
showardd07a5f32009-12-07 19:36:20 +00003183 are using to set an upper bound on the threshold.
3184 @returns The maximum number of HostQueueEntries assigned a Host before
showardd2014822009-10-12 20:26:58 +00003185 this job can run.
3186 """
3187 return min(self._hosts_assigned_count(),
3188 atomic_group.max_number_of_machines)
3189
3190
    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count
3194
3195
    def is_ready(self):
        """True when enough entries are Pending for this job to start."""
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready
mbligh36768f02008-02-22 18:28:33 +00003214
3215
jadmanski0afbb632008-06-06 21:10:57 +00003216 def num_machines(self, clause = None):
3217 sql = "job_id=%s" % self.id
3218 if clause:
3219 sql += " AND (%s)" % clause
3220 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003221
3222
    def num_queued(self):
        """Number of this job's entries that are not yet complete."""
        return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003225
3226
    def num_active(self):
        """Number of this job's entries currently marked active."""
        return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003229
3230
    def num_complete(self):
        """Number of this job's entries that have completed."""
        return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003233
3234
    def is_finished(self):
        """True when every entry for this job has completed."""
        return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003237
mbligh36768f02008-02-22 18:28:33 +00003238
showard6bb7c292009-01-30 01:44:51 +00003239 def _not_yet_run_entries(self, include_verifying=True):
3240 statuses = [models.HostQueueEntry.Status.QUEUED,
3241 models.HostQueueEntry.Status.PENDING]
3242 if include_verifying:
3243 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3244 return models.HostQueueEntry.objects.filter(job=self.id,
3245 status__in=statuses)
3246
3247
    def _stop_all_entries(self):
        """Mark all waiting entries Stopped, freeing any Pending hosts."""
        entries_to_stop = self._not_yet_run_entries(
                include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()
3261
showard2bab8f42008-11-12 18:15:22 +00003262 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003263 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003264 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003265 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003266
3267
jadmanski0afbb632008-06-06 21:10:57 +00003268 def write_to_machines_file(self, queue_entry):
showarda9545c02009-12-18 22:44:26 +00003269 hostname = queue_entry.host.hostname
showard170873e2009-01-07 00:22:26 +00003270 file_path = os.path.join(self.tag(), '.machines')
3271 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003272
3273
    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        # Scan existing execution_subdirs for '<name>group<N>'; use N+1.
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
                job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003294
3295
showarddb502762009-09-09 15:31:20 +00003296 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003297 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003298 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003299 return control_path
mbligh36768f02008-02-22 18:28:33 +00003300
showardb2e2c322008-10-14 17:33:55 +00003301
showard2bab8f42008-11-12 18:15:22 +00003302 def get_group_entries(self, queue_entry_from_group):
showard8375ce02009-10-12 20:35:13 +00003303 """
3304 @param queue_entry_from_group: A HostQueueEntry instance to find other
3305 group entries on this job for.
3306
3307 @returns A list of HostQueueEntry objects all executing this job as
3308 part of the same group as the one supplied (having the same
3309 execution_subdir).
3310 """
showard2bab8f42008-11-12 18:15:22 +00003311 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003312 return list(HostQueueEntry.fetch(
3313 where='job_id=%s AND execution_subdir=%s',
3314 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003315
3316
    def get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for running these queue entries.

        @param queue_entries: non-empty list of HostQueueEntry objects that
                will run together; the first entry supplies the execution
                path/tag.
        @returns A list of command-line arguments for autoserv.
        """
        assert queue_entries
        execution_path = queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join(entry.host.hostname
                             for entry in queue_entries
                             if not entry.is_hostless())

        execution_tag = queue_entries[0].execution_tag()
        params = _autoserv_command_line(
                hostnames,
                ['-P', execution_tag, '-n',
                 _drone_manager.absolute_path(control_path)],
                job=self, verbose=False)

        if not self.is_server_job():
            # '-c' tells autoserv this is a client-side control file.
            params.append('-c')

        return params
mblighe2586682008-02-29 22:45:46 +00003336
mbligh36768f02008-02-22 18:28:33 +00003337
showardc9ae1782009-01-30 01:42:37 +00003338 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003339 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003340 return True
showard0fc38302008-10-23 00:44:07 +00003341 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showarda9545c02009-12-18 22:44:26 +00003342 return queue_entry.host.dirty
showardc9ae1782009-01-30 01:42:37 +00003343 return False
showard21baa452008-10-21 00:08:39 +00003344
showardc9ae1782009-01-30 01:42:37 +00003345
showard8cc058f2009-09-08 16:26:33 +00003346 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003347 do_not_verify = (queue_entry.host.protection ==
3348 host_protections.Protection.DO_NOT_VERIFY)
3349 if do_not_verify:
3350 return False
3351 return self.run_verify
3352
3353
    def schedule_pre_job_tasks(self, queue_entry):
        """
        Schedule any special task (Cleanup or Verify) the host_queue_entry
        needs before it may be used to run this Job.

        If neither cleanup nor verify is required, the entry is moved
        straight along via HostQueueEntry.on_pending(), which continues
        the flow of the job.
        """
        if self._should_run_cleanup(queue_entry):
            task = models.SpecialTask.Task.CLEANUP
        elif self._should_run_verify(queue_entry):
            task = models.SpecialTask.Task.VERIFY
        else:
            queue_entry.on_pending()
            return

        # Re-fetch as a Django model object so SpecialTask foreign keys work.
        queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)
showard21baa452008-10-21 00:08:39 +00003376
3377
    def _assign_new_group(self, queue_entries, group_name=''):
        """Give queue_entries a shared execution_subdir.

        A single entry uses its host's hostname; multiple entries get the
        next '<group_name.>group<N>' directory name.
        """
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].host.hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
        logging.info('Running synchronous job %d hosts %s as %s',
                     self.id, [entry.host.hostname for entry in queue_entries],
                     group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003389
3390
    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job (always containing include_queue_entry), or an empty
                list if too few Pending entries were available.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        # Atomic groups run on as many machines as allowed; ordinary jobs
        # need exactly synch_count.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
showard8cc058f2009-09-08 16:26:33 +00003429 return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003430
3431
showard77182562009-06-10 00:16:05 +00003432 def run_if_ready(self, queue_entry):
3433 """
showard8375ce02009-10-12 20:35:13 +00003434 Run this job by kicking its HQEs into status='Starting' if enough
3435 hosts are ready for it to run.
3436
3437 Cleans up by kicking HQEs into status='Stopped' if this Job is not
3438 ready to run.
showard77182562009-06-10 00:16:05 +00003439 """
showardb2e2c322008-10-14 17:33:55 +00003440 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003441 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003442 elif queue_entry.atomic_group:
3443 self.run_with_ready_delay(queue_entry)
3444 else:
3445 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003446
3447
3448 def run_with_ready_delay(self, queue_entry):
3449 """
3450 Start a delay to wait for more hosts to enter Pending state before
3451 launching an atomic group job. Once set, the a delay cannot be reset.
3452
3453 @param queue_entry: The HostQueueEntry object to get atomic group
3454 info from and pass to run_if_ready when the delay is up.
3455
3456 @returns An Agent to run the job as appropriate or None if a delay
3457 has already been set.
3458 """
3459 assert queue_entry.job_id == self.id
3460 assert queue_entry.atomic_group
3461 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showardd2014822009-10-12 20:26:58 +00003462 over_max_threshold = (self._pending_count() >=
showardd07a5f32009-12-07 19:36:20 +00003463 self._max_hosts_needed_to_run(queue_entry.atomic_group))
showard77182562009-06-10 00:16:05 +00003464 delay_expired = (self._delay_ready_task and
3465 time.time() >= self._delay_ready_task.end_time)
3466
3467 # Delay is disabled or we already have enough? Do not wait to run.
3468 if not delay or over_max_threshold or delay_expired:
showard8cc058f2009-09-08 16:26:33 +00003469 self.run(queue_entry)
3470 else:
3471 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003472
showard8cc058f2009-09-08 16:26:33 +00003473
showardd07a5f32009-12-07 19:36:20 +00003474 def request_abort(self):
3475 """Request that this Job be aborted on the next scheduler cycle."""
3476 queue_entries = HostQueueEntry.fetch(where="job_id=%s" % self.id)
3477 for hqe in queue_entries:
3478 hqe.update_field('aborted', True)
3479
3480
showard8cc058f2009-09-08 16:26:33 +00003481 def schedule_delayed_callback_task(self, queue_entry):
3482 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3483
showard77182562009-06-10 00:16:05 +00003484 if self._delay_ready_task:
3485 return None
3486
showard8cc058f2009-09-08 16:26:33 +00003487 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3488
showard77182562009-06-10 00:16:05 +00003489 def run_job_after_delay():
showardd2014822009-10-12 20:26:58 +00003490 logging.info('Job %s done waiting for extra hosts.', self)
3491 # Check to see if the job is still relevant. It could have aborted
3492 # while we were waiting or hosts could have disappearred, etc.
showardd07a5f32009-12-07 19:36:20 +00003493 if self._pending_count() < self._min_hosts_needed_to_run():
showardd2014822009-10-12 20:26:58 +00003494 logging.info('Job %s had too few Pending hosts after waiting '
3495 'for extras. Not running.', self)
showardd07a5f32009-12-07 19:36:20 +00003496 self.request_abort()
showardd2014822009-10-12 20:26:58 +00003497 return
showard77182562009-06-10 00:16:05 +00003498 return self.run(queue_entry)
3499
showard708b3522009-08-20 23:26:15 +00003500 logging.info('Job %s waiting up to %s seconds for more hosts.',
3501 self.id, delay)
showard77182562009-06-10 00:16:05 +00003502 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3503 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003504 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003505
3506
3507 def run(self, queue_entry):
3508 """
3509 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003510 """
3511 if queue_entry.atomic_group and self._atomic_and_has_started():
3512 logging.error('Job.run() called on running atomic Job %d '
3513 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003514 return
3515 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003516 if queue_entries:
3517 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003518
3519
showard8cc058f2009-09-08 16:26:33 +00003520 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003521 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003522 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003523 self.abort_delay_ready_task()
3524
3525
3526 def abort_delay_ready_task(self):
3527 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003528 if self._delay_ready_task:
3529 # Cancel any pending callback that would try to run again
3530 # as we are already running.
3531 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003532
showardd2014822009-10-12 20:26:58 +00003533
showardb000a8d2009-07-28 20:02:07 +00003534 def __str__(self):
3535 return '%s-%s' % (self.id, self.owner)
3536
3537
# Script entry point: run the scheduler's main() (defined earlier in this
# file) only when executed directly, not when imported as a module.
if __name__ == '__main__':
    main()