blob: b917effc8a69f7f348a60cc73abb1b766cdf2dfd [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
# Pid-file names used by utils.write_pid()/program_is_alive() to enforce
# single-instance operation of the scheduler and its babysitter wrapper.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results directory; overwritten from the command line in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE database connection settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden via the environment.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Per-execution-directory marker files written by the corresponding
# subprocesses; used to track process liveness/results.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level mutable state, initialized in init() / main.
_db = None                # database_connection.DatabaseConnection, set in init()
_shutdown = False         # set True by handle_sigint() to stop the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False     # set by --test: dummy autoserv, stresstest database
_base_url = None          # AFE frontend URL, resolved from config in main
_notify_email_statuses = []
# NOTE: constructed at import time; initialized later via initialize().
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
def _get_pidfile_timeout_secs():
    """@returns How long (seconds) to wait for autoserv to write its pidfile."""
    # The config option is expressed in minutes; convert for callers.
    timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * timeout_mins
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    """Entry point: run the scheduler, always removing the pid file on exit."""
    try:
        main_without_exception_handling()
    except SystemExit:
        # Deliberate exits (sys.exit) propagate untouched.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        # Whether we exit cleanly or crash, release the single-instance lock.
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000085
86
def main_without_exception_handling():
    """Body of main(): parse arguments, configure globals, run dispatch loop.

    Reads the single positional argument as the results directory, honors
    --test and --recover-hosts, and exits the process (sys.exit) on fatal
    configuration problems (scheduler disabled, missing server hostname).
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # BUG FIX: message previously read "to enabled it".
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give deployments a hook point for site-specific initialization.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Accept whitespace/comma/semicolon/colon as separators; drop empties.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Always attempt an orderly shutdown, even after an uncaught exception.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
def setup_logging():
    """Configure scheduler logging, honoring env-var overrides for dir/name."""
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_config = scheduler_logging_config.SchedulerLoggingConfig()
    logging_manager.configure_logging(logging_config, log_dir=log_dir,
                                      logfile_name=log_name)
182
183
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after the current tick."""
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000188
189
def init():
    """One-time scheduler startup.

    Enforces single-instance via the pid file, connects the scheduler and
    Django database connections, installs the SIGINT handler and initializes
    the drone manager from config. Exits (sys.exit) if another monitor_db
    instance is already running.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point ourselves at a scratch database while under --test.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # BUG FIX: the local was previously named 'drones', shadowing the
    # imported autotest_lib.scheduler.drones module within this function.
    drone_spec = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_spec.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        if not job:
            # Fall back to the job associated with the queue entry.
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
248
249
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
252
253
class HostScheduler(object):
    """Matches pending HostQueueEntries against ready hosts.

    refresh() snapshots scheduling state from the database (ready hosts,
    ACLs, labels, job dependencies); the find_eligible_* methods then
    consume that cached state, removing hosts from the internal
    availability map as they are claimed.  All _-prefixed attributes are
    per-cycle caches created by refresh().
    """
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        # Map host id -> Host object for O(1) lookup and removal.
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # Render ids as a comma-separated string for an SQL "IN (...)" clause.
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        # Run query (with one %s IN-list placeholder) and fold the two-column
        # result into {left_id: set(right_ids)}; flip swaps the columns.
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        # Fold (left, right) rows into {left: set(rights)}.  flip=True swaps
        # the roles of the two columns before grouping.
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        # job id -> set of aclgroup ids that the job's owner belongs to
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        # job id -> set of host ids explicitly marked ineligible for that job
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        # job id -> set of label ids the job depends on
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        # host id -> set of aclgroup ids containing the host
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        # @returns (labels_to_hosts, hosts_to_labels) for the given host ids.
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # label id -> Label object, for every label in the database
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot all scheduling-relevant database state for this cycle.

        @param pending_queue_entries: HostQueueEntries awaiting scheduling;
                only ACL/dependency state for their jobs is loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # The job owner must share at least one ACL group with the host.
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # Every dependency label of the job must be present on the host.
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        # Reject hosts carrying an only_if_needed label that the job neither
        # depends on nor requested as its metahost.
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  If more than one atomic group is
        found among the labels, an error is logged and one of the group ids
        is returned arbitrarily (no exception is raised).

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        # A host is eligible when it passes ACL, dependency, only_if_needed
        # and atomic-group checks -- or when it is an invalid (one-time) host.
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        # NOTE: returns None (falsy) rather than False when the host is not
        # in the availability cache.
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        # The entry names a specific host; claim it if it is eligible.
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        # Claim any eligible host carrying the requested metahost label.
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """@returns A claimed Host for queue_entry, or None if none is free."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        # Atomic-group entries are handled by find_eligible_atomic_group().
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
617
618
showard170873e2009-01-07 00:22:26 +0000619class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000620 def __init__(self):
621 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000622 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000623 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000624 user_cleanup_time = scheduler_config.config.clean_interval
625 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
626 _db, user_cleanup_time)
627 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000628 self._host_agents = {}
629 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
showard915958d2009-04-22 21:00:58 +0000632 def initialize(self, recover_hosts=True):
633 self._periodic_cleanup.initialize()
634 self._24hr_upkeep.initialize()
635
jadmanski0afbb632008-06-06 21:10:57 +0000636 # always recover processes
637 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000638
jadmanski0afbb632008-06-06 21:10:57 +0000639 if recover_hosts:
640 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000641
642
    def tick(self):
        """Run one scheduler iteration.

        Ordering is significant: drone state is refreshed first, cleanup and
        aborts are processed, new work is scheduled, agents run, and finally
        queued drone actions and emails are flushed.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_new_jobs()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000655
showard97aed502008-11-04 02:01:24 +0000656
mblighf3294cc2009-04-08 21:17:38 +0000657 def _run_cleanup(self):
658 self._periodic_cleanup.run_cleanup_maybe()
659 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard170873e2009-01-07 00:22:26 +0000662 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
663 for object_id in object_ids:
664 agent_dict.setdefault(object_id, set()).add(agent)
665
666
667 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
668 for object_id in object_ids:
669 assert object_id in agent_dict
670 agent_dict[object_id].remove(agent)
671
672
jadmanski0afbb632008-06-06 21:10:57 +0000673 def add_agent(self, agent):
674 self._agents.append(agent)
675 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000676 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
677 self._register_agent_for_ids(self._queue_entry_agents,
678 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000679
showard170873e2009-01-07 00:22:26 +0000680
681 def get_agents_for_entry(self, queue_entry):
682 """
683 Find agents corresponding to the specified queue_entry.
684 """
showardd3dc1992009-04-22 21:01:40 +0000685 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000686
687
688 def host_has_agent(self, host):
689 """
690 Determine if there is currently an Agent present using this host.
691 """
692 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000693
694
jadmanski0afbb632008-06-06 21:10:57 +0000695 def remove_agent(self, agent):
696 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000697 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
698 agent)
699 self._unregister_agent_for_ids(self._queue_entry_agents,
700 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000701
702
showard8cc058f2009-09-08 16:26:33 +0000703 def _host_has_scheduled_special_task(self, host):
704 return bool(models.SpecialTask.objects.filter(host__id=host.id,
705 is_active=False,
706 is_complete=False))
707
708
jadmanski0afbb632008-06-06 21:10:57 +0000709 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000710 self._register_pidfiles()
711 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000712 self._recover_all_recoverable_entries()
showard8cc058f2009-09-08 16:26:33 +0000713 self._recover_pending_entries()
showard6878e8b2009-07-20 22:37:45 +0000714 self._check_for_remaining_active_entries()
showard170873e2009-01-07 00:22:26 +0000715 self._reverify_remaining_hosts()
716 # reinitialize drones after killing orphaned processes, since they can
717 # leave around files when they die
718 _drone_manager.execute_actions()
719 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000720
showard170873e2009-01-07 00:22:26 +0000721
    def _register_pidfiles(self):
        """
        Tell the drone manager about every pidfile we may need during
        recovery, for both active queue entries and active special tasks.
        """
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
                where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                        execution_entry.execution_path(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000733
734
showarded2afea2009-07-07 20:54:07 +0000735 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
736 run_monitor = PidfileRunMonitor()
737 run_monitor.attach_to_existing_process(execution_path,
738 pidfile_name=pidfile_name)
739 if run_monitor.has_process():
740 orphans.discard(run_monitor.get_process())
741 return run_monitor, '(process %s)' % run_monitor.get_process()
742 return None, 'without process'
743
744
showard8cc058f2009-09-08 16:26:33 +0000745 def _get_unassigned_entries(self, status):
746 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
showard0db3d432009-10-12 20:29:15 +0000747 if entry.status == status and not self.get_agents_for_entry(entry):
748 # The status can change during iteration, e.g., if job.run()
749 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000750 yield entry
751
752
showardd3dc1992009-04-22 21:01:40 +0000753 def _recover_entries_with_status(self, status, orphans, pidfile_name,
754 recover_entries_fn):
showard8cc058f2009-09-08 16:26:33 +0000755 for queue_entry in self._get_unassigned_entries(status):
showardd3dc1992009-04-22 21:01:40 +0000756 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000757 run_monitor, process_string = self._get_recovery_run_monitor(
758 queue_entry.execution_path(), pidfile_name, orphans)
showard8cc058f2009-09-08 16:26:33 +0000759 if not run_monitor:
760 # _schedule_running_host_queue_entries should schedule and
761 # recover these entries
762 continue
showard597bfd32009-05-08 18:22:50 +0000763
showarded2afea2009-07-07 20:54:07 +0000764 logging.info('Recovering %s entry %s %s',status.lower(),
765 ', '.join(str(entry) for entry in queue_entries),
766 process_string)
showardd3dc1992009-04-22 21:01:40 +0000767 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000768
769
showard6878e8b2009-07-20 22:37:45 +0000770 def _check_for_remaining_orphan_processes(self, orphans):
771 if not orphans:
772 return
773 subject = 'Unrecovered orphan autoserv processes remain'
774 message = '\n'.join(str(process) for process in orphans)
775 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000776
777 die_on_orphans = global_config.global_config.get_config_value(
778 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
779
780 if die_on_orphans:
781 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000782
showard170873e2009-01-07 00:22:26 +0000783
showardd3dc1992009-04-22 21:01:40 +0000784 def _recover_running_entries(self, orphans):
785 def recover_entries(job, queue_entries, run_monitor):
showard8cc058f2009-09-08 16:26:33 +0000786 queue_task = QueueTask(job=job, queue_entries=queue_entries,
787 recover_run_monitor=run_monitor)
788 self.add_agent(Agent(task=queue_task,
789 num_processes=len(queue_entries)))
showardd3dc1992009-04-22 21:01:40 +0000790
791 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
showarded2afea2009-07-07 20:54:07 +0000792 orphans, _AUTOSERV_PID_FILE,
showardd3dc1992009-04-22 21:01:40 +0000793 recover_entries)
794
795
    def _recover_gathering_entries(self, orphans):
        """Reattach GatherLogsTask agents to entries in the Gathering state."""
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent(gather_task))

        self._recover_entries_with_status(
                models.HostQueueEntry.Status.GATHERING,
                orphans, _CRASHINFO_PID_FILE, recover_entries)
805
806
    def _recover_parsing_entries(self, orphans):
        """Reattach FinalReparseTask agents to entries in the Parsing state."""
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            # num_processes=0: parsing is throttled separately, not counted
            # against the autoserv process limits
            self.add_agent(Agent(reparse_task, num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
816
817
showard8cc058f2009-09-08 16:26:33 +0000818 def _recover_pending_entries(self):
819 for entry in self._get_unassigned_entries(
820 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000821 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000822 entry.on_pending()
823
824
showardd3dc1992009-04-22 21:01:40 +0000825 def _recover_all_recoverable_entries(self):
826 orphans = _drone_manager.get_orphaned_autoserv_processes()
827 self._recover_running_entries(orphans)
828 self._recover_gathering_entries(orphans)
829 self._recover_parsing_entries(orphans)
showarded2afea2009-07-07 20:54:07 +0000830 self._recover_special_tasks(orphans)
showard6878e8b2009-07-20 22:37:45 +0000831 self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000832
showard97aed502008-11-04 02:01:24 +0000833
showarded2afea2009-07-07 20:54:07 +0000834 def _recover_special_tasks(self, orphans):
showard2fe3f1d2009-07-06 20:19:11 +0000835 """\
836 Recovers all special tasks that have started running but have not
837 completed.
838 """
839
840 tasks = models.SpecialTask.objects.filter(is_active=True,
841 is_complete=False)
842 # Use ordering to force NULL queue_entry_id's to the end of the list
showarda5288b42009-07-28 20:06:08 +0000843 for task in tasks.order_by('-queue_entry__id'):
showard9b6ec502009-08-20 23:25:17 +0000844 if self.host_has_agent(task.host):
845 raise SchedulerError(
846 "%s already has a host agent %s." % (
showard8cc058f2009-09-08 16:26:33 +0000847 task, self._host_agents.get(task.host.id)))
showard2fe3f1d2009-07-06 20:19:11 +0000848
showarded2afea2009-07-07 20:54:07 +0000849 run_monitor, process_string = self._get_recovery_run_monitor(
850 task.execution_path(), _AUTOSERV_PID_FILE, orphans)
851
852 logging.info('Recovering %s %s', task, process_string)
showard8cc058f2009-09-08 16:26:33 +0000853 self._recover_special_task(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000854
855
showard8cc058f2009-09-08 16:26:33 +0000856 def _recover_special_task(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000857 """\
858 Recovers a single special task.
859 """
860 if task.task == models.SpecialTask.Task.VERIFY:
showard8cc058f2009-09-08 16:26:33 +0000861 agent_task = self._recover_verify(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000862 elif task.task == models.SpecialTask.Task.REPAIR:
showard8cc058f2009-09-08 16:26:33 +0000863 agent_task = self._recover_repair(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000864 elif task.task == models.SpecialTask.Task.CLEANUP:
showard8cc058f2009-09-08 16:26:33 +0000865 agent_task = self._recover_cleanup(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000866 else:
867 # Should never happen
868 logging.error(
869 "Special task id %d had invalid task %s", (task.id, task.task))
870
showard8cc058f2009-09-08 16:26:33 +0000871 self.add_agent(Agent(agent_task))
showard2fe3f1d2009-07-06 20:19:11 +0000872
873
showard8cc058f2009-09-08 16:26:33 +0000874 def _recover_verify(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000875 """\
876 Recovers a verify task.
877 No associated queue entry: Verify host
878 With associated queue entry: Verify host, and run associated queue
879 entry
880 """
showard8cc058f2009-09-08 16:26:33 +0000881 return VerifyTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000882
883
showard8cc058f2009-09-08 16:26:33 +0000884 def _recover_repair(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000885 """\
886 Recovers a repair task.
887 Always repair host
888 """
showard8cc058f2009-09-08 16:26:33 +0000889 return RepairTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000890
891
showard8cc058f2009-09-08 16:26:33 +0000892 def _recover_cleanup(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000893 """\
894 Recovers a cleanup task.
895 No associated queue entry: Clean host
896 With associated queue entry: Clean host, verify host if needed, and
897 run associated queue entry
898 """
showard8cc058f2009-09-08 16:26:33 +0000899 return CleanupTask(task=task, recover_run_monitor=run_monitor)
showard6af73ad2009-07-28 20:00:58 +0000900
901
showard6878e8b2009-07-20 22:37:45 +0000902 def _check_for_remaining_active_entries(self):
showard170873e2009-01-07 00:22:26 +0000903 queue_entries = HostQueueEntry.fetch(
showard8cc058f2009-09-08 16:26:33 +0000904 where='active AND NOT complete AND status NOT IN '
905 '("Starting", "Gathering", "Pending")')
showardd3dc1992009-04-22 21:01:40 +0000906
showarde8e37072009-08-20 23:31:30 +0000907 unrecovered_active_hqes = [entry for entry in queue_entries
showard8cc058f2009-09-08 16:26:33 +0000908 if not self.get_agents_for_entry(entry) and
909 not self._host_has_scheduled_special_task(
910 entry.host)]
showarde8e37072009-08-20 23:31:30 +0000911 if unrecovered_active_hqes:
912 message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes)
913 raise SchedulerError(
914 '%d unrecovered active host queue entries:\n%s' %
915 (len(unrecovered_active_hqes), message))
showard170873e2009-01-07 00:22:26 +0000916
917
showard8cc058f2009-09-08 16:26:33 +0000918 def _schedule_special_tasks(self):
919 tasks = models.SpecialTask.objects.filter(is_active=False,
920 is_complete=False,
921 host__locked=False)
922 # We want lower ids to come first, but the NULL queue_entry_ids need to
923 # come last
924 tasks = tasks.extra(select={'isnull' : 'queue_entry_id IS NULL'})
925 tasks = tasks.extra(order_by=['isnull', 'id'])
showard6d7b2ff2009-06-10 00:16:47 +0000926
showard2fe3f1d2009-07-06 20:19:11 +0000927 for task in tasks:
showard8cc058f2009-09-08 16:26:33 +0000928 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000929 continue
showard6d7b2ff2009-06-10 00:16:47 +0000930
showard8cc058f2009-09-08 16:26:33 +0000931 if task.task == models.SpecialTask.Task.CLEANUP:
932 agent_task = CleanupTask(task=task)
933 elif task.task == models.SpecialTask.Task.VERIFY:
934 agent_task = VerifyTask(task=task)
935 elif task.task == models.SpecialTask.Task.REPAIR:
936 agent_task = RepairTask(task=task)
937 else:
938 email_manager.manager.enqueue_notify_email(
939 'Special task with invalid task', task)
940 continue
941
942 self.add_agent(Agent(agent_task))
showard1ff7b2e2009-05-15 23:17:18 +0000943
944
showard170873e2009-01-07 00:22:26 +0000945 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000946 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000947 # should never happen
showarded2afea2009-07-07 20:54:07 +0000948 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000949 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000950 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000951 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000952 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000953
954
jadmanski0afbb632008-06-06 21:10:57 +0000955 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000956 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000957 full_where='locked = 0 AND invalid = 0 AND ' + where
958 for host in Host.fetch(where=full_where):
959 if self.host_has_agent(host):
960 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000961 continue
showard8cc058f2009-09-08 16:26:33 +0000962 if self._host_has_scheduled_special_task(host):
963 # host will have a special task scheduled on the next cycle
964 continue
showard170873e2009-01-07 00:22:26 +0000965 if print_message:
showardb18134f2009-03-20 20:52:18 +0000966 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000967 models.SpecialTask.objects.create(
968 task=models.SpecialTask.Task.CLEANUP,
969 host=models.Host(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000970
971
jadmanski0afbb632008-06-06 21:10:57 +0000972 def _recover_hosts(self):
973 # recover "Repair Failed" hosts
974 message = 'Reverifying dead host %s'
975 self._reverify_hosts_where("status = 'Repair Failed'",
976 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000977
978
showard04c82c52008-05-29 19:38:12 +0000979
showardb95b1bd2008-08-15 18:11:04 +0000980 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000981 # prioritize by job priority, then non-metahost over metahost, then FIFO
982 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000983 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000984 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000985 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000986
987
showard89f84db2009-03-12 20:39:13 +0000988 def _refresh_pending_queue_entries(self):
989 """
990 Lookup the pending HostQueueEntries and call our HostScheduler
991 refresh() method given that list. Return the list.
992
993 @returns A list of pending HostQueueEntries sorted in priority order.
994 """
showard63a34772008-08-18 19:32:50 +0000995 queue_entries = self._get_pending_queue_entries()
996 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000997 return []
showardb95b1bd2008-08-15 18:11:04 +0000998
showard63a34772008-08-18 19:32:50 +0000999 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001000
showard89f84db2009-03-12 20:39:13 +00001001 return queue_entries
1002
1003
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        # pair each entry with its host and kick them all off
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)
1034
1035
    def _schedule_new_jobs(self):
        """
        Assign hosts to pending queue entries and start them.  Entries that
        represent an unassigned atomic group are expanded onto a whole group
        of hosts; ordinary entries get a single eligible host (if any).
        """
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)
            if is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.find_eligible_host(
                        queue_entry)
                if assigned_host:
                    self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001052
1053
showard8cc058f2009-09-08 16:26:33 +00001054 def _schedule_running_host_queue_entries(self):
1055 entries = HostQueueEntry.fetch(
1056 where="status IN "
1057 "('Starting', 'Running', 'Gathering', 'Parsing')")
1058 for entry in entries:
1059 if self.get_agents_for_entry(entry):
1060 continue
1061
1062 task_entries = entry.job.get_group_entries(entry)
1063 for task_entry in task_entries:
1064 if (task_entry.status != models.HostQueueEntry.Status.PARSING
1065 and self.host_has_agent(task_entry.host)):
1066 agent = self._host_agents.get(task_entry.host.id)[0]
1067 raise SchedulerError('Attempted to schedule on host that '
1068 'already has agent: %s (previous '
1069 'agent task: %s)'
1070 % (task_entry, agent.task))
1071
1072 if entry.status in (models.HostQueueEntry.Status.STARTING,
1073 models.HostQueueEntry.Status.RUNNING):
1074 params = entry.job.get_autoserv_params(task_entries)
1075 agent_task = QueueTask(job=entry.job,
1076 queue_entries=task_entries, cmd=params)
1077 elif entry.status == models.HostQueueEntry.Status.GATHERING:
1078 agent_task = GatherLogsTask(
1079 job=entry.job, queue_entries=task_entries)
1080 elif entry.status == models.HostQueueEntry.Status.PARSING:
1081 agent_task = FinalReparseTask(queue_entries=task_entries)
1082 else:
1083 raise SchedulerError('_schedule_running_host_queue_entries got '
1084 'entry with invalid status %s: %s'
1085 % (entry.status, entry))
1086
1087 self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
1088
1089
    def _schedule_delay_tasks(self):
        """
        Give each Waiting entry's job a chance to schedule its delayed
        callback task (zero processes -- it only waits).
        """
        for entry in HostQueueEntry.fetch(where='status = "%s"' %
                                          models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent(Agent(task, num_processes=0))
1096
1097
showardb95b1bd2008-08-15 18:11:04 +00001098 def _run_queue_entry(self, queue_entry, host):
showard8cc058f2009-09-08 16:26:33 +00001099 queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001100
1101
jadmanski0afbb632008-06-06 21:10:57 +00001102 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001103 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001104 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001105 for agent in self.get_agents_for_entry(entry):
1106 agent.abort()
1107 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001108
1109
showard324bf812009-01-20 23:23:38 +00001110 def _can_start_agent(self, agent, num_started_this_cycle,
1111 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001112 # always allow zero-process agents to run
1113 if agent.num_processes == 0:
1114 return True
1115 # don't allow any nonzero-process agents to run after we've reached a
1116 # limit (this avoids starvation of many-process agents)
1117 if have_reached_limit:
1118 return False
1119 # total process throttling
showard324bf812009-01-20 23:23:38 +00001120 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001121 return False
1122 # if a single agent exceeds the per-cycle throttling, still allow it to
1123 # run when it's the first agent in the cycle
1124 if num_started_this_cycle == 0:
1125 return True
1126 # per-cycle throttling
1127 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001128 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001129 return False
1130 return True
1131
1132
jadmanski0afbb632008-06-06 21:10:57 +00001133 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001134 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001135 have_reached_limit = False
1136 # iterate over copy, so we can remove agents during iteration
1137 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001138 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001139 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001140 have_reached_limit):
1141 have_reached_limit = True
1142 continue
showard4c5374f2008-09-04 17:02:56 +00001143 num_started_this_cycle += agent.num_processes
1144 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001145 if agent.is_done():
1146 logging.info("agent finished")
1147 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001148 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001149 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001150
1151
showard29f7cd22009-04-29 21:16:24 +00001152 def _process_recurring_runs(self):
1153 recurring_runs = models.RecurringRun.objects.filter(
1154 start_date__lte=datetime.datetime.now())
1155 for rrun in recurring_runs:
1156 # Create job from template
1157 job = rrun.job
1158 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001159 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001160
1161 host_objects = info['hosts']
1162 one_time_hosts = info['one_time_hosts']
1163 metahost_objects = info['meta_hosts']
1164 dependencies = info['dependencies']
1165 atomic_group = info['atomic_group']
1166
1167 for host in one_time_hosts or []:
1168 this_host = models.Host.create_one_time_host(host.hostname)
1169 host_objects.append(this_host)
1170
1171 try:
1172 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001173 options=options,
showard29f7cd22009-04-29 21:16:24 +00001174 host_objects=host_objects,
1175 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001176 atomic_group=atomic_group)
1177
1178 except Exception, ex:
1179 logging.exception(ex)
1180 #TODO send email
1181
1182 if rrun.loop_count == 1:
1183 rrun.delete()
1184 else:
1185 if rrun.loop_count != 0: # if not infinite loop
1186 # calculate new start_date
1187 difference = datetime.timedelta(seconds=rrun.loop_period)
1188 rrun.start_date = rrun.start_date + difference
1189 rrun.loop_count -= 1
1190 rrun.save()
1191
1192
showard170873e2009-01-07 00:22:26 +00001193class PidfileRunMonitor(object):
1194 """
1195 Client must call either run() to start a new process or
1196 attach_to_existing_process().
1197 """
mbligh36768f02008-02-22 18:28:33 +00001198
showard170873e2009-01-07 00:22:26 +00001199 class _PidfileException(Exception):
1200 """
1201 Raised when there's some unexpected behavior with the pid file, but only
1202 used internally (never allowed to escape this class).
1203 """
mbligh36768f02008-02-22 18:28:33 +00001204
1205
showard170873e2009-01-07 00:22:26 +00001206 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001207 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001208 self._start_time = None
1209 self.pidfile_id = None
1210 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001211
1212
showard170873e2009-01-07 00:22:26 +00001213 def _add_nice_command(self, command, nice_level):
1214 if not nice_level:
1215 return command
1216 return ['nice', '-n', str(nice_level)] + command
1217
1218
    def _set_start_time(self):
        # remember when we started/attached; used by _handle_no_process to
        # decide when to give up waiting for a pidfile
        self._start_time = time.time()
1221
1222
    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """
        Start a new process via the drone manager and begin monitoring its
        pidfile.

        @param command: argv list to execute (required).
        @param nice_level: if not None, wrap the command with nice(1).
        """
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
                command, working_directory, pidfile_name=pidfile_name,
                log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001232
1233
showarded2afea2009-07-07 20:54:07 +00001234 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001235 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001236 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001237 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001238 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001239 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001240
1241
jadmanski0afbb632008-06-06 21:10:57 +00001242 def kill(self):
showard170873e2009-01-07 00:22:26 +00001243 if self.has_process():
1244 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001245
mbligh36768f02008-02-22 18:28:33 +00001246
showard170873e2009-01-07 00:22:26 +00001247 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001248 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001249 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001250
1251
showard170873e2009-01-07 00:22:26 +00001252 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001253 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001254 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001255 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001256
1257
showard170873e2009-01-07 00:22:26 +00001258 def _read_pidfile(self, use_second_read=False):
1259 assert self.pidfile_id is not None, (
1260 'You must call run() or attach_to_existing_process()')
1261 contents = _drone_manager.get_pidfile_contents(
1262 self.pidfile_id, use_second_read=use_second_read)
1263 if contents.is_invalid():
1264 self._state = drone_manager.PidfileContents()
1265 raise self._PidfileException(contents)
1266 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001267
1268
showard21baa452008-10-21 00:08:39 +00001269 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001270 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1271 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001272 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001273 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001274
1275
    def _get_pidfile_info_helper(self):
        """
        Refresh pidfile state, detecting the no-process and
        died-without-exit-code cases.  May raise _PidfileException (handled
        by _get_pidfile_info).
        """
        if self.lost_process:
            # we've already given up; keep the synthetic failure state
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001298
showard21baa452008-10-21 00:08:39 +00001299
    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed

        _PidfileException is never allowed to escape; it is converted into a
        notify email plus a lost-process failure state.
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001311
1312
showard170873e2009-01-07 00:22:26 +00001313 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001314 """\
1315 Called when no pidfile is found or no pid is in the pidfile.
1316 """
showard170873e2009-01-07 00:22:26 +00001317 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001318 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001319 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001320 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001321 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001322
1323
showard35162b02009-03-03 02:17:30 +00001324 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001325 """\
1326 Called when autoserv has exited without writing an exit status,
1327 or we've timed out waiting for autoserv to write a pid to the
1328 pidfile. In either case, we just return failure and the caller
1329 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001330
showard170873e2009-01-07 00:22:26 +00001331 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001332 """
1333 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001334 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001335 self._state.exit_status = 1
1336 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001337
1338
jadmanski0afbb632008-06-06 21:10:57 +00001339 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001340 self._get_pidfile_info()
1341 return self._state.exit_status
1342
1343
1344 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001345 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001346 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001347 if self._state.num_tests_failed is None:
1348 return -1
showard21baa452008-10-21 00:08:39 +00001349 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001350
1351
showardcdaeae82009-08-31 18:32:48 +00001352 def try_copy_results_on_drone(self, **kwargs):
1353 if self.has_process():
1354 # copy results logs into the normal place for job results
1355 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1356
1357
1358 def try_copy_to_results_repository(self, source, **kwargs):
1359 if self.has_process():
1360 _drone_manager.copy_to_results_repository(self.get_process(),
1361 source, **kwargs)
1362
1363
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The wrapped task object must provide:
        poll()    - called periodically so the task can check its status and
                    update its internal state.
        is_done() - returns True once the task has finished.
        abort()   - requests an abort; the task sets its `aborted` attribute
                    to True if it actually aborted.

    and must carry these attributes:
        aborted          - bool, True if the task was aborted.
        success          - bool, True if the task succeeded.
        queue_entry_ids  - sequence of HostQueueEntry ids the task handles.
        host_ids         - sequence of Host ids the task represents.

    This class writes an `agent` attribute onto the task pointing back at
    this Agent instance.
    """


    def __init__(self, task, num_processes=1):
        """
        @param task: a task as described in the class docstring.
        @param num_processes: number of subprocesses this Agent stands for;
            used by the Dispatcher to manage system load.  Defaults to 1.
        """
        self.task = task
        task.agent = self

        # Dispatcher.add_agent() fills this in.
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False


    def tick(self):
        """Poll the wrapped task; drop it once it reports completion."""
        self.started = True
        if not self.task:
            return
        self.task.poll()
        if self.task.is_done():
            self.task = None


    def is_done(self):
        """True once there is no task left to run."""
        return self.task is None


    def abort(self):
        """Forward an abort request to the task, if one is still running."""
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            # tasks can choose to ignore aborts
            self.task = None
showard20f9bdd2009-04-29 19:48:33 +00001425
showardd3dc1992009-04-22 21:01:40 +00001426
class DelayedCallTask(object):
    """
    A task object like AgentTask for an Agent to run that waits for the
    specified amount of time to have elapsed before calling the supplied
    callback once and finishing.  If the callback returns anything, it is
    assumed to be a new Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # attributes the Agent interface requires
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        # set by Agent.__init__() when this task is wrapped in an Agent
        self.agent = None


    def poll(self):
        """Fire the callback (once) after the deadline has passed."""
        if self.is_done():
            return
        if self._now_func() >= self.end_time:
            self._callback()
            self.success = True


    def is_done(self):
        return self.aborted or self.success


    def abort(self):
        self.aborted = True
showard77182562009-06-10 00:16:05 +00001479
1480
class AgentTask(object):
    """
    Base class for tasks executed via an Agent.

    Lifecycle, as driven by repeated poll() calls: the first poll() invokes
    start(), which runs prolog() and run(); run() launches self.cmd under a
    PidfileRunMonitor.  Subsequent poll()/tick() calls watch the monitor's
    exit code and, once it is available, call finished(), which records
    success and runs epilog() -> cleanup().

    Alternatively, a task may be constructed around an already-running
    process (recover_run_monitor), e.g. after a scheduler restart; it then
    begins life in the "started" state and prolog()/run() are skipped.
    """
    def __init__(self, cmd=None, working_directory=None,
                 pidfile_name=None, paired_with_pidfile=None,
                 recover_run_monitor=None):
        # NOTE(review): pidfile_name and paired_with_pidfile are accepted
        # here but not stored; run() takes them as its own arguments.
        # Presumably kept for subclass signature compatibility -- confirm.
        self.done = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.agent = None
        # when recovering an existing process we already have a monitor and
        # consider the task started (prolog()/run() will not execute)
        self.monitor = recover_run_monitor
        self.started = bool(recover_run_monitor)
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        # subclasses may set this; cleanup() copies it to the results repo
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # Record the host/queue-entry ids this task covers; the Dispatcher
        # uses these for scheduling bookkeeping.
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Drive the task: start it on first call, then check progress."""
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        """Check the monitored process; finish once an exit code appears.

        A task with no monitor (e.g. no cmd was run) finishes immediately
        as a failure.
        """
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                # process still running; check again on the next tick
                return
            success = (exit_code == 0)
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete (idempotent) and run epilog()."""
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        # Runs before the command is launched; a recovered task (which
        # already has a monitor) must never get here.
        assert not self.monitor


    def cleanup(self):
        # Preserve this task's log in the results repository, if we have
        # both a process and a log file to copy.
        if self.monitor and self.log_file:
            self.monitor.try_copy_to_results_repository(self.log_file)


    def epilog(self):
        # Runs once when the task finishes (success, failure or abort path
        # via finished()); subclasses extend this.
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the underlying process (if any) and finish as aborted.

        Note: bypasses finished()/epilog(); only cleanup() runs.
        """
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        # All entries must share one execution path; return it (asserts on
        # any mismatch).
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        Copy the entries' shared execution directory into the results
        repository.

        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: optional monitor to copy through; defaults to
                self.monitor.
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        # trailing slash: copy the directory contents
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        # Hand the entries to the parser by moving them to PARSING status.
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd (if any) under a new PidfileRunMonitor."""
        assert not self.monitor
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001615
1616
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files.

    Expects the consuming class to provide self.monitor (a PidfileRunMonitor)
    and self._working_directory.
    """
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        """Render a single key=value line for a keyval file."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of this task's keyval file.

        Subclasses must override this.
        """
        # Bug fix: this used to be `raise NotImplemented`, which raises a
        # TypeError because NotImplemented is a value, not an exception.
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval to the keyval file, paired with the job's
        process so the write lands on the correct drone."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the ('job_queued', <posix timestamp>) pair for `job`."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach `keyval_dict` rendered as a keyval file to the execution."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._working_directory,
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write the host's platform and labels under host_keyvals/<hostname>."""
        keyval_path = os.path.join(self._working_directory, 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1666
1667
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args, **kwargs):
        """
        @param task: the models.SpecialTask this AgentTask executes.
        @param extra_command_args: extra autoserv arguments for this task type.
        """
        # Bug fix: the original `assert (cond, 'msg')` asserted a two-element
        # tuple, which is always true, so this check could never fire.
        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = HostQueueEntry(id=task.queue_entry.id)

        self.task = task
        kwargs['working_directory'] = task.execution_path()
        self._extra_command_args = extra_command_args
        super(SpecialAgentTask, self).__init__(**kwargs)


    def _keyval_path(self):
        return os.path.join(self._working_directory, self._KEYVAL_FILE)


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        # build the autoserv command line and mark the SpecialTask active
        self.cmd = _autoserv_command_line(self.host.hostname,
                                          self._extra_command_args,
                                          queue_entry=self.queue_entry)
        self._working_directory = self.task.execution_path()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Mark self.queue_entry's job as failed because its host is broken."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return  # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
            source_path=self._working_directory + '/',
            destination_path=self.queue_entry.execution_path() + '/')

        self._copy_results([self.queue_entry])
        self.queue_entry.handle_host_failure()
        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])

        pidfile_id = _drone_manager.get_pidfile_id_from(
            self.queue_entry.execution_path(),
            pidfile_name=_AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()
        # mark the SpecialTask complete and preserve its results
        self.task.finish()
        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001746
1747
class RepairTask(SpecialAgentTask):
    """Runs autoserv -R to repair a host; fails the queue entry on failure."""
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task, recover_run_monitor=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        raw_protection = host_protections.Protection.get_string(
                task.host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(raw_protection)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection],
                recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
            return

        self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001785
1786
class PreJobTask(SpecialAgentTask):
    """Base for special tasks that run before a job (e.g. verify/cleanup);
    on failure, preserves logs and schedules a repair for the host."""

    def _copy_to_results_repository(self):
        """Copy this task's autoserv debug log next to the job's results."""
        entry = self.queue_entry
        if not entry or entry.meta_host:
            return

        entry.set_execution_subdir()
        execution_path = self.task.execution_path()
        log_name = os.path.basename(execution_path)
        source = os.path.join(execution_path, 'debug', 'autoserv.DEBUG')
        destination = os.path.join(entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
                source, destination_path=destination)


    def epilog(self):
        super(PreJobTask, self).epilog()

        if self.success:
            return

        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # hosts under DO_NOT_VERIFY protection are never repaired
            return

        repair_queue_entry = None
        if self.queue_entry:
            self.queue_entry.requeue()

            repair_already_queued = models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id)
            if repair_already_queued:
                # a repair for this entry already failed to fix the host
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            repair_queue_entry = models.HostQueueEntry(id=self.queue_entry.id)

        models.SpecialTask.objects.create(
                host=models.Host(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=repair_queue_entry)
showard58721a82009-08-20 23:32:40 +00001832
showard8fe93b52008-11-18 17:53:22 +00001833
class VerifyTask(PreJobTask):
    """Runs autoserv -v to verify a host before it runs a job."""
    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task, recover_run_monitor=None):
        super(VerifyTask, self).__init__(
                task, ['-v'], recover_run_monitor=recover_run_monitor)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any other queued verifies for this host. One verify will do
        # and there's no need to keep records of other requests.
        pending_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        pending_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()

        if not self.success:
            return

        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001870
1871
class CleanupHostsMixin(object):
    """Mixin that, after a job, either schedules host cleanups or marks
    hosts Ready, based on the job's reboot_after setting.

    NOTE(review): relies on self._final_status being provided by the
    consuming class.
    """

    def _reboot_hosts(self, job, queue_entries, final_success,
                      num_tests_failed):
        reboot_after = job.reboot_after
        # always reboot after aborted jobs
        was_aborted = (
                self._final_status == models.HostQueueEntry.Status.ABORTED)
        all_tests_passed = final_success and num_tests_failed == 0
        do_reboot = (was_aborted
                     or reboot_after == models.RebootAfter.ALWAYS
                     or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                         and all_tests_passed))

        for queue_entry in queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP)
            else:
                queue_entry.host.set_status(models.Host.Status.READY)
showardb5626452009-06-30 01:57:28 +00001892
1893
1894class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
showard8cc058f2009-09-08 16:26:33 +00001895 def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001896 self.job = job
1897 self.queue_entries = queue_entries
showard8cc058f2009-09-08 16:26:33 +00001898 self.group_name = queue_entries[0].get_group_name()
showarded2afea2009-07-07 20:54:07 +00001899 super(QueueTask, self).__init__(
1900 cmd, self._execution_path(),
1901 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001902 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001903
1904
showard73ec0442009-02-07 02:05:20 +00001905 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001906 return os.path.join(self._execution_path(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001907
1908
showarded2afea2009-07-07 20:54:07 +00001909 def _execution_path(self):
1910 return self.queue_entries[0].execution_path()
mblighbb421852008-03-11 22:36:16 +00001911
1912
jadmanski0afbb632008-06-06 21:10:57 +00001913 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00001914 for entry in self.queue_entries:
1915 if entry.status not in (models.HostQueueEntry.Status.STARTING,
1916 models.HostQueueEntry.Status.RUNNING):
1917 raise SchedulerError('Queue task attempting to start '
1918 'entry with invalid status %s: %s'
1919 % (entry.status, entry))
1920 if entry.host.status not in (models.Host.Status.PENDING,
1921 models.Host.Status.RUNNING):
1922 raise SchedulerError('Queue task attempting to start on queue '
1923 'entry with invalid host status %s: %s'
1924 % (entry.host.status, entry))
1925
showardd9205182009-04-27 20:09:55 +00001926 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001927 keyval_dict = {queued_key: queued_time}
1928 if self.group_name:
1929 keyval_dict['host_group_name'] = self.group_name
1930 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001931 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001932 self._write_host_keyvals(queue_entry.host)
showard8cc058f2009-09-08 16:26:33 +00001933 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showard12f3e322009-05-13 21:27:42 +00001934 queue_entry.update_field('started_on', datetime.datetime.now())
showard8cc058f2009-09-08 16:26:33 +00001935 queue_entry.host.set_status(models.Host.Status.RUNNING)
showard21baa452008-10-21 00:08:39 +00001936 queue_entry.host.update_field('dirty', 1)
showardc6a56872009-07-28 20:11:58 +00001937 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1938 # TODO(gps): Remove this if nothing needs it anymore.
1939 # A potential user is: tko/parser
jadmanski0afbb632008-06-06 21:10:57 +00001940 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001941
1942
showard35162b02009-03-03 02:17:30 +00001943 def _write_lost_process_error_file(self):
showarded2afea2009-07-07 20:54:07 +00001944 error_file_path = os.path.join(self._execution_path(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001945 _drone_manager.write_lines_to_file(error_file_path,
1946 [_LOST_PROCESS_ERROR])
1947
1948
showardd3dc1992009-04-22 21:01:40 +00001949 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001950 if not self.monitor:
1951 return
1952
showardd9205182009-04-27 20:09:55 +00001953 self._write_job_finished()
1954
showard35162b02009-03-03 02:17:30 +00001955 if self.monitor.lost_process:
1956 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001957
showard8cc058f2009-09-08 16:26:33 +00001958 for queue_entry in self.queue_entries:
1959 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001960
1961
showardcbd74612008-11-19 21:42:02 +00001962 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001963 _drone_manager.write_lines_to_file(
showarded2afea2009-07-07 20:54:07 +00001964 os.path.join(self._execution_path(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001965 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001966 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001967
1968
jadmanskif7fa2cc2008-10-01 14:13:23 +00001969 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001970 if not self.monitor or not self.monitor.has_process():
1971 return
1972
jadmanskif7fa2cc2008-10-01 14:13:23 +00001973 # build up sets of all the aborted_by and aborted_on values
1974 aborted_by, aborted_on = set(), set()
1975 for queue_entry in self.queue_entries:
1976 if queue_entry.aborted_by:
1977 aborted_by.add(queue_entry.aborted_by)
1978 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1979 aborted_on.add(t)
1980
1981 # extract some actual, unique aborted by value and write it out
1982 assert len(aborted_by) <= 1
1983 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001984 aborted_by_value = aborted_by.pop()
1985 aborted_on_value = max(aborted_on)
1986 else:
1987 aborted_by_value = 'autotest_system'
1988 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001989
showarda0382352009-02-11 23:36:43 +00001990 self._write_keyval_after_job("aborted_by", aborted_by_value)
1991 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001992
showardcbd74612008-11-19 21:42:02 +00001993 aborted_on_string = str(datetime.datetime.fromtimestamp(
1994 aborted_on_value))
1995 self._write_status_comment('Job aborted by %s on %s' %
1996 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001997
1998
jadmanski0afbb632008-06-06 21:10:57 +00001999 def abort(self):
2000 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002001 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002002 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002003
2004
jadmanski0afbb632008-06-06 21:10:57 +00002005 def epilog(self):
2006 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002007 self._finish_task()
2008 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00002009
2010
showardd3dc1992009-04-22 21:01:40 +00002011class PostJobTask(AgentTask):
2012 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00002013 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002014 self._queue_entries = queue_entries
2015 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00002016
showarded2afea2009-07-07 20:54:07 +00002017 self._execution_path = self._get_consistent_execution_path(
2018 queue_entries)
2019 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002020 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00002021 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002022 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
2023
2024 if _testing_mode:
2025 command = 'true'
2026 else:
2027 command = self._generate_command(self._results_dir)
2028
showarded2afea2009-07-07 20:54:07 +00002029 super(PostJobTask, self).__init__(
2030 cmd=command, working_directory=self._execution_path,
2031 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002032
showarded2afea2009-07-07 20:54:07 +00002033 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00002034 self._final_status = self._determine_final_status()
2035
2036
2037 def _generate_command(self, results_dir):
2038 raise NotImplementedError('Subclasses must override this')
2039
2040
2041 def _job_was_aborted(self):
2042 was_aborted = None
2043 for queue_entry in self._queue_entries:
2044 queue_entry.update_from_database()
2045 if was_aborted is None: # first queue entry
2046 was_aborted = bool(queue_entry.aborted)
2047 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2048 email_manager.manager.enqueue_notify_email(
2049 'Inconsistent abort state',
2050 'Queue entries have inconsistent abort state: ' +
2051 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2052 # don't crash here, just assume true
2053 return True
2054 return was_aborted
2055
2056
2057 def _determine_final_status(self):
2058 if self._job_was_aborted():
2059 return models.HostQueueEntry.Status.ABORTED
2060
2061 # we'll use a PidfileRunMonitor to read the autoserv exit status
2062 if self._autoserv_monitor.exit_code() == 0:
2063 return models.HostQueueEntry.Status.COMPLETED
2064 return models.HostQueueEntry.Status.FAILED
2065
2066
2067 def run(self):
showard8cc058f2009-09-08 16:26:33 +00002068 # Make sure we actually have results to work with.
2069 # This should never happen in normal operation.
showard5add1c82009-05-26 19:27:46 +00002070 if not self._autoserv_monitor.has_process():
2071 email_manager.manager.enqueue_notify_email(
showard8cc058f2009-09-08 16:26:33 +00002072 'No results in post-job task',
2073 'No results in post-job task at %s' %
2074 self._autoserv_monitor.pidfile_id)
showard5add1c82009-05-26 19:27:46 +00002075 self.finished(False)
2076 return
2077
2078 super(PostJobTask, self).run(
2079 pidfile_name=self._pidfile_name,
2080 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002081
2082
2083 def _set_all_statuses(self, status):
2084 for queue_entry in self._queue_entries:
2085 queue_entry.set_status(status)
2086
2087
2088 def abort(self):
2089 # override AgentTask.abort() to avoid killing the process and ending
2090 # the task. post-job tasks continue when the job is aborted.
2091 pass
2092
2093
showardb5626452009-06-30 01:57:28 +00002094class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002095 """
2096 Task responsible for
2097 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2098 * copying logs to the results repository
2099 * spawning CleanupTasks for hosts, if necessary
2100 * spawning a FinalReparseTask for the job
2101 """
showarded2afea2009-07-07 20:54:07 +00002102 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002103 self._job = job
2104 super(GatherLogsTask, self).__init__(
2105 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002106 logfile_name='.collect_crashinfo.log',
2107 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002108 self._set_ids(queue_entries=queue_entries)
2109
2110
2111 def _generate_command(self, results_dir):
2112 host_list = ','.join(queue_entry.host.hostname
2113 for queue_entry in self._queue_entries)
2114 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2115 '-r', results_dir]
2116
2117
2118 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002119 for queue_entry in self._queue_entries:
2120 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2121 raise SchedulerError('Gather task attempting to start on '
2122 'non-gathering entry: %s' % queue_entry)
2123 if queue_entry.host.status != models.Host.Status.RUNNING:
2124 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002125 'entry with non-running host status %s: %s'
2126 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002127
showardd3dc1992009-04-22 21:01:40 +00002128 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002129
2130
showardd3dc1992009-04-22 21:01:40 +00002131 def epilog(self):
2132 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002133
showard6d1c1432009-08-20 23:30:39 +00002134 self._copy_and_parse_results(self._queue_entries,
2135 use_monitor=self._autoserv_monitor)
2136
2137 if self._autoserv_monitor.has_process():
2138 final_success = (self._final_status ==
2139 models.HostQueueEntry.Status.COMPLETED)
2140 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2141 else:
2142 final_success = False
2143 num_tests_failed = 0
2144
showardb5626452009-06-30 01:57:28 +00002145 self._reboot_hosts(self._job, self._queue_entries, final_success,
2146 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002147
2148
showard0bbfc212009-04-29 21:06:13 +00002149 def run(self):
showard597bfd32009-05-08 18:22:50 +00002150 autoserv_exit_code = self._autoserv_monitor.exit_code()
2151 # only run if Autoserv exited due to some signal. if we have no exit
2152 # code, assume something bad (and signal-like) happened.
2153 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002154 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002155 else:
2156 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002157
2158
showard8fe93b52008-11-18 17:53:22 +00002159class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002160 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2161
2162
showard8cc058f2009-09-08 16:26:33 +00002163 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00002164 super(CleanupTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00002165 task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002166
showard8cc058f2009-09-08 16:26:33 +00002167 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002168
mblighd5c95802008-03-05 00:33:46 +00002169
jadmanski0afbb632008-06-06 21:10:57 +00002170 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002171 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002172 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002173 self.host.set_status(models.Host.Status.CLEANING)
2174 if self.queue_entry:
2175 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2176
2177
showard775300b2009-09-09 15:30:50 +00002178 def _finish_epilog(self):
2179 if not self.queue_entry:
2180 return
2181
2182 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
2183 self.queue_entry.on_pending()
2184 elif self.success:
2185 if self.queue_entry.job.run_verify:
2186 entry = models.HostQueueEntry(id=self.queue_entry.id)
2187 models.SpecialTask.objects.create(
2188 host=models.Host(id=self.host.id),
2189 queue_entry=entry,
2190 task=models.SpecialTask.Task.VERIFY)
2191 else:
2192 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002193
mblighd5c95802008-03-05 00:33:46 +00002194
showard21baa452008-10-21 00:08:39 +00002195 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002196 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002197
showard21baa452008-10-21 00:08:39 +00002198 if self.success:
2199 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002200 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002201
showard775300b2009-09-09 15:30:50 +00002202 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002203
showard21baa452008-10-21 00:08:39 +00002204
showardd3dc1992009-04-22 21:01:40 +00002205class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002206 _num_running_parses = 0
2207
showarded2afea2009-07-07 20:54:07 +00002208 def __init__(self, queue_entries, recover_run_monitor=None):
2209 super(FinalReparseTask, self).__init__(
2210 queue_entries, pidfile_name=_PARSER_PID_FILE,
2211 logfile_name='.parse.log',
2212 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002213 # don't use _set_ids, since we don't want to set the host_ids
2214 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002215 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002216
showard97aed502008-11-04 02:01:24 +00002217
2218 @classmethod
2219 def _increment_running_parses(cls):
2220 cls._num_running_parses += 1
2221
2222
2223 @classmethod
2224 def _decrement_running_parses(cls):
2225 cls._num_running_parses -= 1
2226
2227
2228 @classmethod
2229 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002230 return (cls._num_running_parses <
2231 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002232
2233
2234 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002235 for queue_entry in self._queue_entries:
2236 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2237 raise SchedulerError('Parse task attempting to start on '
2238 'non-parsing entry: %s' % queue_entry)
2239
showard97aed502008-11-04 02:01:24 +00002240 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002241
2242
2243 def epilog(self):
2244 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002245 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002246
2247
showardd3dc1992009-04-22 21:01:40 +00002248 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002249 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002250 results_dir]
showard97aed502008-11-04 02:01:24 +00002251
2252
showard08a36412009-05-05 01:01:13 +00002253 def tick(self):
2254 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002255 # and we can, at which point we revert to default behavior
2256 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002257 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002258 else:
2259 self._try_starting_parse()
2260
2261
2262 def run(self):
2263 # override run() to not actually run unless we can
2264 self._try_starting_parse()
2265
2266
2267 def _try_starting_parse(self):
2268 if not self._can_run_new_parse():
2269 return
showard170873e2009-01-07 00:22:26 +00002270
showard97aed502008-11-04 02:01:24 +00002271 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002272 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002273
showard97aed502008-11-04 02:01:24 +00002274 self._increment_running_parses()
2275 self._parse_started = True
2276
2277
2278 def finished(self, success):
2279 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002280 if self._parse_started:
2281 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002282
2283
showarda3c58572009-03-12 20:36:59 +00002284class DBError(Exception):
2285 """Raised by the DBObject constructor when its select fails."""
2286
2287
mbligh36768f02008-02-22 18:28:33 +00002288class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002289 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002290
2291 # Subclasses MUST override these:
2292 _table_name = ''
2293 _fields = ()
2294
showarda3c58572009-03-12 20:36:59 +00002295 # A mapping from (type, id) to the instance of the object for that
2296 # particular id. This prevents us from creating new Job() and Host()
2297 # instances for every HostQueueEntry object that we instantiate as
2298 # multiple HQEs often share the same Job.
2299 _instances_by_type_and_id = weakref.WeakValueDictionary()
2300 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002301
showarda3c58572009-03-12 20:36:59 +00002302
2303 def __new__(cls, id=None, **kwargs):
2304 """
2305 Look to see if we already have an instance for this particular type
2306 and id. If so, use it instead of creating a duplicate instance.
2307 """
2308 if id is not None:
2309 instance = cls._instances_by_type_and_id.get((cls, id))
2310 if instance:
2311 return instance
2312 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2313
2314
2315 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002316 assert bool(id) or bool(row)
2317 if id is not None and row is not None:
2318 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002319 assert self._table_name, '_table_name must be defined in your class'
2320 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002321 if not new_record:
2322 if self._initialized and not always_query:
2323 return # We've already been initialized.
2324 if id is None:
2325 id = row[0]
2326 # Tell future constructors to use us instead of re-querying while
2327 # this instance is still around.
2328 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002329
showard6ae5ea92009-02-25 00:11:51 +00002330 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002331
jadmanski0afbb632008-06-06 21:10:57 +00002332 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002333
jadmanski0afbb632008-06-06 21:10:57 +00002334 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002335 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002336
showarda3c58572009-03-12 20:36:59 +00002337 if self._initialized:
2338 differences = self._compare_fields_in_row(row)
2339 if differences:
showard7629f142009-03-27 21:02:02 +00002340 logging.warn(
2341 'initialized %s %s instance requery is updating: %s',
2342 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002343 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002344 self._initialized = True
2345
2346
2347 @classmethod
2348 def _clear_instance_cache(cls):
2349 """Used for testing, clear the internal instance cache."""
2350 cls._instances_by_type_and_id.clear()
2351
2352
showardccbd6c52009-03-21 00:10:21 +00002353 def _fetch_row_from_db(self, row_id):
2354 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2355 rows = _db.execute(sql, (row_id,))
2356 if not rows:
showard76e29d12009-04-15 21:53:10 +00002357 raise DBError("row not found (table=%s, row id=%s)"
2358 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002359 return rows[0]
2360
2361
showarda3c58572009-03-12 20:36:59 +00002362 def _assert_row_length(self, row):
2363 assert len(row) == len(self._fields), (
2364 "table = %s, row = %s/%d, fields = %s/%d" % (
2365 self.__table, row, len(row), self._fields, len(self._fields)))
2366
2367
2368 def _compare_fields_in_row(self, row):
2369 """
showarddae680a2009-10-12 20:26:43 +00002370 Given a row as returned by a SELECT query, compare it to our existing in
2371 memory fields. Fractional seconds are stripped from datetime values
2372 before comparison.
showarda3c58572009-03-12 20:36:59 +00002373
2374 @param row - A sequence of values corresponding to fields named in
2375 The class attribute _fields.
2376
2377 @returns A dictionary listing the differences keyed by field name
2378 containing tuples of (current_value, row_value).
2379 """
2380 self._assert_row_length(row)
2381 differences = {}
showarddae680a2009-10-12 20:26:43 +00002382 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002383 for field, row_value in itertools.izip(self._fields, row):
2384 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002385 if (isinstance(current_value, datetime.datetime)
2386 and isinstance(row_value, datetime.datetime)):
2387 current_value = current_value.strftime(datetime_cmp_fmt)
2388 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002389 if current_value != row_value:
2390 differences[field] = (current_value, row_value)
2391 return differences
showard2bab8f42008-11-12 18:15:22 +00002392
2393
2394 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002395 """
2396 Update our field attributes using a single row returned by SELECT.
2397
2398 @param row - A sequence of values corresponding to fields named in
2399 the class fields list.
2400 """
2401 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002402
showard2bab8f42008-11-12 18:15:22 +00002403 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002404 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002405 setattr(self, field, value)
2406 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002407
showard2bab8f42008-11-12 18:15:22 +00002408 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002409
mblighe2586682008-02-29 22:45:46 +00002410
showardccbd6c52009-03-21 00:10:21 +00002411 def update_from_database(self):
2412 assert self.id is not None
2413 row = self._fetch_row_from_db(self.id)
2414 self._update_fields_from_row(row)
2415
2416
jadmanski0afbb632008-06-06 21:10:57 +00002417 def count(self, where, table = None):
2418 if not table:
2419 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002420
jadmanski0afbb632008-06-06 21:10:57 +00002421 rows = _db.execute("""
2422 SELECT count(*) FROM %s
2423 WHERE %s
2424 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002425
jadmanski0afbb632008-06-06 21:10:57 +00002426 assert len(rows) == 1
2427
2428 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002429
2430
showardd3dc1992009-04-22 21:01:40 +00002431 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002432 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002433
showard2bab8f42008-11-12 18:15:22 +00002434 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002435 return
mbligh36768f02008-02-22 18:28:33 +00002436
mblighf8c624d2008-07-03 16:58:45 +00002437 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002438 _db.execute(query, (value, self.id))
2439
showard2bab8f42008-11-12 18:15:22 +00002440 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002441
2442
jadmanski0afbb632008-06-06 21:10:57 +00002443 def save(self):
2444 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002445 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002446 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002447 values = []
2448 for key in keys:
2449 value = getattr(self, key)
2450 if value is None:
2451 values.append('NULL')
2452 else:
2453 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002454 values_str = ','.join(values)
2455 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2456 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002457 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002458 # Update our id to the one the database just assigned to us.
2459 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002460
2461
jadmanski0afbb632008-06-06 21:10:57 +00002462 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002463 self._instances_by_type_and_id.pop((type(self), id), None)
2464 self._initialized = False
2465 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002466 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2467 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002468
2469
showard63a34772008-08-18 19:32:50 +00002470 @staticmethod
2471 def _prefix_with(string, prefix):
2472 if string:
2473 string = prefix + string
2474 return string
2475
2476
jadmanski0afbb632008-06-06 21:10:57 +00002477 @classmethod
showard989f25d2008-10-01 11:38:11 +00002478 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002479 """
2480 Construct instances of our class based on the given database query.
2481
2482 @yields One class instance for each row fetched.
2483 """
showard63a34772008-08-18 19:32:50 +00002484 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2485 where = cls._prefix_with(where, 'WHERE ')
2486 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002487 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002488 'joins' : joins,
2489 'where' : where,
2490 'order_by' : order_by})
2491 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002492 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002493
mbligh36768f02008-02-22 18:28:33 +00002494
class IneligibleHostQueue(DBObject):
    # ORM wrapper for ineligible_host_queues: rows that block a particular
    # host from being scheduled again for a particular job.
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002498
2499
showard89f84db2009-03-12 20:39:13 +00002500class AtomicGroup(DBObject):
2501 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002502 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2503 'invalid')
showard89f84db2009-03-12 20:39:13 +00002504
2505
showard989f25d2008-10-01 11:38:11 +00002506class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002507 _table_name = 'labels'
2508 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002509 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002510
2511
showard6157c632009-07-06 20:19:31 +00002512 def __repr__(self):
2513 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2514 self.name, self.id, self.atomic_group_id)
2515
2516
mbligh36768f02008-02-22 18:28:33 +00002517class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002518 _table_name = 'hosts'
2519 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2520 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2521
2522
jadmanski0afbb632008-06-06 21:10:57 +00002523 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002524 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002525 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002526
2527
showard170873e2009-01-07 00:22:26 +00002528 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002529 """
showard170873e2009-01-07 00:22:26 +00002530 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002531 """
2532 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002533 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002534 FROM labels
2535 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002536 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002537 ORDER BY labels.name
2538 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002539 platform = None
2540 all_labels = []
2541 for label_name, is_platform in rows:
2542 if is_platform:
2543 platform = label_name
2544 all_labels.append(label_name)
2545 return platform, all_labels
2546
2547
showard54c1ea92009-05-20 00:32:58 +00002548 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2549
2550
2551 @classmethod
2552 def cmp_for_sort(cls, a, b):
2553 """
2554 A comparison function for sorting Host objects by hostname.
2555
2556 This strips any trailing numeric digits, ignores leading 0s and
2557 compares hostnames by the leading name and the trailing digits as a
2558 number. If both hostnames do not match this pattern, they are simply
2559 compared as lower case strings.
2560
2561 Example of how hostnames will be sorted:
2562
2563 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2564
2565 This hopefully satisfy most people's hostname sorting needs regardless
2566 of their exact naming schemes. Nobody sane should have both a host10
2567 and host010 (but the algorithm works regardless).
2568 """
2569 lower_a = a.hostname.lower()
2570 lower_b = b.hostname.lower()
2571 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2572 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2573 if match_a and match_b:
2574 name_a, number_a_str = match_a.groups()
2575 name_b, number_b_str = match_b.groups()
2576 number_a = int(number_a_str.lstrip('0'))
2577 number_b = int(number_b_str.lstrip('0'))
2578 result = cmp((name_a, number_a), (name_b, number_b))
2579 if result == 0 and lower_a != lower_b:
2580 # If they compared equal above but the lower case names are
2581 # indeed different, don't report equality. abc012 != abc12.
2582 return cmp(lower_a, lower_b)
2583 return result
2584 else:
2585 return cmp(lower_a, lower_b)
2586
2587
mbligh36768f02008-02-22 18:28:33 +00002588class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002589 _table_name = 'host_queue_entries'
2590 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002591 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002592 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002593
2594
showarda3c58572009-03-12 20:36:59 +00002595 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002596 assert id or row
showarda3c58572009-03-12 20:36:59 +00002597 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002598 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002599
jadmanski0afbb632008-06-06 21:10:57 +00002600 if self.host_id:
2601 self.host = Host(self.host_id)
2602 else:
2603 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002604
showard77182562009-06-10 00:16:05 +00002605 if self.atomic_group_id:
2606 self.atomic_group = AtomicGroup(self.atomic_group_id,
2607 always_query=False)
2608 else:
2609 self.atomic_group = None
2610
showard170873e2009-01-07 00:22:26 +00002611 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002612 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002613
2614
showard89f84db2009-03-12 20:39:13 +00002615 @classmethod
2616 def clone(cls, template):
2617 """
2618 Creates a new row using the values from a template instance.
2619
2620 The new instance will not exist in the database or have a valid
2621 id attribute until its save() method is called.
2622 """
2623 assert isinstance(template, cls)
2624 new_row = [getattr(template, field) for field in cls._fields]
2625 clone = cls(row=new_row, new_record=True)
2626 clone.id = None
2627 return clone
2628
2629
showardc85c21b2008-11-24 22:17:37 +00002630 def _view_job_url(self):
2631 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2632
2633
showardf1ae3542009-05-11 19:26:02 +00002634 def get_labels(self):
2635 """
2636 Get all labels associated with this host queue entry (either via the
2637 meta_host or as a job dependency label). The labels yielded are not
2638 guaranteed to be unique.
2639
2640 @yields Label instances associated with this host_queue_entry.
2641 """
2642 if self.meta_host:
2643 yield Label(id=self.meta_host, always_query=False)
2644 labels = Label.fetch(
2645 joins="JOIN jobs_dependency_labels AS deps "
2646 "ON (labels.id = deps.label_id)",
2647 where="deps.job_id = %d" % self.job.id)
2648 for label in labels:
2649 yield label
2650
2651
jadmanski0afbb632008-06-06 21:10:57 +00002652 def set_host(self, host):
2653 if host:
2654 self.queue_log_record('Assigning host ' + host.hostname)
2655 self.update_field('host_id', host.id)
2656 self.update_field('active', True)
2657 self.block_host(host.id)
2658 else:
2659 self.queue_log_record('Releasing host')
2660 self.unblock_host(self.host.id)
2661 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002662
jadmanski0afbb632008-06-06 21:10:57 +00002663 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002664
2665
jadmanski0afbb632008-06-06 21:10:57 +00002666 def get_host(self):
2667 return self.host
mbligh36768f02008-02-22 18:28:33 +00002668
2669
jadmanski0afbb632008-06-06 21:10:57 +00002670 def queue_log_record(self, log_line):
2671 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002672 _drone_manager.write_lines_to_file(self.queue_log_path,
2673 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002674
2675
jadmanski0afbb632008-06-06 21:10:57 +00002676 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002677 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002678 row = [0, self.job.id, host_id]
2679 block = IneligibleHostQueue(row=row, new_record=True)
2680 block.save()
mblighe2586682008-02-29 22:45:46 +00002681
2682
jadmanski0afbb632008-06-06 21:10:57 +00002683 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002684 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002685 blocks = IneligibleHostQueue.fetch(
2686 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2687 for block in blocks:
2688 block.delete()
mblighe2586682008-02-29 22:45:46 +00002689
2690
showard2bab8f42008-11-12 18:15:22 +00002691 def set_execution_subdir(self, subdir=None):
2692 if subdir is None:
2693 assert self.get_host()
2694 subdir = self.get_host().hostname
2695 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002696
2697
showard6355f6b2008-12-05 18:52:13 +00002698 def _get_hostname(self):
2699 if self.host:
2700 return self.host.hostname
2701 return 'no host'
2702
2703
showard170873e2009-01-07 00:22:26 +00002704 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002705 flags = []
2706 if self.active:
2707 flags.append('active')
2708 if self.complete:
2709 flags.append('complete')
2710 if self.deleted:
2711 flags.append('deleted')
2712 if self.aborted:
2713 flags.append('aborted')
2714 flags_str = ','.join(flags)
2715 if flags_str:
2716 flags_str = ' [%s]' % flags_str
2717 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2718 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002719
2720
jadmanski0afbb632008-06-06 21:10:57 +00002721 def set_status(self, status):
showard56824072009-10-12 20:30:21 +00002722 logging.info("%s -> %s", self, status)
mblighf8c624d2008-07-03 16:58:45 +00002723
showard56824072009-10-12 20:30:21 +00002724 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002725
showard8cc058f2009-09-08 16:26:33 +00002726 if status in (models.HostQueueEntry.Status.QUEUED,
2727 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002728 self.update_field('complete', False)
2729 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002730
showard8cc058f2009-09-08 16:26:33 +00002731 if status in (models.HostQueueEntry.Status.PENDING,
2732 models.HostQueueEntry.Status.RUNNING,
2733 models.HostQueueEntry.Status.VERIFYING,
2734 models.HostQueueEntry.Status.STARTING,
2735 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002736 self.update_field('complete', False)
2737 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002738
showard8cc058f2009-09-08 16:26:33 +00002739 if status in (models.HostQueueEntry.Status.FAILED,
2740 models.HostQueueEntry.Status.COMPLETED,
2741 models.HostQueueEntry.Status.STOPPED,
2742 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002743 self.update_field('complete', True)
2744 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002745 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002746
2747 should_email_status = (status.lower() in _notify_email_statuses or
2748 'all' in _notify_email_statuses)
2749 if should_email_status:
2750 self._email_on_status(status)
2751
2752 self._email_on_job_complete()
2753
2754
showardf85a0b72009-10-07 20:48:45 +00002755 def _on_complete(self):
2756 if not self.execution_subdir:
2757 return
2758 # unregister any possible pidfiles associated with this queue entry
2759 for pidfile_name in _ALL_PIDFILE_NAMES:
2760 pidfile_id = _drone_manager.get_pidfile_id_from(
2761 self.execution_path(), pidfile_name=pidfile_name)
2762 _drone_manager.unregister_pidfile(pidfile_id)
2763
2764
    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002774
2775
2776 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002777 if not self.job.is_finished():
2778 return
showard542e8402008-09-19 20:16:18 +00002779
showardc85c21b2008-11-24 22:17:37 +00002780 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002781 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002782 for queue_entry in hosts_queue:
2783 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002784 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002785 queue_entry.status))
2786
2787 summary_text = "\n".join(summary_text)
2788 status_counts = models.Job.objects.get_status_counts(
2789 [self.job.id])[self.job.id]
2790 status = ', '.join('%d %s' % (count, status) for status, count
2791 in status_counts.iteritems())
2792
2793 subject = 'Autotest: Job ID: %s "%s" %s' % (
2794 self.job.id, self.job.name, status)
2795 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2796 self.job.id, self.job.name, status, self._view_job_url(),
2797 summary_text)
showard170873e2009-01-07 00:22:26 +00002798 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002799
2800
    def schedule_pre_job_tasks(self, assigned_host=None):
        """
        Bind this entry to a host if needed, then kick off the pre-job
        task flow (Cleanup/Verify) via the owning Job.

        @param assigned_host: required when this is a metahost or atomic
                group entry, since those have no fixed host of their own.
        """
        if self.meta_host is not None or self.atomic_group:
            assert assigned_host
            # ensure results dir exists for the queue log
            if self.host_id is None:
                self.set_host(assigned_host)
            else:
                # Host was already assigned (e.g. on recovery); it must match.
                assert assigned_host.id == self.host_id

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002815
2816
    def _do_schedule_pre_job_tasks(self):
        """Move this entry to Verifying and let the job schedule its tasks."""
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002822
showard6ae5ea92009-02-25 00:11:51 +00002823
    def requeue(self):
        """Return this entry to Queued so the scheduler can pick it up again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        # Clear the start timestamp so runtime accounting restarts cleanly.
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            # Metahost entries release their host so another can be chosen.
            self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002832
2833
    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.  Marks the entry Failed and lets the job stop sibling
        entries if the synch_count can no longer be met.
        """
        # Only non-metahost entries fail outright; a metahost could simply
        # be reassigned to a different machine.
        assert not self.meta_host
        self.set_status(models.HostQueueEntry.Status.FAILED)
        self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002842
2843
    @property
    def aborted_by(self):
        # Login of the user who aborted this entry (None if never aborted).
        self._load_abort_info()
        return self._aborted_by
2848
2849
    @property
    def aborted_on(self):
        # Timestamp of the abort (None if never aborted).
        self._load_abort_info()
        return self._aborted_on
2854
2855
2856 def _load_abort_info(self):
2857 """ Fetch info about who aborted the job. """
2858 if hasattr(self, "_aborted_by"):
2859 return
2860 rows = _db.execute("""
2861 SELECT users.login, aborted_host_queue_entries.aborted_on
2862 FROM aborted_host_queue_entries
2863 INNER JOIN users
2864 ON users.id = aborted_host_queue_entries.aborted_by_id
2865 WHERE aborted_host_queue_entries.queue_entry_id = %s
2866 """, (self.id,))
2867 if rows:
2868 self._aborted_by, self._aborted_on = rows[0]
2869 else:
2870 self._aborted_by = self._aborted_on = None
2871
2872
showardb2e2c322008-10-14 17:33:55 +00002873 def on_pending(self):
2874 """
2875 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00002876 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
2877 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00002878 """
showard8cc058f2009-09-08 16:26:33 +00002879 self.set_status(models.HostQueueEntry.Status.PENDING)
2880 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00002881
2882 # Some debug code here: sends an email if an asynchronous job does not
2883 # immediately enter Starting.
2884 # TODO: Remove this once we figure out why asynchronous jobs are getting
2885 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00002886 self.job.run_if_ready(queue_entry=self)
2887 if (self.job.synch_count == 1 and
2888 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00002889 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2890 message = 'Asynchronous job stuck in Pending'
2891 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002892
2893
    def abort(self, dispatcher):
        """
        Abort this queue entry, releasing or cleaning its host where safe.

        @param dispatcher: used to assert that no agents are still running
                for this entry before its host is released.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # The interrupted verify may have left the host dirty; schedule
            # a cleanup special task before it is reused.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host(id=self.host.id))

        self.set_status(Status.ABORTED)
        # Cancel any pending atomic-group ready-delay for the owning job.
        self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00002913
showard8cc058f2009-09-08 16:26:33 +00002914
2915 def get_group_name(self):
2916 atomic_group = self.atomic_group
2917 if not atomic_group:
2918 return ''
2919
2920 # Look at any meta_host and dependency labels and pick the first
2921 # one that also specifies this atomic group. Use that label name
2922 # as the group name if possible (it is more specific).
2923 for label in self.get_labels():
2924 if label.atomic_group_id:
2925 assert label.atomic_group_id == atomic_group.id
2926 return label.name
2927 return atomic_group.name
2928
2929
showard170873e2009-01-07 00:22:26 +00002930 def execution_tag(self):
2931 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002932 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002933
2934
    def execution_path(self):
        # Results live under the same relative path as the execution tag,
        # so the tag doubles as the filesystem path.
        return self.execution_tag()
2937
2938
class Job(DBObject):
    """
    A row in the jobs table, plus the scheduler-side logic that decides
    when the job's HostQueueEntries are ready and launches them.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        """Load a Job either by primary key or from an existing DB row."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        """Return True unless this job runs a client-side control file."""
        # NOTE(review): 2 is the client control-file type; this magic
        # constant presumably mirrors the frontend models -- confirm
        # before changing.
        return self.control_type != 2


    def tag(self):
        """Return the results-directory tag for this job: '<id>-<owner>'."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects belonging to this job."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        # Every job must have at least one queue entry.
        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """
        Set this job's status field; optionally propagate the same status
        to every one of its queue entries.
        """
        self.update_field('status',status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _hosts_assigned_count(self):
        """The number of HostQueueEntries assigned a Host for this job."""
        entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                       host__isnull=False)
        return entries.count()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def _pending_threshold(self, atomic_group):
        """
        @param atomic_group: The AtomicGroup associated with this job that we
                are using to bound the threshold.
        @returns The minimum number of HostQueueEntries assigned a Host before
                this job can run.
        """
        return min(self._hosts_assigned_count(),
                   atomic_group.max_number_of_machines)


    def is_ready(self):
        """Return True when enough entries are Pending for the job to run."""
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready


    def num_machines(self, clause = None):
        """Count this job's queue entries, optionally filtered by clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of queue entries that have not yet completed."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of queue entries currently active."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of queue entries that have completed."""
        return self.num_machines('complete')


    def is_finished(self):
        """True once every queue entry of this job is complete."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """Queryset of this job's entries still in pre-run states."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark all not-yet-run entries Stopped, releasing Pending hosts."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()

    def stop_if_necessary(self):
        """Stop the job if too few entries remain to satisfy synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to this job's .machines results file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


    def _write_control_file(self, execution_path):
        """Attach the job's control file under execution_path on the drone."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """Return all entries sharing the given entry's execution subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def get_autoserv_params(self, queue_entries):
        """
        Build the autoserv command line for running this job on the
        hosts of the given queue entries (all in one host group).
        """
        assert queue_entries
        execution_path = queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        execution_tag = queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self, verbose=False)

        if not self.is_server_job():
            # -c tells autoserv to run the control file on the client.
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """Decide, from reboot_before policy, whether to clean up the host."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """Decide whether to verify the host, honoring DO_NOT_VERIFY."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Schedule the SpecialTask (Cleanup or Verify) that must run before
        queue_entry may be used for this Job.  If neither is needed, the
        entry advances directly via HostQueueEntry.on_pending(), which
        continues the flow of the job.
        """
        if self._should_run_cleanup(queue_entry):
            task = models.SpecialTask.Task.CLEANUP
        elif self._should_run_verify(queue_entry):
            task = models.SpecialTask.Task.VERIFY
        else:
            queue_entry.on_pending()
            return

        models.SpecialTask.objects.create(
                host=models.Host(id=queue_entry.host_id),
                queue_entry=models.HostQueueEntry(id=queue_entry.id),
                task=task)


    def _assign_new_group(self, queue_entries, group_name=''):
        """Assign a fresh execution subdir to the given group of entries."""
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                self.id, [entry.host.hostname for entry in queue_entries],
                group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job (always containing include_queue_entry), or an empty
                list if too few Pending entries were available.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job now if enough hosts are ready; otherwise either stop
        excess entries (non-atomic) or wait for more Pending hosts
        (atomic groups).  Returns nothing.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        elif queue_entry.atomic_group:
            self.run_with_ready_delay(queue_entry)
        else:
            self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, the delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._pending_threshold(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)


    def schedule_delayed_callback_task(self, queue_entry):
        """
        Create (at most once) the DelayedCallTask that re-checks this
        atomic group job after the configured wait and runs it if enough
        hosts are still Pending.  Returns the task, or None if a delay
        was already set.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant.  It could have aborted
            # while we were waiting or hosts could have disappearred, etc.
            threshold = self._pending_threshold(queue_entry.atomic_group)
            if self._pending_count() < threshold:
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras.  Not running.', self)
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task


    def run(self, queue_entry):
        """
        Choose a host group including queue_entry and start it running.

        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        """Move the chosen entries to Starting and cancel any ready-delay."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
        self.abort_delay_ready_task()


    def abort_delay_ready_task(self):
        """Abort the delayed task associated with this job, if any."""
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()


    def __str__(self):
        """Return the job's tag: '<id>-<owner>'."""
        return '%s-%s' % (self.id, self.owner)
3361
3362
if __name__ == '__main__':
    # Entry point: start the scheduler (main() is defined earlier in the file).
    main()