blob: 985df488553d7c4e5783d6e672c9456c8d73c58d [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
# --- pid-file names used to detect an already-running scheduler ---
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment.
# NOTE: os.environ.has_key() is deprecated (and gone in Python 3); the
# `in` operator is the portable spelling.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# pidfile names written into each execution's results directory by the
# various autoserv process phases
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# module-level mutable state, initialized by init()/main
_db = None                     # DatabaseConnection, set up in init()
_shutdown = False              # set True by the SIGINT handler
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False          # set by --test: dummy autoserv, stresstest DB
_base_url = None               # AFE frontend URL, derived from config
_notify_email_statuses = []    # HQE statuses that trigger notification email
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile, in seconds."""
    minutes = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    """Top-level entry point: run the scheduler, logging any crash.

    SystemExit (deliberate shutdown via sys.exit) propagates silently; any
    other escaping exception is logged before re-raising.  In all cases the
    scheduler's pid file is removed on the way out.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # deliberate exit -- don't log it as a crash
            raise
        except:
            # bare except on purpose: log absolutely anything that escapes
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000085
86
def main_without_exception_handling():
    """Parse arguments, configure global state and run the dispatcher loop.

    Expects exactly one positional argument: the results directory.  Reads
    scheduler configuration from the global config, starts the status server
    and then ticks the Dispatcher until a SIGINT flips the module-level
    _shutdown flag.  Exits the process (sys.exit) on configuration errors.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # fixed typo: "to enabled it" -> "to enable it"
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # optional site-specific initialization hook; falls back to a no-op
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # config value is a free-form list separated by whitespace/,/;/:
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # main loop: one tick per pause interval until SIGINT
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # bare except on purpose: any failure ends the loop but we still
        # want the orderly shutdown below to run
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
showard136e6dc2009-06-10 19:38:49 +0000176def setup_logging():
177 log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
178 log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
179 logging_manager.configure_logging(
180 scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
181 logfile_name=log_name)
182
183
mbligh36768f02008-02-22 18:28:33 +0000184def handle_sigint(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000185 global _shutdown
186 _shutdown = True
showardb18134f2009-03-20 20:52:18 +0000187 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000188
189
showard136e6dc2009-06-10 19:38:49 +0000190def init():
showardb18134f2009-03-20 20:52:18 +0000191 logging.info("%s> dispatcher starting", time.strftime("%X %x"))
192 logging.info("My PID is %d", os.getpid())
mbligh36768f02008-02-22 18:28:33 +0000193
showard8de37132009-08-31 18:33:08 +0000194 if utils.program_is_alive(PID_FILE_PREFIX):
showard549afad2009-08-20 23:33:36 +0000195 logging.critical("monitor_db already running, aborting!")
196 sys.exit(1)
197 utils.write_pid(PID_FILE_PREFIX)
mblighfb676032009-04-01 18:25:38 +0000198
showardb1e51872008-10-07 11:08:18 +0000199 if _testing_mode:
200 global_config.global_config.override_config_value(
showard170873e2009-01-07 00:22:26 +0000201 DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
showardb1e51872008-10-07 11:08:18 +0000202
jadmanski0afbb632008-06-06 21:10:57 +0000203 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
204 global _db
showard170873e2009-01-07 00:22:26 +0000205 _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
jadmanski0afbb632008-06-06 21:10:57 +0000206 _db.connect()
mbligh36768f02008-02-22 18:28:33 +0000207
showardfa8629c2008-11-04 16:51:23 +0000208 # ensure Django connection is in autocommit
209 setup_django_environment.enable_autocommit()
showard844960a2009-05-29 18:41:18 +0000210 # bypass the readonly connection
211 readonly_connection.ReadOnlyConnection.set_globally_disabled(True)
showardfa8629c2008-11-04 16:51:23 +0000212
showardb18134f2009-03-20 20:52:18 +0000213 logging.info("Setting signal handler")
jadmanski0afbb632008-06-06 21:10:57 +0000214 signal.signal(signal.SIGINT, handle_sigint)
215
showardd1ee1dd2009-01-07 21:33:08 +0000216 drones = global_config.global_config.get_config_value(
217 scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
218 drone_list = [hostname.strip() for hostname in drones.split(',')]
showard170873e2009-01-07 00:22:26 +0000219 results_host = global_config.global_config.get_config_value(
showardd1ee1dd2009-01-07 21:33:08 +0000220 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
showard170873e2009-01-07 00:22:26 +0000221 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
222
showardb18134f2009-03-20 20:52:18 +0000223 logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
showarded2afea2009-07-07 20:54:07 +0000226def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
227 verbose=True):
showardf1ae3542009-05-11 19:26:02 +0000228 """
229 @returns The autoserv command line as a list of executable + parameters.
230
231 @param machines - string - A machine or comma separated list of machines
232 for the (-m) flag.
showardf1ae3542009-05-11 19:26:02 +0000233 @param extra_args - list - Additional arguments to pass to autoserv.
234 @param job - Job object - If supplied, -u owner and -l name parameters
235 will be added.
236 @param queue_entry - A HostQueueEntry object - If supplied and no Job
237 object was supplied, this will be used to lookup the Job object.
238 """
showard87ba02a2009-04-20 19:37:32 +0000239 autoserv_argv = [_autoserv_path, '-p', '-m', machines,
showarded2afea2009-07-07 20:54:07 +0000240 '-r', drone_manager.WORKING_DIRECTORY]
showard87ba02a2009-04-20 19:37:32 +0000241 if job or queue_entry:
242 if not job:
243 job = queue_entry.job
244 autoserv_argv += ['-u', job.owner, '-l', job.name]
showarde9c69362009-06-30 01:58:03 +0000245 if verbose:
246 autoserv_argv.append('--verbose')
showard87ba02a2009-04-20 19:37:32 +0000247 return autoserv_argv + extra_args
248
249
showard89f84db2009-03-12 20:39:13 +0000250class SchedulerError(Exception):
251 """Raised by HostScheduler when an inconsistent state occurs."""
252
253
showard63a34772008-08-18 19:32:50 +0000254class HostScheduler(object):
255 def _get_ready_hosts(self):
256 # avoid any host with a currently active queue entry against it
257 hosts = Host.fetch(
258 joins='LEFT JOIN host_queue_entries AS active_hqe '
259 'ON (hosts.id = active_hqe.host_id AND '
showardb1e51872008-10-07 11:08:18 +0000260 'active_hqe.active)',
showard63a34772008-08-18 19:32:50 +0000261 where="active_hqe.host_id IS NULL "
showardb1e51872008-10-07 11:08:18 +0000262 "AND NOT hosts.locked "
showard63a34772008-08-18 19:32:50 +0000263 "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
264 return dict((host.id, host) for host in hosts)
265
266
267 @staticmethod
268 def _get_sql_id_list(id_list):
269 return ','.join(str(item_id) for item_id in id_list)
270
271
272 @classmethod
showard989f25d2008-10-01 11:38:11 +0000273 def _get_many2many_dict(cls, query, id_list, flip=False):
mbligh849a0f62008-08-28 20:12:19 +0000274 if not id_list:
275 return {}
showard63a34772008-08-18 19:32:50 +0000276 query %= cls._get_sql_id_list(id_list)
277 rows = _db.execute(query)
showard989f25d2008-10-01 11:38:11 +0000278 return cls._process_many2many_dict(rows, flip)
279
280
281 @staticmethod
282 def _process_many2many_dict(rows, flip=False):
showard63a34772008-08-18 19:32:50 +0000283 result = {}
284 for row in rows:
showard89f84db2009-03-12 20:39:13 +0000285 left_id, right_id = int(row[0]), int(row[1])
showard989f25d2008-10-01 11:38:11 +0000286 if flip:
287 left_id, right_id = right_id, left_id
showard63a34772008-08-18 19:32:50 +0000288 result.setdefault(left_id, set()).add(right_id)
289 return result
290
291
292 @classmethod
293 def _get_job_acl_groups(cls, job_ids):
294 query = """
showardd9ac4452009-02-07 02:04:37 +0000295 SELECT jobs.id, acl_groups_users.aclgroup_id
showard63a34772008-08-18 19:32:50 +0000296 FROM jobs
297 INNER JOIN users ON users.login = jobs.owner
298 INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
299 WHERE jobs.id IN (%s)
300 """
301 return cls._get_many2many_dict(query, job_ids)
302
303
304 @classmethod
305 def _get_job_ineligible_hosts(cls, job_ids):
306 query = """
307 SELECT job_id, host_id
308 FROM ineligible_host_queues
309 WHERE job_id IN (%s)
310 """
311 return cls._get_many2many_dict(query, job_ids)
312
313
314 @classmethod
showard989f25d2008-10-01 11:38:11 +0000315 def _get_job_dependencies(cls, job_ids):
316 query = """
317 SELECT job_id, label_id
318 FROM jobs_dependency_labels
319 WHERE job_id IN (%s)
320 """
321 return cls._get_many2many_dict(query, job_ids)
322
323
324 @classmethod
showard63a34772008-08-18 19:32:50 +0000325 def _get_host_acls(cls, host_ids):
326 query = """
showardd9ac4452009-02-07 02:04:37 +0000327 SELECT host_id, aclgroup_id
showard63a34772008-08-18 19:32:50 +0000328 FROM acl_groups_hosts
329 WHERE host_id IN (%s)
330 """
331 return cls._get_many2many_dict(query, host_ids)
332
333
334 @classmethod
335 def _get_label_hosts(cls, host_ids):
showardfa8629c2008-11-04 16:51:23 +0000336 if not host_ids:
337 return {}, {}
showard63a34772008-08-18 19:32:50 +0000338 query = """
339 SELECT label_id, host_id
340 FROM hosts_labels
341 WHERE host_id IN (%s)
showard989f25d2008-10-01 11:38:11 +0000342 """ % cls._get_sql_id_list(host_ids)
343 rows = _db.execute(query)
344 labels_to_hosts = cls._process_many2many_dict(rows)
345 hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
346 return labels_to_hosts, hosts_to_labels
347
348
349 @classmethod
350 def _get_labels(cls):
351 return dict((label.id, label) for label in Label.fetch())
showard63a34772008-08-18 19:32:50 +0000352
353
354 def refresh(self, pending_queue_entries):
355 self._hosts_available = self._get_ready_hosts()
356
357 relevant_jobs = [queue_entry.job_id
358 for queue_entry in pending_queue_entries]
359 self._job_acls = self._get_job_acl_groups(relevant_jobs)
360 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
showard989f25d2008-10-01 11:38:11 +0000361 self._job_dependencies = self._get_job_dependencies(relevant_jobs)
showard63a34772008-08-18 19:32:50 +0000362
363 host_ids = self._hosts_available.keys()
364 self._host_acls = self._get_host_acls(host_ids)
showard989f25d2008-10-01 11:38:11 +0000365 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
366
367 self._labels = self._get_labels()
showard63a34772008-08-18 19:32:50 +0000368
369
370 def _is_acl_accessible(self, host_id, queue_entry):
371 job_acls = self._job_acls.get(queue_entry.job_id, set())
372 host_acls = self._host_acls.get(host_id, set())
373 return len(host_acls.intersection(job_acls)) > 0
374
375
showard989f25d2008-10-01 11:38:11 +0000376 def _check_job_dependencies(self, job_dependencies, host_labels):
377 missing = job_dependencies - host_labels
showard89f84db2009-03-12 20:39:13 +0000378 return len(missing) == 0
showard989f25d2008-10-01 11:38:11 +0000379
380
381 def _check_only_if_needed_labels(self, job_dependencies, host_labels,
382 queue_entry):
showardade14e22009-01-26 22:38:32 +0000383 if not queue_entry.meta_host:
384 # bypass only_if_needed labels when a specific host is selected
385 return True
386
showard989f25d2008-10-01 11:38:11 +0000387 for label_id in host_labels:
388 label = self._labels[label_id]
389 if not label.only_if_needed:
390 # we don't care about non-only_if_needed labels
391 continue
392 if queue_entry.meta_host == label_id:
393 # if the label was requested in a metahost it's OK
394 continue
395 if label_id not in job_dependencies:
396 return False
397 return True
398
399
showard89f84db2009-03-12 20:39:13 +0000400 def _check_atomic_group_labels(self, host_labels, queue_entry):
401 """
402 Determine if the given HostQueueEntry's atomic group settings are okay
403 to schedule on a host with the given labels.
404
showard6157c632009-07-06 20:19:31 +0000405 @param host_labels: A list of label ids that the host has.
406 @param queue_entry: The HostQueueEntry being considered for the host.
showard89f84db2009-03-12 20:39:13 +0000407
408 @returns True if atomic group settings are okay, False otherwise.
409 """
showard6157c632009-07-06 20:19:31 +0000410 return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
showard89f84db2009-03-12 20:39:13 +0000411 queue_entry.atomic_group_id)
412
413
showard6157c632009-07-06 20:19:31 +0000414 def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
showard89f84db2009-03-12 20:39:13 +0000415 """
416 Return the atomic group label id for a host with the given set of
417 labels if any, or None otherwise. Raises an exception if more than
418 one atomic group are found in the set of labels.
419
showard6157c632009-07-06 20:19:31 +0000420 @param host_labels: A list of label ids that the host has.
421 @param queue_entry: The HostQueueEntry we're testing. Only used for
422 extra info in a potential logged error message.
showard89f84db2009-03-12 20:39:13 +0000423
424 @returns The id of the atomic group found on a label in host_labels
425 or None if no atomic group label is found.
showard89f84db2009-03-12 20:39:13 +0000426 """
showard6157c632009-07-06 20:19:31 +0000427 atomic_labels = [self._labels[label_id] for label_id in host_labels
428 if self._labels[label_id].atomic_group_id is not None]
429 atomic_ids = set(label.atomic_group_id for label in atomic_labels)
showard89f84db2009-03-12 20:39:13 +0000430 if not atomic_ids:
431 return None
432 if len(atomic_ids) > 1:
showard6157c632009-07-06 20:19:31 +0000433 logging.error('More than one Atomic Group on HQE "%s" via: %r',
434 queue_entry, atomic_labels)
435 return atomic_ids.pop()
showard89f84db2009-03-12 20:39:13 +0000436
437
438 def _get_atomic_group_labels(self, atomic_group_id):
439 """
440 Lookup the label ids that an atomic_group is associated with.
441
442 @param atomic_group_id - The id of the AtomicGroup to look up.
443
444 @returns A generator yeilding Label ids for this atomic group.
445 """
446 return (id for id, label in self._labels.iteritems()
447 if label.atomic_group_id == atomic_group_id
448 and not label.invalid)
449
450
showard54c1ea92009-05-20 00:32:58 +0000451 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
showard89f84db2009-03-12 20:39:13 +0000452 """
453 @param group_hosts - A sequence of Host ids to test for usability
454 and eligibility against the Job associated with queue_entry.
455 @param queue_entry - The HostQueueEntry that these hosts are being
456 tested for eligibility against.
457
458 @returns A subset of group_hosts Host ids that are eligible for the
459 supplied queue_entry.
460 """
461 return set(host_id for host_id in group_hosts
462 if self._is_host_usable(host_id)
463 and self._is_host_eligible_for_job(host_id, queue_entry))
464
465
showard989f25d2008-10-01 11:38:11 +0000466 def _is_host_eligible_for_job(self, host_id, queue_entry):
showard2924b0a2009-06-18 23:16:15 +0000467 if self._is_host_invalid(host_id):
468 # if an invalid host is scheduled for a job, it's a one-time host
469 # and it therefore bypasses eligibility checks. note this can only
470 # happen for non-metahosts, because invalid hosts have their label
471 # relationships cleared.
472 return True
473
showard989f25d2008-10-01 11:38:11 +0000474 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
475 host_labels = self._host_labels.get(host_id, set())
mblighc993bee2008-10-03 03:42:34 +0000476
showard89f84db2009-03-12 20:39:13 +0000477 return (self._is_acl_accessible(host_id, queue_entry) and
478 self._check_job_dependencies(job_dependencies, host_labels) and
479 self._check_only_if_needed_labels(
480 job_dependencies, host_labels, queue_entry) and
481 self._check_atomic_group_labels(host_labels, queue_entry))
showard989f25d2008-10-01 11:38:11 +0000482
483
showard2924b0a2009-06-18 23:16:15 +0000484 def _is_host_invalid(self, host_id):
485 host_object = self._hosts_available.get(host_id, None)
486 return host_object and host_object.invalid
487
488
showard63a34772008-08-18 19:32:50 +0000489 def _schedule_non_metahost(self, queue_entry):
showard989f25d2008-10-01 11:38:11 +0000490 if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000491 return None
492 return self._hosts_available.pop(queue_entry.host_id, None)
493
494
495 def _is_host_usable(self, host_id):
496 if host_id not in self._hosts_available:
497 # host was already used during this scheduling cycle
498 return False
499 if self._hosts_available[host_id].invalid:
500 # Invalid hosts cannot be used for metahosts. They're included in
501 # the original query because they can be used by non-metahosts.
502 return False
503 return True
504
505
506 def _schedule_metahost(self, queue_entry):
507 label_id = queue_entry.meta_host
508 hosts_in_label = self._label_hosts.get(label_id, set())
509 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
510 set())
511
512 # must iterate over a copy so we can mutate the original while iterating
513 for host_id in list(hosts_in_label):
514 if not self._is_host_usable(host_id):
515 hosts_in_label.remove(host_id)
516 continue
517 if host_id in ineligible_host_ids:
518 continue
showard989f25d2008-10-01 11:38:11 +0000519 if not self._is_host_eligible_for_job(host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000520 continue
521
showard89f84db2009-03-12 20:39:13 +0000522 # Remove the host from our cached internal state before returning
523 # the host object.
showard63a34772008-08-18 19:32:50 +0000524 hosts_in_label.remove(host_id)
525 return self._hosts_available.pop(host_id)
526 return None
527
528
529 def find_eligible_host(self, queue_entry):
530 if not queue_entry.meta_host:
showard89f84db2009-03-12 20:39:13 +0000531 assert queue_entry.host_id is not None
showard63a34772008-08-18 19:32:50 +0000532 return self._schedule_non_metahost(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000533 assert queue_entry.atomic_group_id is None
showard63a34772008-08-18 19:32:50 +0000534 return self._schedule_metahost(queue_entry)
535
536
showard89f84db2009-03-12 20:39:13 +0000537 def find_eligible_atomic_group(self, queue_entry):
538 """
539 Given an atomic group host queue entry, locate an appropriate group
540 of hosts for the associated job to run on.
541
542 The caller is responsible for creating new HQEs for the additional
543 hosts returned in order to run the actual job on them.
544
545 @returns A list of Host instances in a ready state to satisfy this
546 atomic group scheduling. Hosts will all belong to the same
547 atomic group label as specified by the queue_entry.
548 An empty list will be returned if no suitable atomic
549 group could be found.
550
551 TODO(gps): what is responsible for kicking off any attempted repairs on
552 a group of hosts? not this function, but something needs to. We do
553 not communicate that reason for returning [] outside of here...
554 For now, we'll just be unschedulable if enough hosts within one group
555 enter Repair Failed state.
556 """
557 assert queue_entry.atomic_group_id is not None
558 job = queue_entry.job
559 assert job.synch_count and job.synch_count > 0
showard77182562009-06-10 00:16:05 +0000560 atomic_group = queue_entry.atomic_group
showard89f84db2009-03-12 20:39:13 +0000561 if job.synch_count > atomic_group.max_number_of_machines:
562 # Such a Job and HostQueueEntry should never be possible to
563 # create using the frontend. Regardless, we can't process it.
564 # Abort it immediately and log an error on the scheduler.
565 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000566 logging.error(
567 'Error: job %d synch_count=%d > requested atomic_group %d '
568 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
569 job.id, job.synch_count, atomic_group.id,
570 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000571 return []
572 hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
573 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
574 set())
575
576 # Look in each label associated with atomic_group until we find one with
577 # enough hosts to satisfy the job.
578 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
579 group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
580 if queue_entry.meta_host is not None:
581 # If we have a metahost label, only allow its hosts.
582 group_hosts.intersection_update(hosts_in_label)
583 group_hosts -= ineligible_host_ids
showard54c1ea92009-05-20 00:32:58 +0000584 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
showard89f84db2009-03-12 20:39:13 +0000585 group_hosts, queue_entry)
586
587 # Job.synch_count is treated as "minimum synch count" when
588 # scheduling for an atomic group of hosts. The atomic group
589 # number of machines is the maximum to pick out of a single
590 # atomic group label for scheduling at one time.
591 min_hosts = job.synch_count
592 max_hosts = atomic_group.max_number_of_machines
593
showard54c1ea92009-05-20 00:32:58 +0000594 if len(eligible_host_ids_in_group) < min_hosts:
showard89f84db2009-03-12 20:39:13 +0000595 # Not enough eligible hosts in this atomic group label.
596 continue
597
showard54c1ea92009-05-20 00:32:58 +0000598 eligible_hosts_in_group = [self._hosts_available[id]
599 for id in eligible_host_ids_in_group]
showardef519212009-05-08 02:29:53 +0000600 # So that they show up in a sane order when viewing the job.
showard54c1ea92009-05-20 00:32:58 +0000601 eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)
showardef519212009-05-08 02:29:53 +0000602
showard89f84db2009-03-12 20:39:13 +0000603 # Limit ourselves to scheduling the atomic group size.
604 if len(eligible_hosts_in_group) > max_hosts:
showardef519212009-05-08 02:29:53 +0000605 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
showard89f84db2009-03-12 20:39:13 +0000606
607 # Remove the selected hosts from our cached internal state
608 # of available hosts in order to return the Host objects.
609 host_list = []
showard54c1ea92009-05-20 00:32:58 +0000610 for host in eligible_hosts_in_group:
611 hosts_in_label.discard(host.id)
612 self._hosts_available.pop(host.id)
613 host_list.append(host)
showard89f84db2009-03-12 20:39:13 +0000614 return host_list
615
616 return []
617
618
showard170873e2009-01-07 00:22:26 +0000619class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000620 def __init__(self):
621 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000622 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000623 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000624 user_cleanup_time = scheduler_config.config.clean_interval
625 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
626 _db, user_cleanup_time)
627 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000628 self._host_agents = {}
629 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
showard915958d2009-04-22 21:00:58 +0000632 def initialize(self, recover_hosts=True):
633 self._periodic_cleanup.initialize()
634 self._24hr_upkeep.initialize()
635
jadmanski0afbb632008-06-06 21:10:57 +0000636 # always recover processes
637 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000638
jadmanski0afbb632008-06-06 21:10:57 +0000639 if recover_hosts:
640 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000641
642
jadmanski0afbb632008-06-06 21:10:57 +0000643 def tick(self):
showard170873e2009-01-07 00:22:26 +0000644 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000645 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000646 self._find_aborting()
showard29f7cd22009-04-29 21:16:24 +0000647 self._process_recurring_runs()
showard8cc058f2009-09-08 16:26:33 +0000648 self._schedule_delay_tasks()
jadmanski0afbb632008-06-06 21:10:57 +0000649 self._schedule_new_jobs()
showard8cc058f2009-09-08 16:26:33 +0000650 self._schedule_running_host_queue_entries()
651 self._schedule_special_tasks()
jadmanski0afbb632008-06-06 21:10:57 +0000652 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000653 _drone_manager.execute_actions()
654 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000655
showard97aed502008-11-04 02:01:24 +0000656
mblighf3294cc2009-04-08 21:17:38 +0000657 def _run_cleanup(self):
658 self._periodic_cleanup.run_cleanup_maybe()
659 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard170873e2009-01-07 00:22:26 +0000662 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
663 for object_id in object_ids:
664 agent_dict.setdefault(object_id, set()).add(agent)
665
666
667 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
668 for object_id in object_ids:
669 assert object_id in agent_dict
670 agent_dict[object_id].remove(agent)
671
672
jadmanski0afbb632008-06-06 21:10:57 +0000673 def add_agent(self, agent):
674 self._agents.append(agent)
675 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000676 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
677 self._register_agent_for_ids(self._queue_entry_agents,
678 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000679
showard170873e2009-01-07 00:22:26 +0000680
681 def get_agents_for_entry(self, queue_entry):
682 """
683 Find agents corresponding to the specified queue_entry.
684 """
showardd3dc1992009-04-22 21:01:40 +0000685 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000686
687
688 def host_has_agent(self, host):
689 """
690 Determine if there is currently an Agent present using this host.
691 """
692 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000693
694
jadmanski0afbb632008-06-06 21:10:57 +0000695 def remove_agent(self, agent):
696 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000697 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
698 agent)
699 self._unregister_agent_for_ids(self._queue_entry_agents,
700 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000701
702
    def _host_has_scheduled_special_task(self, host):
        # True if some special task is queued against this host but has not
        # started running yet (not active, not complete).  Used to avoid
        # scheduling a redundant reverify for such hosts.
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
707
708
    def _recover_processes(self):
        """
        Reattach to processes left running by a previous scheduler instance.
        Order matters: pidfiles must be registered before the drone refresh
        reads them, and entries must be recovered before remaining hosts are
        swept up for reverification.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000720
showard170873e2009-01-07 00:22:26 +0000721
    def _register_pidfiles(self):
        """
        Register every pidfile that recovery might need with the drone
        manager, for all in-flight queue entries and active special tasks.
        """
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
                where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            # register every known pidfile name for each execution directory;
            # the relevant one depends on which phase the entry was in
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                        execution_entry.execution_path(),
                        pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000733
734
showarded2afea2009-07-07 20:54:07 +0000735 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
736 run_monitor = PidfileRunMonitor()
737 run_monitor.attach_to_existing_process(execution_path,
738 pidfile_name=pidfile_name)
739 if run_monitor.has_process():
740 orphans.discard(run_monitor.get_process())
741 return run_monitor, '(process %s)' % run_monitor.get_process()
742 return None, 'without process'
743
744
showard8cc058f2009-09-08 16:26:33 +0000745 def _get_unassigned_entries(self, status):
746 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
showard0db3d432009-10-12 20:29:15 +0000747 if entry.status == status and not self.get_agents_for_entry(entry):
748 # The status can change during iteration, e.g., if job.run()
749 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000750 yield entry
751
752
    def _recover_entries_with_status(self, status, orphans, pidfile_name,
                                     recover_entries_fn):
        """
        Recover every unowned queue entry in the given status by reattaching
        to its process (identified by pidfile_name) and handing the whole
        execution group to recover_entries_fn(job, queue_entries, monitor).
        """
        for queue_entry in self._get_unassigned_entries(status):
            # recover the entry's whole group at once, not just this entry
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            run_monitor, process_string = self._get_recovery_run_monitor(
                    queue_entry.execution_path(), pidfile_name, orphans)
            if not run_monitor:
                # _schedule_running_host_queue_entries should schedule and
                # recover these entries
                continue

            logging.info('Recovering %s entry %s %s', status.lower(),
                         ', '.join(str(entry) for entry in queue_entries),
                         process_string)
            recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000768
769
showard6878e8b2009-07-20 22:37:45 +0000770 def _check_for_remaining_orphan_processes(self, orphans):
771 if not orphans:
772 return
773 subject = 'Unrecovered orphan autoserv processes remain'
774 message = '\n'.join(str(process) for process in orphans)
775 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000776
777 die_on_orphans = global_config.global_config.get_config_value(
778 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
779
780 if die_on_orphans:
781 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000782
showard170873e2009-01-07 00:22:26 +0000783
    def _recover_running_entries(self, orphans):
        """Reattach a QueueTask agent to each group of Running entries."""
        def recover_entries(job, queue_entries, run_monitor):
            queue_task = QueueTask(job=job, queue_entries=queue_entries,
                                   recover_run_monitor=run_monitor)
            # one process per queue entry in the group
            self.add_agent(Agent(task=queue_task,
                                 num_processes=len(queue_entries)))

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, _AUTOSERV_PID_FILE,
                                          recover_entries)
794
795
    def _recover_gathering_entries(self, orphans):
        """Reattach a GatherLogsTask agent to each group of Gathering entries."""
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent(gather_task))

        self._recover_entries_with_status(
                models.HostQueueEntry.Status.GATHERING,
                orphans, _CRASHINFO_PID_FILE, recover_entries)
805
806
    def _recover_parsing_entries(self, orphans):
        """Reattach a FinalReparseTask agent to each group of Parsing entries."""
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            # parsing consumes no autoserv process slots
            self.add_agent(Agent(reparse_task, num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
816
817
    def _recover_pending_entries(self):
        """Re-trigger on_pending() for unowned Pending queue entries."""
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()
823
824
    def _recover_all_recoverable_entries(self):
        """
        Reattach to every recoverable execution.  Each recovery step discards
        the processes it claims from the orphan set; whatever remains at the
        end is reported (and may abort the scheduler, per config).
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000832
showard97aed502008-11-04 02:01:24 +0000833
    def _recover_special_tasks(self, orphans):
        """\
        Recovers all special tasks that have started running but have not
        completed.

        @param orphans: set of orphaned processes; recovered processes are
                removed from it.
        @raises SchedulerError: if a task's host already has an agent, which
                would indicate double-scheduling.
        """

        tasks = models.SpecialTask.objects.filter(is_active=True,
                                                  is_complete=False)
        # Use ordering to force NULL queue_entry_id's to the end of the list
        for task in tasks.order_by('-queue_entry__id'):
            if self.host_has_agent(task.host):
                raise SchedulerError(
                        "%s already has a host agent %s." % (
                            task, self._host_agents.get(task.host.id)))

            run_monitor, process_string = self._get_recovery_run_monitor(
                    task.execution_path(), _AUTOSERV_PID_FILE, orphans)

            logging.info('Recovering %s %s', task, process_string)
            self._recover_special_task(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000854
855
showard8cc058f2009-09-08 16:26:33 +0000856 def _recover_special_task(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000857 """\
858 Recovers a single special task.
859 """
860 if task.task == models.SpecialTask.Task.VERIFY:
showard8cc058f2009-09-08 16:26:33 +0000861 agent_task = self._recover_verify(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000862 elif task.task == models.SpecialTask.Task.REPAIR:
showard8cc058f2009-09-08 16:26:33 +0000863 agent_task = self._recover_repair(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000864 elif task.task == models.SpecialTask.Task.CLEANUP:
showard8cc058f2009-09-08 16:26:33 +0000865 agent_task = self._recover_cleanup(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000866 else:
867 # Should never happen
868 logging.error(
869 "Special task id %d had invalid task %s", (task.id, task.task))
870
showard8cc058f2009-09-08 16:26:33 +0000871 self.add_agent(Agent(agent_task))
showard2fe3f1d2009-07-06 20:19:11 +0000872
873
showard8cc058f2009-09-08 16:26:33 +0000874 def _recover_verify(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000875 """\
876 Recovers a verify task.
877 No associated queue entry: Verify host
878 With associated queue entry: Verify host, and run associated queue
879 entry
880 """
showard8cc058f2009-09-08 16:26:33 +0000881 return VerifyTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000882
883
showard8cc058f2009-09-08 16:26:33 +0000884 def _recover_repair(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000885 """\
886 Recovers a repair task.
887 Always repair host
888 """
showard8cc058f2009-09-08 16:26:33 +0000889 return RepairTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000890
891
showard8cc058f2009-09-08 16:26:33 +0000892 def _recover_cleanup(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000893 """\
894 Recovers a cleanup task.
895 No associated queue entry: Clean host
896 With associated queue entry: Clean host, verify host if needed, and
897 run associated queue entry
898 """
showard8cc058f2009-09-08 16:26:33 +0000899 return CleanupTask(task=task, recover_run_monitor=run_monitor)
showard6af73ad2009-07-28 20:00:58 +0000900
901
    def _check_for_unrecovered_verifying_entries(self):
        """
        Sanity check after recovery: every Verifying queue entry must have an
        incomplete cleanup/verify special task attached; otherwise it would
        hang forever.

        @raises SchedulerError: if any Verifying entry has no such task.
        """
        queue_entries = HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000920
921
showard8cc058f2009-09-08 16:26:33 +0000922 def _schedule_special_tasks(self):
923 tasks = models.SpecialTask.objects.filter(is_active=False,
924 is_complete=False,
925 host__locked=False)
926 # We want lower ids to come first, but the NULL queue_entry_ids need to
927 # come last
928 tasks = tasks.extra(select={'isnull' : 'queue_entry_id IS NULL'})
929 tasks = tasks.extra(order_by=['isnull', 'id'])
showard6d7b2ff2009-06-10 00:16:47 +0000930
showard2fe3f1d2009-07-06 20:19:11 +0000931 for task in tasks:
showard8cc058f2009-09-08 16:26:33 +0000932 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000933 continue
showard6d7b2ff2009-06-10 00:16:47 +0000934
showard8cc058f2009-09-08 16:26:33 +0000935 if task.task == models.SpecialTask.Task.CLEANUP:
936 agent_task = CleanupTask(task=task)
937 elif task.task == models.SpecialTask.Task.VERIFY:
938 agent_task = VerifyTask(task=task)
939 elif task.task == models.SpecialTask.Task.REPAIR:
940 agent_task = RepairTask(task=task)
941 else:
942 email_manager.manager.enqueue_notify_email(
943 'Special task with invalid task', task)
944 continue
945
946 self.add_agent(Agent(agent_task))
showard1ff7b2e2009-05-15 23:17:18 +0000947
948
    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)
mblighbb421852008-03-11 22:36:16 +0000957
958
    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        """
        Schedule a Cleanup special task for each unlocked, valid host matching
        the given SQL predicate, skipping hosts that already have an agent or
        a pending special task.

        @param where: extra SQL WHERE condition on the hosts table.
        @param print_message: log format string taking the hostname, or a
                falsy value to suppress logging.
        """
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000974
975
    def _recover_hosts(self):
        # recover "Repair Failed" hosts by scheduling a cleanup/reverify
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000981
982
showard04c82c52008-05-29 19:38:12 +0000983
    def _get_pending_queue_entries(self):
        """Return schedulable queue entries as a list, best-first."""
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
                joins='INNER JOIN jobs ON (job_id=jobs.id)',
                where='NOT complete AND NOT active AND status="Queued"',
                order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000990
991
showard89f84db2009-03-12 20:39:13 +0000992 def _refresh_pending_queue_entries(self):
993 """
994 Lookup the pending HostQueueEntries and call our HostScheduler
995 refresh() method given that list. Return the list.
996
997 @returns A list of pending HostQueueEntries sorted in priority order.
998 """
showard63a34772008-08-18 19:32:50 +0000999 queue_entries = self._get_pending_queue_entries()
1000 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001001 return []
showardb95b1bd2008-08-15 18:11:04 +00001002
showard63a34772008-08-18 19:32:50 +00001003 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001004
showard89f84db2009-03-12 20:39:13 +00001005 return queue_entries
1006
1007
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        # NOTE(review): the loop variable below deliberately shadows the
        # queue_entry parameter; the original entry is already in
        # group_queue_entries, so nothing is lost.
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)
1038
1039
1040 def _schedule_new_jobs(self):
1041 queue_entries = self._refresh_pending_queue_entries()
1042 if not queue_entries:
1043 return
1044
showard63a34772008-08-18 19:32:50 +00001045 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001046 is_unassigned_atomic_group = (
1047 queue_entry.atomic_group_id is not None
1048 and queue_entry.host_id is None)
1049 if is_unassigned_atomic_group:
1050 self._schedule_atomic_group(queue_entry)
1051 else:
showard89f84db2009-03-12 20:39:13 +00001052 assigned_host = self._host_scheduler.find_eligible_host(
1053 queue_entry)
1054 if assigned_host:
1055 self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001056
1057
    def _schedule_running_host_queue_entries(self):
        """
        Attach an appropriate agent task to every in-flight queue entry
        (Starting/Running -> QueueTask, Gathering -> GatherLogsTask,
        Parsing -> FinalReparseTask) that does not already have one.

        @raises SchedulerError: on host double-booking or an entry whose
                status does not match any known phase.
        """
        status_enum = models.HostQueueEntry.Status
        running_statuses = (status_enum.STARTING, status_enum.RUNNING,
                            status_enum.GATHERING, status_enum.PARSING)
        sql_statuses = ', '.join(('"%s"' % s for s in running_statuses))
        entries = HostQueueEntry.fetch(where="status IN (%s)" % sql_statuses)
        for entry in entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue

            # the agent covers the entry's whole execution group
            task_entries = entry.job.get_group_entries(entry)
            for task_entry in task_entries:
                # Parsing needs no host, so a busy host is fine there
                if (task_entry.status != models.HostQueueEntry.Status.PARSING
                        and self.host_has_agent(task_entry.host)):
                    agent = tuple(self._host_agents.get(task_entry.host.id))[0]
                    raise SchedulerError('Attempted to schedule on host that '
                                         'already has agent: %s (previous '
                                         'agent task: %s)'
                                         % (task_entry, agent.task))

            if entry.status in (models.HostQueueEntry.Status.STARTING,
                                models.HostQueueEntry.Status.RUNNING):
                params = entry.job.get_autoserv_params(task_entries)
                agent_task = QueueTask(job=entry.job,
                                       queue_entries=task_entries, cmd=params)
            elif entry.status == models.HostQueueEntry.Status.GATHERING:
                agent_task = GatherLogsTask(
                        job=entry.job, queue_entries=task_entries)
            elif entry.status == models.HostQueueEntry.Status.PARSING:
                agent_task = FinalReparseTask(queue_entries=task_entries)
            else:
                raise SchedulerError('_schedule_running_host_queue_entries got '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))

            self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
1094
1095
    def _schedule_delay_tasks(self):
        """Attach a delayed-callback agent to each Waiting queue entry."""
        for entry in HostQueueEntry.fetch(where='status = "%s"' %
                                          models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                # delay tasks consume no process slots
                self.add_agent(Agent(task, num_processes=0))
1102
1103
    def _run_queue_entry(self, queue_entry, host):
        # kick off the entry's pre-job work (cleanup/verify as needed) on the
        # host chosen for it
        queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001106
1107
    def _find_aborting(self):
        """Abort all agents of, then finalize, every aborted incomplete entry."""
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001114
1115
showard324bf812009-01-20 23:23:38 +00001116 def _can_start_agent(self, agent, num_started_this_cycle,
1117 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001118 # always allow zero-process agents to run
1119 if agent.num_processes == 0:
1120 return True
1121 # don't allow any nonzero-process agents to run after we've reached a
1122 # limit (this avoids starvation of many-process agents)
1123 if have_reached_limit:
1124 return False
1125 # total process throttling
showard324bf812009-01-20 23:23:38 +00001126 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001127 return False
1128 # if a single agent exceeds the per-cycle throttling, still allow it to
1129 # run when it's the first agent in the cycle
1130 if num_started_this_cycle == 0:
1131 return True
1132 # per-cycle throttling
1133 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001134 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001135 return False
1136 return True
1137
1138
    def _handle_agents(self):
        """
        Tick every agent, starting not-yet-started ones when throttling
        permits and removing finished ones.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    # once one agent is denied, deny the rest of the cycle
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001156
1157
    def _process_recurring_runs(self):
        """
        Instantiate a new job from every recurring-run template whose start
        date has arrived, then delete the run (loop_count == 1), decrement and
        reschedule it (loop_count > 1), or leave it untouched for an infinite
        loop (loop_count == 0).
        """
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): dependencies is read but never passed to
            # create_new_job below - confirm whether that's intentional
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)
            except Exception, ex:
                # a bad template must not stop the other recurring runs
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1197
1198
showard170873e2009-01-07 00:22:26 +00001199class PidfileRunMonitor(object):
1200 """
1201 Client must call either run() to start a new process or
1202 attach_to_existing_process().
1203 """
mbligh36768f02008-02-22 18:28:33 +00001204
    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """
        # _get_pidfile_info() catches this and converts it into
        # _handle_pidfile_error(), so callers never see it.
mbligh36768f02008-02-22 18:28:33 +00001210
1211
showard170873e2009-01-07 00:22:26 +00001212 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001213 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001214 self._start_time = None
1215 self.pidfile_id = None
1216 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001217
1218
showard170873e2009-01-07 00:22:26 +00001219 def _add_nice_command(self, command, nice_level):
1220 if not nice_level:
1221 return command
1222 return ['nice', '-n', str(nice_level)] + command
1223
1224
    def _set_start_time(self):
        # record when we started (or attached); _handle_no_process() measures
        # the pidfile-write timeout from this point
        self._start_time = time.time()
1227
1228
    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """
        Start a new process through the drone manager.

        @param command: argv list to execute; must not be None.
        @param nice_level: if given, the command is wrapped in nice -n.
                NOTE(review): this checks "is not None" while
                _add_nice_command uses truthiness, so a level of 0 is
                treated differently by the two - confirm which is intended.
        """
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
                command, working_directory, pidfile_name=pidfile_name,
                log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001238
1239
    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """
        Monitor an already-running process via the pidfile found under
        execution_path, instead of launching a new one.
        """
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
                execution_path, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001246
1247
    def kill(self):
        # no-op when no live process is attached
        if self.has_process():
            _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001251
mbligh36768f02008-02-22 18:28:33 +00001252
    def has_process(self):
        # refreshes pidfile state first, so the answer is current
        self._get_pidfile_info()
        return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001256
1257
    def get_process(self):
        # only valid when a process exists; callers should check
        # has_process() first
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process
mblighbb421852008-03-11 22:36:16 +00001262
1263
    def _read_pidfile(self, use_second_read=False):
        """
        Refresh self._state from the pidfile.  On invalid contents the state
        is reset and _PidfileException raised (handled internally by
        _get_pidfile_info).
        """
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            # reset to empty state so stale data is never acted upon
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001273
1274
    def _handle_pidfile_error(self, error, message=''):
        # notify (subject=error) and then treat the process as lost so the
        # monitor reports a synthetic failure instead of wedging
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001280
1281
    def _get_pidfile_info_helper(self):
        # once the process is marked lost, the synthetic state is final
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001304
showard21baa452008-10-21 00:08:39 +00001305
    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            # exc is intentionally unused; the full traceback is forwarded
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001317
1318
showard170873e2009-01-07 00:22:26 +00001319 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001320 """\
1321 Called when no pidfile is found or no pid is in the pidfile.
1322 """
showard170873e2009-01-07 00:22:26 +00001323 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001324 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001325 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001326 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001327 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001328
1329
showard35162b02009-03-03 02:17:30 +00001330 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001331 """\
1332 Called when autoserv has exited without writing an exit status,
1333 or we've timed out waiting for autoserv to write a pid to the
1334 pidfile. In either case, we just return failure and the caller
1335 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001336
showard170873e2009-01-07 00:22:26 +00001337 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001338 """
1339 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001340 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001341 self._state.exit_status = 1
1342 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001343
1344
jadmanski0afbb632008-06-06 21:10:57 +00001345 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001346 self._get_pidfile_info()
1347 return self._state.exit_status
1348
1349
1350 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001351 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001352 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001353 if self._state.num_tests_failed is None:
1354 return -1
showard21baa452008-10-21 00:08:39 +00001355 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001356
1357
showardcdaeae82009-08-31 18:32:48 +00001358 def try_copy_results_on_drone(self, **kwargs):
1359 if self.has_process():
1360 # copy results logs into the normal place for job results
1361 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1362
1363
1364 def try_copy_to_results_repository(self, source, **kwargs):
1365 if self.has_process():
1366 _drone_manager.copy_to_results_repository(self.get_process(),
1367 source, **kwargs)
1368
1369
mbligh36768f02008-02-22 18:28:33 +00001370class Agent(object):
showard77182562009-06-10 00:16:05 +00001371 """
showard8cc058f2009-09-08 16:26:33 +00001372 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001373
1374 The following methods are required on all task objects:
1375 poll() - Called periodically to let the task check its status and
1376 update its internal state. If the task succeeded.
1377 is_done() - Returns True if the task is finished.
1378 abort() - Called when an abort has been requested. The task must
1379 set its aborted attribute to True if it actually aborted.
1380
1381 The following attributes are required on all task objects:
1382 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001383 success - bool, True if this task succeeded.
1384 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1385 host_ids - A sequence of Host ids this task represents.
1386
1387 The following attribute is written to all task objects:
1388 agent - A reference to the Agent instance that the task has been
1389 added to.
1390 """
1391
1392
showard8cc058f2009-09-08 16:26:33 +00001393 def __init__(self, task, num_processes=1):
showard77182562009-06-10 00:16:05 +00001394 """
showard8cc058f2009-09-08 16:26:33 +00001395 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001396 @param num_processes: The number of subprocesses the Agent represents.
1397 This is used by the Dispatcher for managing the load on the
1398 system. Defaults to 1.
1399 """
showard8cc058f2009-09-08 16:26:33 +00001400 self.task = task
1401 task.agent = self
1402
showard77182562009-06-10 00:16:05 +00001403 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001404 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001405 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001406
showard8cc058f2009-09-08 16:26:33 +00001407 self.queue_entry_ids = task.queue_entry_ids
1408 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001409
showard8cc058f2009-09-08 16:26:33 +00001410 self.started = False
mbligh36768f02008-02-22 18:28:33 +00001411
1412
jadmanski0afbb632008-06-06 21:10:57 +00001413 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001414 self.started = True
1415 if self.task:
1416 self.task.poll()
1417 if self.task.is_done():
1418 self.task = None
showardec113162008-05-08 00:52:49 +00001419
1420
jadmanski0afbb632008-06-06 21:10:57 +00001421 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001422 return self.task is None
mbligh36768f02008-02-22 18:28:33 +00001423
1424
showardd3dc1992009-04-22 21:01:40 +00001425 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001426 if self.task:
1427 self.task.abort()
1428 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001429 # tasks can choose to ignore aborts
showard8cc058f2009-09-08 16:26:33 +00001430 self.task = None
showard20f9bdd2009-04-29 19:48:33 +00001431
showardd3dc1992009-04-22 21:01:40 +00001432
showard77182562009-06-10 00:16:05 +00001433class DelayedCallTask(object):
1434 """
1435 A task object like AgentTask for an Agent to run that waits for the
1436 specified amount of time to have elapsed before calling the supplied
1437 callback once and finishing. If the callback returns anything, it is
1438 assumed to be a new Agent instance and will be added to the dispatcher.
1439
1440 @attribute end_time: The absolute posix time after which this task will
1441 call its callback when it is polled and be finished.
1442
1443 Also has all attributes required by the Agent class.
1444 """
1445 def __init__(self, delay_seconds, callback, now_func=None):
1446 """
1447 @param delay_seconds: The delay in seconds from now that this task
1448 will call the supplied callback and be done.
1449 @param callback: A callable to be called by this task once after at
1450 least delay_seconds time has elapsed. It must return None
1451 or a new Agent instance.
1452 @param now_func: A time.time like function. Default: time.time.
1453 Used for testing.
1454 """
1455 assert delay_seconds > 0
1456 assert callable(callback)
1457 if not now_func:
1458 now_func = time.time
1459 self._now_func = now_func
1460 self._callback = callback
1461
1462 self.end_time = self._now_func() + delay_seconds
1463
1464 # These attributes are required by Agent.
1465 self.aborted = False
showard77182562009-06-10 00:16:05 +00001466 self.host_ids = ()
1467 self.success = False
1468 self.queue_entry_ids = ()
1469 # This is filled in by Agent.add_task().
1470 self.agent = None
1471
1472
1473 def poll(self):
showard8cc058f2009-09-08 16:26:33 +00001474 if not self.is_done() and self._now_func() >= self.end_time:
1475 self._callback()
showard77182562009-06-10 00:16:05 +00001476 self.success = True
1477
1478
1479 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001480 return self.success or self.aborted
showard77182562009-06-10 00:16:05 +00001481
1482
1483 def abort(self):
1484 self.aborted = True
showard77182562009-06-10 00:16:05 +00001485
1486
mbligh36768f02008-02-22 18:28:33 +00001487class AgentTask(object):
showard8cc058f2009-09-08 16:26:33 +00001488 def __init__(self, cmd=None, working_directory=None,
showarded2afea2009-07-07 20:54:07 +00001489 pidfile_name=None, paired_with_pidfile=None,
1490 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001491 self.done = False
jadmanski0afbb632008-06-06 21:10:57 +00001492 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001493 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001494 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001495 self.monitor = recover_run_monitor
1496 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001497 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001498 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001499 self.queue_entry_ids = []
1500 self.host_ids = []
1501 self.log_file = None
1502
1503
1504 def _set_ids(self, host=None, queue_entries=None):
1505 if queue_entries and queue_entries != [None]:
1506 self.host_ids = [entry.host.id for entry in queue_entries]
1507 self.queue_entry_ids = [entry.id for entry in queue_entries]
1508 else:
1509 assert host
1510 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001511
1512
jadmanski0afbb632008-06-06 21:10:57 +00001513 def poll(self):
showard08a36412009-05-05 01:01:13 +00001514 if not self.started:
1515 self.start()
1516 self.tick()
1517
1518
1519 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001520 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001521 exit_code = self.monitor.exit_code()
1522 if exit_code is None:
1523 return
1524 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001525 else:
1526 success = False
mbligh36768f02008-02-22 18:28:33 +00001527
jadmanski0afbb632008-06-06 21:10:57 +00001528 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001529
1530
jadmanski0afbb632008-06-06 21:10:57 +00001531 def is_done(self):
1532 return self.done
mbligh36768f02008-02-22 18:28:33 +00001533
1534
jadmanski0afbb632008-06-06 21:10:57 +00001535 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001536 if self.done:
1537 return
jadmanski0afbb632008-06-06 21:10:57 +00001538 self.done = True
1539 self.success = success
1540 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001541
1542
jadmanski0afbb632008-06-06 21:10:57 +00001543 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001544 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001545
mbligh36768f02008-02-22 18:28:33 +00001546
jadmanski0afbb632008-06-06 21:10:57 +00001547 def cleanup(self):
showardcdaeae82009-08-31 18:32:48 +00001548 if self.monitor and self.log_file:
1549 self.monitor.try_copy_to_results_repository(self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001550
1551
jadmanski0afbb632008-06-06 21:10:57 +00001552 def epilog(self):
1553 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001554
1555
jadmanski0afbb632008-06-06 21:10:57 +00001556 def start(self):
1557 assert self.agent
1558
1559 if not self.started:
1560 self.prolog()
1561 self.run()
1562
1563 self.started = True
1564
1565
1566 def abort(self):
1567 if self.monitor:
1568 self.monitor.kill()
1569 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001570 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001571 self.cleanup()
1572
1573
showarded2afea2009-07-07 20:54:07 +00001574 def _get_consistent_execution_path(self, execution_entries):
1575 first_execution_path = execution_entries[0].execution_path()
1576 for execution_entry in execution_entries[1:]:
1577 assert execution_entry.execution_path() == first_execution_path, (
1578 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1579 execution_entry,
1580 first_execution_path,
1581 execution_entries[0]))
1582 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001583
1584
showarded2afea2009-07-07 20:54:07 +00001585 def _copy_results(self, execution_entries, use_monitor=None):
1586 """
1587 @param execution_entries: list of objects with execution_path() method
1588 """
showard6d1c1432009-08-20 23:30:39 +00001589 if use_monitor is not None and not use_monitor.has_process():
1590 return
1591
showarded2afea2009-07-07 20:54:07 +00001592 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001593 if use_monitor is None:
1594 assert self.monitor
1595 use_monitor = self.monitor
1596 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001597 execution_path = self._get_consistent_execution_path(execution_entries)
1598 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001599 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001600
showarda1e74b32009-05-12 17:32:04 +00001601
1602 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001603 for queue_entry in queue_entries:
1604 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001605
1606
showarda1e74b32009-05-12 17:32:04 +00001607 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1608 self._copy_results(queue_entries, use_monitor)
1609 self._parse_results(queue_entries)
1610
1611
showardd3dc1992009-04-22 21:01:40 +00001612 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001613 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001614 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001615 self.monitor = PidfileRunMonitor()
1616 self.monitor.run(self.cmd, self._working_directory,
1617 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001618 log_file=self.log_file,
1619 pidfile_name=pidfile_name,
1620 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001621
1622
showardd9205182009-04-27 20:09:55 +00001623class TaskWithJobKeyvals(object):
1624 """AgentTask mixin providing functionality to help with job keyval files."""
1625 _KEYVAL_FILE = 'keyval'
1626 def _format_keyval(self, key, value):
1627 return '%s=%s' % (key, value)
1628
1629
1630 def _keyval_path(self):
1631 """Subclasses must override this"""
1632 raise NotImplemented
1633
1634
1635 def _write_keyval_after_job(self, field, value):
1636 assert self.monitor
1637 if not self.monitor.has_process():
1638 return
1639 _drone_manager.write_lines_to_file(
1640 self._keyval_path(), [self._format_keyval(field, value)],
1641 paired_with_process=self.monitor.get_process())
1642
1643
1644 def _job_queued_keyval(self, job):
1645 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1646
1647
1648 def _write_job_finished(self):
1649 self._write_keyval_after_job("job_finished", int(time.time()))
1650
1651
showarddb502762009-09-09 15:31:20 +00001652 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1653 keyval_contents = '\n'.join(self._format_keyval(key, value)
1654 for key, value in keyval_dict.iteritems())
1655 # always end with a newline to allow additional keyvals to be written
1656 keyval_contents += '\n'
1657 _drone_manager.attach_file_to_execution(self._working_directory,
1658 keyval_contents,
1659 file_path=keyval_path)
1660
1661
1662 def _write_keyvals_before_job(self, keyval_dict):
1663 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1664
1665
1666 def _write_host_keyvals(self, host):
1667 keyval_path = os.path.join(self._working_directory, 'host_keyvals',
1668 host.hostname)
1669 platform, all_labels = host.platform_and_labels()
1670 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1671 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1672
1673
showard8cc058f2009-09-08 16:26:33 +00001674class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001675 """
1676 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1677 """
1678
1679 TASK_TYPE = None
1680 host = None
1681 queue_entry = None
1682
1683 def __init__(self, task, extra_command_args, **kwargs):
showarded2afea2009-07-07 20:54:07 +00001684 assert (self.TASK_TYPE is not None,
1685 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001686
1687 self.host = Host(id=task.host.id)
1688 self.queue_entry = None
1689 if task.queue_entry:
1690 self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
1691
showarded2afea2009-07-07 20:54:07 +00001692 self.task = task
showarddb502762009-09-09 15:31:20 +00001693 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001694 self._extra_command_args = extra_command_args
1695 super(SpecialAgentTask, self).__init__(**kwargs)
1696
1697
showard8cc058f2009-09-08 16:26:33 +00001698 def _keyval_path(self):
1699 return os.path.join(self._working_directory, self._KEYVAL_FILE)
1700
1701
showarded2afea2009-07-07 20:54:07 +00001702 def prolog(self):
1703 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001704 self.cmd = _autoserv_command_line(self.host.hostname,
1705 self._extra_command_args,
1706 queue_entry=self.queue_entry)
1707 self._working_directory = self.task.execution_path()
1708 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001709 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001710
1711
showardde634ee2009-01-30 01:44:24 +00001712 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001713 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001714
showard2fe3f1d2009-07-06 20:19:11 +00001715 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001716 return # don't fail metahost entries, they'll be reassigned
1717
showard2fe3f1d2009-07-06 20:19:11 +00001718 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001719 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001720 return # entry has been aborted
1721
showard2fe3f1d2009-07-06 20:19:11 +00001722 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001723 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001724 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001725 self._write_keyval_after_job(queued_key, queued_time)
1726 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001727
showard8cc058f2009-09-08 16:26:33 +00001728 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001729 self.monitor.try_copy_results_on_drone(
1730 source_path=self._working_directory + '/',
1731 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001732
showard2fe3f1d2009-07-06 20:19:11 +00001733 self._copy_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001734 self.queue_entry.handle_host_failure()
showard2fe3f1d2009-07-06 20:19:11 +00001735 if self.queue_entry.job.parse_failed_repair:
1736 self._parse_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001737
1738 pidfile_id = _drone_manager.get_pidfile_id_from(
1739 self.queue_entry.execution_path(),
1740 pidfile_name=_AUTOSERV_PID_FILE)
1741 _drone_manager.register_pidfile(pidfile_id)
1742
1743
1744 def cleanup(self):
1745 super(SpecialAgentTask, self).cleanup()
1746 self.task.finish()
showardf85a0b72009-10-07 20:48:45 +00001747 if self.monitor:
1748 if self.monitor.has_process():
1749 self._copy_results([self.task])
1750 if self.monitor.pidfile_id is not None:
1751 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001752
1753
class RepairTask(SpecialAgentTask):
    """Runs autoserv -R to repair a host, failing the associated queue
    entry (if any) when the repair itself fails."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task, recover_run_monitor=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # Translate the host's protection level into its normalized
        # attribute name for the autoserv command line.
        raw_protection = host_protections.Protection.get_string(
                task.host.protection)
        protection = host_protections.Protection.get_attr_name(raw_protection)
        extra_args = ['-R', '--host-protection', protection]

        super(RepairTask, self).__init__(
                task, extra_args, recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
            return
        self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001791
1792
showarded2afea2009-07-07 20:54:07 +00001793class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001794 def _copy_to_results_repository(self):
1795 if not self.queue_entry or self.queue_entry.meta_host:
1796 return
1797
1798 self.queue_entry.set_execution_subdir()
1799 log_name = os.path.basename(self.task.execution_path())
1800 source = os.path.join(self.task.execution_path(), 'debug',
1801 'autoserv.DEBUG')
1802 destination = os.path.join(
1803 self.queue_entry.execution_path(), log_name)
1804
1805 self.monitor.try_copy_to_results_repository(
1806 source, destination_path=destination)
1807
1808
showard170873e2009-01-07 00:22:26 +00001809 def epilog(self):
1810 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001811
showard775300b2009-09-09 15:30:50 +00001812 if self.success:
1813 return
showard8fe93b52008-11-18 17:53:22 +00001814
showard775300b2009-09-09 15:30:50 +00001815 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001816
showard775300b2009-09-09 15:30:50 +00001817 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
1818 return
1819
1820 if self.queue_entry:
1821 self.queue_entry.requeue()
1822
1823 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001824 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001825 queue_entry__id=self.queue_entry.id):
1826 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1827 self._fail_queue_entry()
1828 return
1829
1830 queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
1831 else:
1832 queue_entry = None
1833
1834 models.SpecialTask.objects.create(
1835 host=models.Host(id=self.host.id),
1836 task=models.SpecialTask.Task.REPAIR,
1837 queue_entry=queue_entry)
showard58721a82009-08-20 23:32:40 +00001838
showard8fe93b52008-11-18 17:53:22 +00001839
class VerifyTask(PreJobTask):
    """Runs autoserv -v to check that a host is in working order."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task, recover_run_monitor=None):
        super(VerifyTask, self).__init__(
                task, ['-v'], recover_run_monitor=recover_run_monitor)

        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # One verify will do; drop any other still-pending verify requests
        # for this host rather than keeping redundant records around.
        redundant_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        redundant_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001876
1877
showardb5626452009-06-30 01:57:28 +00001878class CleanupHostsMixin(object):
1879 def _reboot_hosts(self, job, queue_entries, final_success,
1880 num_tests_failed):
1881 reboot_after = job.reboot_after
1882 do_reboot = (
1883 # always reboot after aborted jobs
1884 self._final_status == models.HostQueueEntry.Status.ABORTED
1885 or reboot_after == models.RebootAfter.ALWAYS
1886 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1887 and final_success and num_tests_failed == 0))
1888
1889 for queue_entry in queue_entries:
1890 if do_reboot:
1891 # don't pass the queue entry to the CleanupTask. if the cleanup
1892 # fails, the job doesn't care -- it's over.
showard8cc058f2009-09-08 16:26:33 +00001893 models.SpecialTask.objects.create(
1894 host=models.Host(id=queue_entry.host.id),
1895 task=models.SpecialTask.Task.CLEANUP)
showardb5626452009-06-30 01:57:28 +00001896 else:
showard8cc058f2009-09-08 16:26:33 +00001897 queue_entry.host.set_status(models.Host.Status.READY)
showardb5626452009-06-30 01:57:28 +00001898
1899
1900class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
showard8cc058f2009-09-08 16:26:33 +00001901 def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001902 self.job = job
1903 self.queue_entries = queue_entries
showard8cc058f2009-09-08 16:26:33 +00001904 self.group_name = queue_entries[0].get_group_name()
showarded2afea2009-07-07 20:54:07 +00001905 super(QueueTask, self).__init__(
1906 cmd, self._execution_path(),
1907 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001908 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001909
1910
showard73ec0442009-02-07 02:05:20 +00001911 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001912 return os.path.join(self._execution_path(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001913
1914
showarded2afea2009-07-07 20:54:07 +00001915 def _execution_path(self):
1916 return self.queue_entries[0].execution_path()
mblighbb421852008-03-11 22:36:16 +00001917
1918
jadmanski0afbb632008-06-06 21:10:57 +00001919 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00001920 for entry in self.queue_entries:
1921 if entry.status not in (models.HostQueueEntry.Status.STARTING,
1922 models.HostQueueEntry.Status.RUNNING):
1923 raise SchedulerError('Queue task attempting to start '
1924 'entry with invalid status %s: %s'
1925 % (entry.status, entry))
1926 if entry.host.status not in (models.Host.Status.PENDING,
1927 models.Host.Status.RUNNING):
1928 raise SchedulerError('Queue task attempting to start on queue '
1929 'entry with invalid host status %s: %s'
1930 % (entry.host.status, entry))
1931
showardd9205182009-04-27 20:09:55 +00001932 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001933 keyval_dict = {queued_key: queued_time}
1934 if self.group_name:
1935 keyval_dict['host_group_name'] = self.group_name
1936 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001937 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001938 self._write_host_keyvals(queue_entry.host)
showard8cc058f2009-09-08 16:26:33 +00001939 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showard12f3e322009-05-13 21:27:42 +00001940 queue_entry.update_field('started_on', datetime.datetime.now())
showard8cc058f2009-09-08 16:26:33 +00001941 queue_entry.host.set_status(models.Host.Status.RUNNING)
showard21baa452008-10-21 00:08:39 +00001942 queue_entry.host.update_field('dirty', 1)
showardc6a56872009-07-28 20:11:58 +00001943 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1944 # TODO(gps): Remove this if nothing needs it anymore.
1945 # A potential user is: tko/parser
jadmanski0afbb632008-06-06 21:10:57 +00001946 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001947
1948
showard35162b02009-03-03 02:17:30 +00001949 def _write_lost_process_error_file(self):
showarded2afea2009-07-07 20:54:07 +00001950 error_file_path = os.path.join(self._execution_path(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001951 _drone_manager.write_lines_to_file(error_file_path,
1952 [_LOST_PROCESS_ERROR])
1953
1954
showardd3dc1992009-04-22 21:01:40 +00001955 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001956 if not self.monitor:
1957 return
1958
showardd9205182009-04-27 20:09:55 +00001959 self._write_job_finished()
1960
showard35162b02009-03-03 02:17:30 +00001961 if self.monitor.lost_process:
1962 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001963
showard8cc058f2009-09-08 16:26:33 +00001964 for queue_entry in self.queue_entries:
1965 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001966
1967
showardcbd74612008-11-19 21:42:02 +00001968 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001969 _drone_manager.write_lines_to_file(
showarded2afea2009-07-07 20:54:07 +00001970 os.path.join(self._execution_path(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001971 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001972 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001973
1974
jadmanskif7fa2cc2008-10-01 14:13:23 +00001975 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001976 if not self.monitor or not self.monitor.has_process():
1977 return
1978
jadmanskif7fa2cc2008-10-01 14:13:23 +00001979 # build up sets of all the aborted_by and aborted_on values
1980 aborted_by, aborted_on = set(), set()
1981 for queue_entry in self.queue_entries:
1982 if queue_entry.aborted_by:
1983 aborted_by.add(queue_entry.aborted_by)
1984 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1985 aborted_on.add(t)
1986
1987 # extract some actual, unique aborted by value and write it out
1988 assert len(aborted_by) <= 1
1989 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001990 aborted_by_value = aborted_by.pop()
1991 aborted_on_value = max(aborted_on)
1992 else:
1993 aborted_by_value = 'autotest_system'
1994 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001995
showarda0382352009-02-11 23:36:43 +00001996 self._write_keyval_after_job("aborted_by", aborted_by_value)
1997 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001998
showardcbd74612008-11-19 21:42:02 +00001999 aborted_on_string = str(datetime.datetime.fromtimestamp(
2000 aborted_on_value))
2001 self._write_status_comment('Job aborted by %s on %s' %
2002 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002003
2004
jadmanski0afbb632008-06-06 21:10:57 +00002005 def abort(self):
2006 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002007 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002008 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002009
2010
jadmanski0afbb632008-06-06 21:10:57 +00002011 def epilog(self):
2012 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002013 self._finish_task()
2014 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00002015
2016
showardd3dc1992009-04-22 21:01:40 +00002017class PostJobTask(AgentTask):
2018 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00002019 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002020 self._queue_entries = queue_entries
2021 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00002022
showarded2afea2009-07-07 20:54:07 +00002023 self._execution_path = self._get_consistent_execution_path(
2024 queue_entries)
2025 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002026 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00002027 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002028 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
2029
2030 if _testing_mode:
2031 command = 'true'
2032 else:
2033 command = self._generate_command(self._results_dir)
2034
showarded2afea2009-07-07 20:54:07 +00002035 super(PostJobTask, self).__init__(
2036 cmd=command, working_directory=self._execution_path,
2037 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002038
showarded2afea2009-07-07 20:54:07 +00002039 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00002040 self._final_status = self._determine_final_status()
2041
2042
2043 def _generate_command(self, results_dir):
2044 raise NotImplementedError('Subclasses must override this')
2045
2046
2047 def _job_was_aborted(self):
2048 was_aborted = None
2049 for queue_entry in self._queue_entries:
2050 queue_entry.update_from_database()
2051 if was_aborted is None: # first queue entry
2052 was_aborted = bool(queue_entry.aborted)
2053 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2054 email_manager.manager.enqueue_notify_email(
2055 'Inconsistent abort state',
2056 'Queue entries have inconsistent abort state: ' +
2057 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2058 # don't crash here, just assume true
2059 return True
2060 return was_aborted
2061
2062
2063 def _determine_final_status(self):
2064 if self._job_was_aborted():
2065 return models.HostQueueEntry.Status.ABORTED
2066
2067 # we'll use a PidfileRunMonitor to read the autoserv exit status
2068 if self._autoserv_monitor.exit_code() == 0:
2069 return models.HostQueueEntry.Status.COMPLETED
2070 return models.HostQueueEntry.Status.FAILED
2071
2072
2073 def run(self):
showard8cc058f2009-09-08 16:26:33 +00002074 # Make sure we actually have results to work with.
2075 # This should never happen in normal operation.
showard5add1c82009-05-26 19:27:46 +00002076 if not self._autoserv_monitor.has_process():
2077 email_manager.manager.enqueue_notify_email(
showard8cc058f2009-09-08 16:26:33 +00002078 'No results in post-job task',
2079 'No results in post-job task at %s' %
2080 self._autoserv_monitor.pidfile_id)
showard5add1c82009-05-26 19:27:46 +00002081 self.finished(False)
2082 return
2083
2084 super(PostJobTask, self).run(
2085 pidfile_name=self._pidfile_name,
2086 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002087
2088
2089 def _set_all_statuses(self, status):
2090 for queue_entry in self._queue_entries:
2091 queue_entry.set_status(status)
2092
2093
2094 def abort(self):
2095 # override AgentTask.abort() to avoid killing the process and ending
2096 # the task. post-job tasks continue when the job is aborted.
2097 pass
2098
2099
showardb5626452009-06-30 01:57:28 +00002100class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002101 """
2102 Task responsible for
2103 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2104 * copying logs to the results repository
2105 * spawning CleanupTasks for hosts, if necessary
2106 * spawning a FinalReparseTask for the job
2107 """
showarded2afea2009-07-07 20:54:07 +00002108 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002109 self._job = job
2110 super(GatherLogsTask, self).__init__(
2111 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002112 logfile_name='.collect_crashinfo.log',
2113 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002114 self._set_ids(queue_entries=queue_entries)
2115
2116
2117 def _generate_command(self, results_dir):
2118 host_list = ','.join(queue_entry.host.hostname
2119 for queue_entry in self._queue_entries)
2120 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2121 '-r', results_dir]
2122
2123
2124 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002125 for queue_entry in self._queue_entries:
2126 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2127 raise SchedulerError('Gather task attempting to start on '
2128 'non-gathering entry: %s' % queue_entry)
2129 if queue_entry.host.status != models.Host.Status.RUNNING:
2130 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002131 'entry with non-running host status %s: %s'
2132 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002133
showardd3dc1992009-04-22 21:01:40 +00002134 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002135
2136
showardd3dc1992009-04-22 21:01:40 +00002137 def epilog(self):
2138 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002139
showard6d1c1432009-08-20 23:30:39 +00002140 self._copy_and_parse_results(self._queue_entries,
2141 use_monitor=self._autoserv_monitor)
2142
2143 if self._autoserv_monitor.has_process():
2144 final_success = (self._final_status ==
2145 models.HostQueueEntry.Status.COMPLETED)
2146 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2147 else:
2148 final_success = False
2149 num_tests_failed = 0
2150
showardb5626452009-06-30 01:57:28 +00002151 self._reboot_hosts(self._job, self._queue_entries, final_success,
2152 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002153
2154
showard0bbfc212009-04-29 21:06:13 +00002155 def run(self):
showard597bfd32009-05-08 18:22:50 +00002156 autoserv_exit_code = self._autoserv_monitor.exit_code()
2157 # only run if Autoserv exited due to some signal. if we have no exit
2158 # code, assume something bad (and signal-like) happened.
2159 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002160 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002161 else:
2162 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002163
2164
showard8fe93b52008-11-18 17:53:22 +00002165class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002166 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2167
2168
showard8cc058f2009-09-08 16:26:33 +00002169 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00002170 super(CleanupTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00002171 task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002172
showard8cc058f2009-09-08 16:26:33 +00002173 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002174
mblighd5c95802008-03-05 00:33:46 +00002175
jadmanski0afbb632008-06-06 21:10:57 +00002176 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002177 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002178 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002179 self.host.set_status(models.Host.Status.CLEANING)
2180 if self.queue_entry:
2181 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2182
2183
showard775300b2009-09-09 15:30:50 +00002184 def _finish_epilog(self):
2185 if not self.queue_entry:
2186 return
2187
2188 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
2189 self.queue_entry.on_pending()
2190 elif self.success:
2191 if self.queue_entry.job.run_verify:
2192 entry = models.HostQueueEntry(id=self.queue_entry.id)
2193 models.SpecialTask.objects.create(
2194 host=models.Host(id=self.host.id),
2195 queue_entry=entry,
2196 task=models.SpecialTask.Task.VERIFY)
2197 else:
2198 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002199
mblighd5c95802008-03-05 00:33:46 +00002200
showard21baa452008-10-21 00:08:39 +00002201 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002202 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002203
showard21baa452008-10-21 00:08:39 +00002204 if self.success:
2205 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002206 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002207
showard775300b2009-09-09 15:30:50 +00002208 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002209
showard21baa452008-10-21 00:08:39 +00002210
showardd3dc1992009-04-22 21:01:40 +00002211class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002212 _num_running_parses = 0
2213
showarded2afea2009-07-07 20:54:07 +00002214 def __init__(self, queue_entries, recover_run_monitor=None):
2215 super(FinalReparseTask, self).__init__(
2216 queue_entries, pidfile_name=_PARSER_PID_FILE,
2217 logfile_name='.parse.log',
2218 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002219 # don't use _set_ids, since we don't want to set the host_ids
2220 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002221 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002222
showard97aed502008-11-04 02:01:24 +00002223
2224 @classmethod
2225 def _increment_running_parses(cls):
2226 cls._num_running_parses += 1
2227
2228
2229 @classmethod
2230 def _decrement_running_parses(cls):
2231 cls._num_running_parses -= 1
2232
2233
2234 @classmethod
2235 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002236 return (cls._num_running_parses <
2237 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002238
2239
2240 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002241 for queue_entry in self._queue_entries:
2242 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2243 raise SchedulerError('Parse task attempting to start on '
2244 'non-parsing entry: %s' % queue_entry)
2245
showard97aed502008-11-04 02:01:24 +00002246 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002247
2248
2249 def epilog(self):
2250 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002251 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002252
2253
showardd3dc1992009-04-22 21:01:40 +00002254 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002255 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002256 results_dir]
showard97aed502008-11-04 02:01:24 +00002257
2258
showard08a36412009-05-05 01:01:13 +00002259 def tick(self):
2260 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002261 # and we can, at which point we revert to default behavior
2262 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002263 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002264 else:
2265 self._try_starting_parse()
2266
2267
2268 def run(self):
2269 # override run() to not actually run unless we can
2270 self._try_starting_parse()
2271
2272
2273 def _try_starting_parse(self):
2274 if not self._can_run_new_parse():
2275 return
showard170873e2009-01-07 00:22:26 +00002276
showard97aed502008-11-04 02:01:24 +00002277 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002278 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002279
showard97aed502008-11-04 02:01:24 +00002280 self._increment_running_parses()
2281 self._parse_started = True
2282
2283
2284 def finished(self, success):
2285 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002286 if self._parse_started:
2287 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002288
2289
showarda3c58572009-03-12 20:36:59 +00002290class DBError(Exception):
2291 """Raised by the DBObject constructor when its select fails."""
2292
2293
mbligh36768f02008-02-22 18:28:33 +00002294class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002295 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002296
2297 # Subclasses MUST override these:
2298 _table_name = ''
2299 _fields = ()
2300
showarda3c58572009-03-12 20:36:59 +00002301 # A mapping from (type, id) to the instance of the object for that
2302 # particular id. This prevents us from creating new Job() and Host()
2303 # instances for every HostQueueEntry object that we instantiate as
2304 # multiple HQEs often share the same Job.
2305 _instances_by_type_and_id = weakref.WeakValueDictionary()
2306 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002307
showarda3c58572009-03-12 20:36:59 +00002308
2309 def __new__(cls, id=None, **kwargs):
2310 """
2311 Look to see if we already have an instance for this particular type
2312 and id. If so, use it instead of creating a duplicate instance.
2313 """
2314 if id is not None:
2315 instance = cls._instances_by_type_and_id.get((cls, id))
2316 if instance:
2317 return instance
2318 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2319
2320
2321 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002322 assert bool(id) or bool(row)
2323 if id is not None and row is not None:
2324 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002325 assert self._table_name, '_table_name must be defined in your class'
2326 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002327 if not new_record:
2328 if self._initialized and not always_query:
2329 return # We've already been initialized.
2330 if id is None:
2331 id = row[0]
2332 # Tell future constructors to use us instead of re-querying while
2333 # this instance is still around.
2334 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002335
showard6ae5ea92009-02-25 00:11:51 +00002336 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002337
jadmanski0afbb632008-06-06 21:10:57 +00002338 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002339
jadmanski0afbb632008-06-06 21:10:57 +00002340 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002341 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002342
showarda3c58572009-03-12 20:36:59 +00002343 if self._initialized:
2344 differences = self._compare_fields_in_row(row)
2345 if differences:
showard7629f142009-03-27 21:02:02 +00002346 logging.warn(
2347 'initialized %s %s instance requery is updating: %s',
2348 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002349 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002350 self._initialized = True
2351
2352
2353 @classmethod
2354 def _clear_instance_cache(cls):
2355 """Used for testing, clear the internal instance cache."""
2356 cls._instances_by_type_and_id.clear()
2357
2358
showardccbd6c52009-03-21 00:10:21 +00002359 def _fetch_row_from_db(self, row_id):
2360 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2361 rows = _db.execute(sql, (row_id,))
2362 if not rows:
showard76e29d12009-04-15 21:53:10 +00002363 raise DBError("row not found (table=%s, row id=%s)"
2364 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002365 return rows[0]
2366
2367
showarda3c58572009-03-12 20:36:59 +00002368 def _assert_row_length(self, row):
2369 assert len(row) == len(self._fields), (
2370 "table = %s, row = %s/%d, fields = %s/%d" % (
2371 self.__table, row, len(row), self._fields, len(self._fields)))
2372
2373
2374 def _compare_fields_in_row(self, row):
2375 """
showarddae680a2009-10-12 20:26:43 +00002376 Given a row as returned by a SELECT query, compare it to our existing in
2377 memory fields. Fractional seconds are stripped from datetime values
2378 before comparison.
showarda3c58572009-03-12 20:36:59 +00002379
2380 @param row - A sequence of values corresponding to fields named in
2381 The class attribute _fields.
2382
2383 @returns A dictionary listing the differences keyed by field name
2384 containing tuples of (current_value, row_value).
2385 """
2386 self._assert_row_length(row)
2387 differences = {}
showarddae680a2009-10-12 20:26:43 +00002388 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002389 for field, row_value in itertools.izip(self._fields, row):
2390 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002391 if (isinstance(current_value, datetime.datetime)
2392 and isinstance(row_value, datetime.datetime)):
2393 current_value = current_value.strftime(datetime_cmp_fmt)
2394 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002395 if current_value != row_value:
2396 differences[field] = (current_value, row_value)
2397 return differences
showard2bab8f42008-11-12 18:15:22 +00002398
2399
2400 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002401 """
2402 Update our field attributes using a single row returned by SELECT.
2403
2404 @param row - A sequence of values corresponding to fields named in
2405 the class fields list.
2406 """
2407 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002408
showard2bab8f42008-11-12 18:15:22 +00002409 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002410 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002411 setattr(self, field, value)
2412 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002413
showard2bab8f42008-11-12 18:15:22 +00002414 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002415
mblighe2586682008-02-29 22:45:46 +00002416
showardccbd6c52009-03-21 00:10:21 +00002417 def update_from_database(self):
2418 assert self.id is not None
2419 row = self._fetch_row_from_db(self.id)
2420 self._update_fields_from_row(row)
2421
2422
jadmanski0afbb632008-06-06 21:10:57 +00002423 def count(self, where, table = None):
2424 if not table:
2425 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002426
jadmanski0afbb632008-06-06 21:10:57 +00002427 rows = _db.execute("""
2428 SELECT count(*) FROM %s
2429 WHERE %s
2430 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002431
jadmanski0afbb632008-06-06 21:10:57 +00002432 assert len(rows) == 1
2433
2434 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002435
2436
showardd3dc1992009-04-22 21:01:40 +00002437 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002438 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002439
showard2bab8f42008-11-12 18:15:22 +00002440 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002441 return
mbligh36768f02008-02-22 18:28:33 +00002442
mblighf8c624d2008-07-03 16:58:45 +00002443 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002444 _db.execute(query, (value, self.id))
2445
showard2bab8f42008-11-12 18:15:22 +00002446 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002447
2448
jadmanski0afbb632008-06-06 21:10:57 +00002449 def save(self):
2450 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002451 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002452 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002453 values = []
2454 for key in keys:
2455 value = getattr(self, key)
2456 if value is None:
2457 values.append('NULL')
2458 else:
2459 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002460 values_str = ','.join(values)
2461 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2462 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002463 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002464 # Update our id to the one the database just assigned to us.
2465 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002466
2467
jadmanski0afbb632008-06-06 21:10:57 +00002468 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002469 self._instances_by_type_and_id.pop((type(self), id), None)
2470 self._initialized = False
2471 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002472 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2473 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002474
2475
showard63a34772008-08-18 19:32:50 +00002476 @staticmethod
2477 def _prefix_with(string, prefix):
2478 if string:
2479 string = prefix + string
2480 return string
2481
2482
jadmanski0afbb632008-06-06 21:10:57 +00002483 @classmethod
showard989f25d2008-10-01 11:38:11 +00002484 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002485 """
2486 Construct instances of our class based on the given database query.
2487
2488 @yields One class instance for each row fetched.
2489 """
showard63a34772008-08-18 19:32:50 +00002490 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2491 where = cls._prefix_with(where, 'WHERE ')
2492 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002493 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002494 'joins' : joins,
2495 'where' : where,
2496 'order_by' : order_by})
2497 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002498 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002499
mbligh36768f02008-02-22 18:28:33 +00002500
class IneligibleHostQueue(DBObject):
    """A (job, host) pair blocked from being scheduled together again."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002504
2505
showard89f84db2009-03-12 20:39:13 +00002506class AtomicGroup(DBObject):
2507 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002508 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2509 'invalid')
showard89f84db2009-03-12 20:39:13 +00002510
2511
showard989f25d2008-10-01 11:38:11 +00002512class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002513 _table_name = 'labels'
2514 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002515 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002516
2517
showard6157c632009-07-06 20:19:31 +00002518 def __repr__(self):
2519 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2520 self.name, self.id, self.atomic_group_id)
2521
2522
mbligh36768f02008-02-22 18:28:33 +00002523class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002524 _table_name = 'hosts'
2525 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2526 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2527
2528
jadmanski0afbb632008-06-06 21:10:57 +00002529 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002530 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002531 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002532
2533
showard170873e2009-01-07 00:22:26 +00002534 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002535 """
showard170873e2009-01-07 00:22:26 +00002536 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002537 """
2538 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002539 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002540 FROM labels
2541 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002542 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002543 ORDER BY labels.name
2544 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002545 platform = None
2546 all_labels = []
2547 for label_name, is_platform in rows:
2548 if is_platform:
2549 platform = label_name
2550 all_labels.append(label_name)
2551 return platform, all_labels
2552
2553
showard54c1ea92009-05-20 00:32:58 +00002554 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2555
2556
2557 @classmethod
2558 def cmp_for_sort(cls, a, b):
2559 """
2560 A comparison function for sorting Host objects by hostname.
2561
2562 This strips any trailing numeric digits, ignores leading 0s and
2563 compares hostnames by the leading name and the trailing digits as a
2564 number. If both hostnames do not match this pattern, they are simply
2565 compared as lower case strings.
2566
2567 Example of how hostnames will be sorted:
2568
2569 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2570
2571 This hopefully satisfy most people's hostname sorting needs regardless
2572 of their exact naming schemes. Nobody sane should have both a host10
2573 and host010 (but the algorithm works regardless).
2574 """
2575 lower_a = a.hostname.lower()
2576 lower_b = b.hostname.lower()
2577 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2578 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2579 if match_a and match_b:
2580 name_a, number_a_str = match_a.groups()
2581 name_b, number_b_str = match_b.groups()
2582 number_a = int(number_a_str.lstrip('0'))
2583 number_b = int(number_b_str.lstrip('0'))
2584 result = cmp((name_a, number_a), (name_b, number_b))
2585 if result == 0 and lower_a != lower_b:
2586 # If they compared equal above but the lower case names are
2587 # indeed different, don't report equality. abc012 != abc12.
2588 return cmp(lower_a, lower_b)
2589 return result
2590 else:
2591 return cmp(lower_a, lower_b)
2592
2593
mbligh36768f02008-02-22 18:28:33 +00002594class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002595 _table_name = 'host_queue_entries'
2596 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002597 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002598 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002599
2600
showarda3c58572009-03-12 20:36:59 +00002601 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002602 assert id or row
showarda3c58572009-03-12 20:36:59 +00002603 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002604 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002605
jadmanski0afbb632008-06-06 21:10:57 +00002606 if self.host_id:
2607 self.host = Host(self.host_id)
2608 else:
2609 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002610
showard77182562009-06-10 00:16:05 +00002611 if self.atomic_group_id:
2612 self.atomic_group = AtomicGroup(self.atomic_group_id,
2613 always_query=False)
2614 else:
2615 self.atomic_group = None
2616
showard170873e2009-01-07 00:22:26 +00002617 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002618 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002619
2620
showard89f84db2009-03-12 20:39:13 +00002621 @classmethod
2622 def clone(cls, template):
2623 """
2624 Creates a new row using the values from a template instance.
2625
2626 The new instance will not exist in the database or have a valid
2627 id attribute until its save() method is called.
2628 """
2629 assert isinstance(template, cls)
2630 new_row = [getattr(template, field) for field in cls._fields]
2631 clone = cls(row=new_row, new_record=True)
2632 clone.id = None
2633 return clone
2634
2635
showardc85c21b2008-11-24 22:17:37 +00002636 def _view_job_url(self):
2637 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2638
2639
showardf1ae3542009-05-11 19:26:02 +00002640 def get_labels(self):
2641 """
2642 Get all labels associated with this host queue entry (either via the
2643 meta_host or as a job dependency label). The labels yielded are not
2644 guaranteed to be unique.
2645
2646 @yields Label instances associated with this host_queue_entry.
2647 """
2648 if self.meta_host:
2649 yield Label(id=self.meta_host, always_query=False)
2650 labels = Label.fetch(
2651 joins="JOIN jobs_dependency_labels AS deps "
2652 "ON (labels.id = deps.label_id)",
2653 where="deps.job_id = %d" % self.job.id)
2654 for label in labels:
2655 yield label
2656
2657
jadmanski0afbb632008-06-06 21:10:57 +00002658 def set_host(self, host):
2659 if host:
2660 self.queue_log_record('Assigning host ' + host.hostname)
2661 self.update_field('host_id', host.id)
2662 self.update_field('active', True)
2663 self.block_host(host.id)
2664 else:
2665 self.queue_log_record('Releasing host')
2666 self.unblock_host(self.host.id)
2667 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002668
jadmanski0afbb632008-06-06 21:10:57 +00002669 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002670
2671
jadmanski0afbb632008-06-06 21:10:57 +00002672 def get_host(self):
2673 return self.host
mbligh36768f02008-02-22 18:28:33 +00002674
2675
jadmanski0afbb632008-06-06 21:10:57 +00002676 def queue_log_record(self, log_line):
2677 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002678 _drone_manager.write_lines_to_file(self.queue_log_path,
2679 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002680
2681
jadmanski0afbb632008-06-06 21:10:57 +00002682 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002683 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002684 row = [0, self.job.id, host_id]
2685 block = IneligibleHostQueue(row=row, new_record=True)
2686 block.save()
mblighe2586682008-02-29 22:45:46 +00002687
2688
jadmanski0afbb632008-06-06 21:10:57 +00002689 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002690 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002691 blocks = IneligibleHostQueue.fetch(
2692 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2693 for block in blocks:
2694 block.delete()
mblighe2586682008-02-29 22:45:46 +00002695
2696
showard2bab8f42008-11-12 18:15:22 +00002697 def set_execution_subdir(self, subdir=None):
2698 if subdir is None:
2699 assert self.get_host()
2700 subdir = self.get_host().hostname
2701 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002702
2703
showard6355f6b2008-12-05 18:52:13 +00002704 def _get_hostname(self):
2705 if self.host:
2706 return self.host.hostname
2707 return 'no host'
2708
2709
showard170873e2009-01-07 00:22:26 +00002710 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002711 flags = []
2712 if self.active:
2713 flags.append('active')
2714 if self.complete:
2715 flags.append('complete')
2716 if self.deleted:
2717 flags.append('deleted')
2718 if self.aborted:
2719 flags.append('aborted')
2720 flags_str = ','.join(flags)
2721 if flags_str:
2722 flags_str = ' [%s]' % flags_str
2723 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2724 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002725
2726
jadmanski0afbb632008-06-06 21:10:57 +00002727 def set_status(self, status):
showard56824072009-10-12 20:30:21 +00002728 logging.info("%s -> %s", self, status)
mblighf8c624d2008-07-03 16:58:45 +00002729
showard56824072009-10-12 20:30:21 +00002730 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002731
showard8cc058f2009-09-08 16:26:33 +00002732 if status in (models.HostQueueEntry.Status.QUEUED,
2733 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002734 self.update_field('complete', False)
2735 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002736
showard8cc058f2009-09-08 16:26:33 +00002737 if status in (models.HostQueueEntry.Status.PENDING,
2738 models.HostQueueEntry.Status.RUNNING,
2739 models.HostQueueEntry.Status.VERIFYING,
2740 models.HostQueueEntry.Status.STARTING,
2741 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002742 self.update_field('complete', False)
2743 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002744
showard8cc058f2009-09-08 16:26:33 +00002745 if status in (models.HostQueueEntry.Status.FAILED,
2746 models.HostQueueEntry.Status.COMPLETED,
2747 models.HostQueueEntry.Status.STOPPED,
2748 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002749 self.update_field('complete', True)
2750 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002751 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002752
2753 should_email_status = (status.lower() in _notify_email_statuses or
2754 'all' in _notify_email_statuses)
2755 if should_email_status:
2756 self._email_on_status(status)
2757
2758 self._email_on_job_complete()
2759
2760
showardf85a0b72009-10-07 20:48:45 +00002761 def _on_complete(self):
2762 if not self.execution_subdir:
2763 return
2764 # unregister any possible pidfiles associated with this queue entry
2765 for pidfile_name in _ALL_PIDFILE_NAMES:
2766 pidfile_id = _drone_manager.get_pidfile_id_from(
2767 self.execution_path(), pidfile_name=pidfile_name)
2768 _drone_manager.unregister_pidfile(pidfile_id)
2769
2770
showardc85c21b2008-11-24 22:17:37 +00002771 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002772 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002773
2774 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2775 self.job.id, self.job.name, hostname, status)
2776 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2777 self.job.id, self.job.name, hostname, status,
2778 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002779 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002780
2781
2782 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002783 if not self.job.is_finished():
2784 return
showard542e8402008-09-19 20:16:18 +00002785
showardc85c21b2008-11-24 22:17:37 +00002786 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002787 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002788 for queue_entry in hosts_queue:
2789 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002790 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002791 queue_entry.status))
2792
2793 summary_text = "\n".join(summary_text)
2794 status_counts = models.Job.objects.get_status_counts(
2795 [self.job.id])[self.job.id]
2796 status = ', '.join('%d %s' % (count, status) for status, count
2797 in status_counts.iteritems())
2798
2799 subject = 'Autotest: Job ID: %s "%s" %s' % (
2800 self.job.id, self.job.name, status)
2801 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2802 self.job.id, self.job.name, status, self._view_job_url(),
2803 summary_text)
showard170873e2009-01-07 00:22:26 +00002804 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002805
2806
showard8cc058f2009-09-08 16:26:33 +00002807 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002808 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002809 assert assigned_host
2810 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002811 if self.host_id is None:
2812 self.set_host(assigned_host)
2813 else:
2814 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002815
showardcfd4a7e2009-07-11 01:47:33 +00002816 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002817 self.job.name, self.meta_host, self.atomic_group_id,
2818 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002819
showard8cc058f2009-09-08 16:26:33 +00002820 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002821
2822
showard8cc058f2009-09-08 16:26:33 +00002823 def _do_schedule_pre_job_tasks(self):
showard77182562009-06-10 00:16:05 +00002824 # Every host goes thru the Verifying stage (which may or may not
2825 # actually do anything as determined by get_pre_job_tasks).
2826 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00002827 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002828
showard6ae5ea92009-02-25 00:11:51 +00002829
jadmanski0afbb632008-06-06 21:10:57 +00002830 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002831 assert self.host
showard8cc058f2009-09-08 16:26:33 +00002832 self.set_status(models.HostQueueEntry.Status.QUEUED)
showard12f3e322009-05-13 21:27:42 +00002833 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002834 # verify/cleanup failure sets the execution subdir, so reset it here
2835 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002836 if self.meta_host:
2837 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002838
2839
jadmanski0afbb632008-06-06 21:10:57 +00002840 def handle_host_failure(self):
2841 """\
2842 Called when this queue entry's host has failed verification and
2843 repair.
2844 """
2845 assert not self.meta_host
showard8cc058f2009-09-08 16:26:33 +00002846 self.set_status(models.HostQueueEntry.Status.FAILED)
showard2bab8f42008-11-12 18:15:22 +00002847 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002848
2849
jadmanskif7fa2cc2008-10-01 14:13:23 +00002850 @property
2851 def aborted_by(self):
2852 self._load_abort_info()
2853 return self._aborted_by
2854
2855
    @property
    def aborted_on(self):
        """Timestamp of when this entry was aborted (None if not aborted)."""
        self._load_abort_info()
        return self._aborted_on
2860
2861
    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached: only hits the DB the first time either aborted_* property
        # is read; afterwards _aborted_by exists and we return immediately.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            # Entry was never aborted (or the abort row is missing).
            self._aborted_by = self._aborted_on = None
2877
2878
showardb2e2c322008-10-14 17:33:55 +00002879 def on_pending(self):
2880 """
2881 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00002882 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
2883 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00002884 """
showard8cc058f2009-09-08 16:26:33 +00002885 self.set_status(models.HostQueueEntry.Status.PENDING)
2886 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00002887
2888 # Some debug code here: sends an email if an asynchronous job does not
2889 # immediately enter Starting.
2890 # TODO: Remove this once we figure out why asynchronous jobs are getting
2891 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00002892 self.job.run_if_ready(queue_entry=self)
2893 if (self.job.synch_count == 1 and
2894 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00002895 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2896 message = 'Asynchronous job stuck in Pending'
2897 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002898
2899
showardd3dc1992009-04-22 21:01:40 +00002900 def abort(self, dispatcher):
2901 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002902
showardd3dc1992009-04-22 21:01:40 +00002903 Status = models.HostQueueEntry.Status
showard8cc058f2009-09-08 16:26:33 +00002904 if self.status in (Status.GATHERING, Status.PARSING):
showardd3dc1992009-04-22 21:01:40 +00002905 # do nothing; post-job tasks will finish and then mark this entry
2906 # with status "Aborted" and take care of the host
2907 return
2908
showard8cc058f2009-09-08 16:26:33 +00002909 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
2910 assert not dispatcher.get_agents_for_entry(self)
showardd3dc1992009-04-22 21:01:40 +00002911 self.host.set_status(models.Host.Status.READY)
2912 elif self.status == Status.VERIFYING:
showard8cc058f2009-09-08 16:26:33 +00002913 models.SpecialTask.objects.create(
2914 task=models.SpecialTask.Task.CLEANUP,
2915 host=models.Host(id=self.host.id))
showardd3dc1992009-04-22 21:01:40 +00002916
2917 self.set_status(Status.ABORTED)
showardd2014822009-10-12 20:26:58 +00002918 self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00002919
showard8cc058f2009-09-08 16:26:33 +00002920
2921 def get_group_name(self):
2922 atomic_group = self.atomic_group
2923 if not atomic_group:
2924 return ''
2925
2926 # Look at any meta_host and dependency labels and pick the first
2927 # one that also specifies this atomic group. Use that label name
2928 # as the group name if possible (it is more specific).
2929 for label in self.get_labels():
2930 if label.atomic_group_id:
2931 assert label.atomic_group_id == atomic_group.id
2932 return label.name
2933 return atomic_group.name
2934
2935
showard170873e2009-01-07 00:22:26 +00002936 def execution_tag(self):
2937 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002938 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002939
2940
showarded2afea2009-07-07 20:54:07 +00002941 def execution_path(self):
2942 return self.execution_tag()
2943
2944
mbligh36768f02008-02-22 18:28:33 +00002945class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002946 _table_name = 'jobs'
2947 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2948 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002949 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002950 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002951
showard77182562009-06-10 00:16:05 +00002952 # This does not need to be a column in the DB. The delays are likely to
2953 # be configured short. If the scheduler is stopped and restarted in
2954 # the middle of a job's delay cycle, the delay cycle will either be
2955 # repeated or skipped depending on the number of Pending machines found
2956 # when the restarted scheduler recovers to track it. Not a problem.
2957 #
2958 # A reference to the DelayedCallTask that will wake up the job should
2959 # no other HQEs change state in time. Its end_time attribute is used
2960 # by our run_with_ready_delay() method to determine if the wait is over.
2961 _delay_ready_task = None
2962
2963 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2964 # all status='Pending' atomic group HQEs incase a delay was running when the
2965 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002966
showarda3c58572009-03-12 20:36:59 +00002967 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002968 assert id or row
showarda3c58572009-03-12 20:36:59 +00002969 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002970
mblighe2586682008-02-29 22:45:46 +00002971
jadmanski0afbb632008-06-06 21:10:57 +00002972 def is_server_job(self):
2973 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002974
2975
showard170873e2009-01-07 00:22:26 +00002976 def tag(self):
2977 return "%s-%s" % (self.id, self.owner)
2978
2979
jadmanski0afbb632008-06-06 21:10:57 +00002980 def get_host_queue_entries(self):
2981 rows = _db.execute("""
2982 SELECT * FROM host_queue_entries
2983 WHERE job_id= %s
2984 """, (self.id,))
2985 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002986
jadmanski0afbb632008-06-06 21:10:57 +00002987 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002988
jadmanski0afbb632008-06-06 21:10:57 +00002989 return entries
mbligh36768f02008-02-22 18:28:33 +00002990
2991
jadmanski0afbb632008-06-06 21:10:57 +00002992 def set_status(self, status, update_queues=False):
2993 self.update_field('status',status)
2994
2995 if update_queues:
2996 for queue_entry in self.get_host_queue_entries():
2997 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002998
2999
showard77182562009-06-10 00:16:05 +00003000 def _atomic_and_has_started(self):
3001 """
3002 @returns True if any of the HostQueueEntries associated with this job
3003 have entered the Status.STARTING state or beyond.
3004 """
3005 atomic_entries = models.HostQueueEntry.objects.filter(
3006 job=self.id, atomic_group__isnull=False)
3007 if atomic_entries.count() <= 0:
3008 return False
3009
showardaf8b4ca2009-06-16 18:47:26 +00003010 # These states may *only* be reached if Job.run() has been called.
3011 started_statuses = (models.HostQueueEntry.Status.STARTING,
3012 models.HostQueueEntry.Status.RUNNING,
3013 models.HostQueueEntry.Status.COMPLETED)
3014
3015 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003016 return started_entries.count() > 0
3017
3018
showard708b3522009-08-20 23:26:15 +00003019 def _hosts_assigned_count(self):
3020 """The number of HostQueueEntries assigned a Host for this job."""
3021 entries = models.HostQueueEntry.objects.filter(job=self.id,
3022 host__isnull=False)
3023 return entries.count()
3024
3025
showard77182562009-06-10 00:16:05 +00003026 def _pending_count(self):
3027 """The number of HostQueueEntries for this job in the Pending state."""
3028 pending_entries = models.HostQueueEntry.objects.filter(
3029 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3030 return pending_entries.count()
3031
3032
showardd2014822009-10-12 20:26:58 +00003033 def _pending_threshold(self, atomic_group):
3034 """
3035 @param atomic_group: The AtomicGroup associated with this job that we
3036 are using to bound the threshold.
3037 @returns The minimum number of HostQueueEntries assigned a Host before
3038 this job can run.
3039 """
3040 return min(self._hosts_assigned_count(),
3041 atomic_group.max_number_of_machines)
3042
3043
jadmanski0afbb632008-06-06 21:10:57 +00003044 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003045 # NOTE: Atomic group jobs stop reporting ready after they have been
3046 # started to avoid launching multiple copies of one atomic job.
3047 # Only possible if synch_count is less than than half the number of
3048 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003049 pending_count = self._pending_count()
3050 atomic_and_has_started = self._atomic_and_has_started()
3051 ready = (pending_count >= self.synch_count
showardd2014822009-10-12 20:26:58 +00003052 and not atomic_and_has_started)
showardb000a8d2009-07-28 20:02:07 +00003053
3054 if not ready:
3055 logging.info(
3056 'Job %s not ready: %s pending, %s required '
3057 '(Atomic and started: %s)',
3058 self, pending_count, self.synch_count,
3059 atomic_and_has_started)
3060
3061 return ready
mbligh36768f02008-02-22 18:28:33 +00003062
3063
jadmanski0afbb632008-06-06 21:10:57 +00003064 def num_machines(self, clause = None):
3065 sql = "job_id=%s" % self.id
3066 if clause:
3067 sql += " AND (%s)" % clause
3068 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003069
3070
jadmanski0afbb632008-06-06 21:10:57 +00003071 def num_queued(self):
3072 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003073
3074
jadmanski0afbb632008-06-06 21:10:57 +00003075 def num_active(self):
3076 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003077
3078
jadmanski0afbb632008-06-06 21:10:57 +00003079 def num_complete(self):
3080 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003081
3082
jadmanski0afbb632008-06-06 21:10:57 +00003083 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003084 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003085
mbligh36768f02008-02-22 18:28:33 +00003086
showard6bb7c292009-01-30 01:44:51 +00003087 def _not_yet_run_entries(self, include_verifying=True):
3088 statuses = [models.HostQueueEntry.Status.QUEUED,
3089 models.HostQueueEntry.Status.PENDING]
3090 if include_verifying:
3091 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3092 return models.HostQueueEntry.objects.filter(job=self.id,
3093 status__in=statuses)
3094
3095
3096 def _stop_all_entries(self):
3097 entries_to_stop = self._not_yet_run_entries(
3098 include_verifying=False)
3099 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003100 assert not child_entry.complete, (
3101 '%s status=%s, active=%s, complete=%s' %
3102 (child_entry.id, child_entry.status, child_entry.active,
3103 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003104 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3105 child_entry.host.status = models.Host.Status.READY
3106 child_entry.host.save()
3107 child_entry.status = models.HostQueueEntry.Status.STOPPED
3108 child_entry.save()
3109
showard2bab8f42008-11-12 18:15:22 +00003110 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003111 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003112 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003113 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003114
3115
jadmanski0afbb632008-06-06 21:10:57 +00003116 def write_to_machines_file(self, queue_entry):
3117 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00003118 file_path = os.path.join(self.tag(), '.machines')
3119 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003120
3121
showardf1ae3542009-05-11 19:26:02 +00003122 def _next_group_name(self, group_name=''):
3123 """@returns a directory name to use for the next host group results."""
3124 if group_name:
3125 # Sanitize for use as a pathname.
3126 group_name = group_name.replace(os.path.sep, '_')
3127 if group_name.startswith('.'):
3128 group_name = '_' + group_name[1:]
3129 # Add a separator between the group name and 'group%d'.
3130 group_name += '.'
3131 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003132 query = models.HostQueueEntry.objects.filter(
3133 job=self.id).values('execution_subdir').distinct()
3134 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003135 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3136 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003137 if ids:
3138 next_id = max(ids) + 1
3139 else:
3140 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003141 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003142
3143
showarddb502762009-09-09 15:31:20 +00003144 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003145 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003146 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003147 return control_path
mbligh36768f02008-02-22 18:28:33 +00003148
showardb2e2c322008-10-14 17:33:55 +00003149
showard2bab8f42008-11-12 18:15:22 +00003150 def get_group_entries(self, queue_entry_from_group):
showard8375ce02009-10-12 20:35:13 +00003151 """
3152 @param queue_entry_from_group: A HostQueueEntry instance to find other
3153 group entries on this job for.
3154
3155 @returns A list of HostQueueEntry objects all executing this job as
3156 part of the same group as the one supplied (having the same
3157 execution_subdir).
3158 """
showard2bab8f42008-11-12 18:15:22 +00003159 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003160 return list(HostQueueEntry.fetch(
3161 where='job_id=%s AND execution_subdir=%s',
3162 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003163
3164
showard8cc058f2009-09-08 16:26:33 +00003165 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003166 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003167 execution_path = queue_entries[0].execution_path()
3168 control_path = self._write_control_file(execution_path)
jadmanski0afbb632008-06-06 21:10:57 +00003169 hostnames = ','.join([entry.get_host().hostname
3170 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003171
showarddb502762009-09-09 15:31:20 +00003172 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003173 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003174 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003175 ['-P', execution_tag, '-n',
3176 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003177 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003178
jadmanski0afbb632008-06-06 21:10:57 +00003179 if not self.is_server_job():
3180 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003181
showardb2e2c322008-10-14 17:33:55 +00003182 return params
mblighe2586682008-02-29 22:45:46 +00003183
mbligh36768f02008-02-22 18:28:33 +00003184
showardc9ae1782009-01-30 01:42:37 +00003185 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003186 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003187 return True
showard0fc38302008-10-23 00:44:07 +00003188 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003189 return queue_entry.get_host().dirty
3190 return False
showard21baa452008-10-21 00:08:39 +00003191
showardc9ae1782009-01-30 01:42:37 +00003192
showard8cc058f2009-09-08 16:26:33 +00003193 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003194 do_not_verify = (queue_entry.host.protection ==
3195 host_protections.Protection.DO_NOT_VERIFY)
3196 if do_not_verify:
3197 return False
3198 return self.run_verify
3199
3200
showard8cc058f2009-09-08 16:26:33 +00003201 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003202 """
3203 Get a list of tasks to perform before the host_queue_entry
3204 may be used to run this Job (such as Cleanup & Verify).
3205
3206 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003207 it should be considered be ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003208 task in the list calls HostQueueEntry.on_pending(), which
3209 continues the flow of the job.
3210 """
showardc9ae1782009-01-30 01:42:37 +00003211 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003212 task = models.SpecialTask.Task.CLEANUP
3213 elif self._should_run_verify(queue_entry):
3214 task = models.SpecialTask.Task.VERIFY
3215 else:
3216 queue_entry.on_pending()
3217 return
3218
3219 models.SpecialTask.objects.create(
3220 host=models.Host(id=queue_entry.host_id),
3221 queue_entry=models.HostQueueEntry(id=queue_entry.id),
3222 task=task)
showard21baa452008-10-21 00:08:39 +00003223
3224
showardf1ae3542009-05-11 19:26:02 +00003225 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003226 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003227 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003228 else:
showardf1ae3542009-05-11 19:26:02 +00003229 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003230 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003231 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003232 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003233
3234 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003235 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003236
3237
3238 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003239 """
3240 @returns A tuple containing a list of HostQueueEntry instances to be
3241 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00003242 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00003243 """
showard77182562009-06-10 00:16:05 +00003244 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003245 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003246 if atomic_group:
3247 num_entries_wanted = atomic_group.max_number_of_machines
3248 else:
3249 num_entries_wanted = self.synch_count
3250 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003251
showardf1ae3542009-05-11 19:26:02 +00003252 if num_entries_wanted > 0:
3253 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003254 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003255 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003256 params=(self.id, include_queue_entry.id)))
3257
3258 # Sort the chosen hosts by hostname before slicing.
3259 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3260 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3261 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3262 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003263
showardf1ae3542009-05-11 19:26:02 +00003264 # Sanity check. We'll only ever be called if this can be met.
showard828fc4c2009-09-14 20:31:00 +00003265 if len(chosen_entries) < self.synch_count:
3266 message = ('job %s got less than %s chosen entries: %s' % (
3267 self.id, self.synch_count, chosen_entries))
3268 logging.error(message)
3269 email_manager.manager.enqueue_notify_email(
3270 'Job not started, too few chosen entries', message)
3271 return []
showardf1ae3542009-05-11 19:26:02 +00003272
showard8cc058f2009-09-08 16:26:33 +00003273 group_name = include_queue_entry.get_group_name()
showardf1ae3542009-05-11 19:26:02 +00003274
3275 self._assign_new_group(chosen_entries, group_name=group_name)
showard8cc058f2009-09-08 16:26:33 +00003276 return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003277
3278
showard77182562009-06-10 00:16:05 +00003279 def run_if_ready(self, queue_entry):
3280 """
showard8375ce02009-10-12 20:35:13 +00003281 Run this job by kicking its HQEs into status='Starting' if enough
3282 hosts are ready for it to run.
3283
3284 Cleans up by kicking HQEs into status='Stopped' if this Job is not
3285 ready to run.
showard77182562009-06-10 00:16:05 +00003286 """
showardb2e2c322008-10-14 17:33:55 +00003287 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003288 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003289 elif queue_entry.atomic_group:
3290 self.run_with_ready_delay(queue_entry)
3291 else:
3292 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003293
3294
3295 def run_with_ready_delay(self, queue_entry):
3296 """
3297 Start a delay to wait for more hosts to enter Pending state before
3298 launching an atomic group job. Once set, the a delay cannot be reset.
3299
3300 @param queue_entry: The HostQueueEntry object to get atomic group
3301 info from and pass to run_if_ready when the delay is up.
3302
3303 @returns An Agent to run the job as appropriate or None if a delay
3304 has already been set.
3305 """
3306 assert queue_entry.job_id == self.id
3307 assert queue_entry.atomic_group
3308 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showardd2014822009-10-12 20:26:58 +00003309 over_max_threshold = (self._pending_count() >=
3310 self._pending_threshold(queue_entry.atomic_group))
showard77182562009-06-10 00:16:05 +00003311 delay_expired = (self._delay_ready_task and
3312 time.time() >= self._delay_ready_task.end_time)
3313
3314 # Delay is disabled or we already have enough? Do not wait to run.
3315 if not delay or over_max_threshold or delay_expired:
showard8cc058f2009-09-08 16:26:33 +00003316 self.run(queue_entry)
3317 else:
3318 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003319
showard8cc058f2009-09-08 16:26:33 +00003320
3321 def schedule_delayed_callback_task(self, queue_entry):
3322 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3323
showard77182562009-06-10 00:16:05 +00003324 if self._delay_ready_task:
3325 return None
3326
showard8cc058f2009-09-08 16:26:33 +00003327 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3328
showard77182562009-06-10 00:16:05 +00003329 def run_job_after_delay():
showardd2014822009-10-12 20:26:58 +00003330 logging.info('Job %s done waiting for extra hosts.', self)
3331 # Check to see if the job is still relevant. It could have aborted
3332 # while we were waiting or hosts could have disappearred, etc.
3333 threshold = self._pending_threshold(queue_entry.atomic_group)
3334 if self._pending_count() < threshold:
3335 logging.info('Job %s had too few Pending hosts after waiting '
3336 'for extras. Not running.', self)
3337 return
showard77182562009-06-10 00:16:05 +00003338 return self.run(queue_entry)
3339
showard708b3522009-08-20 23:26:15 +00003340 logging.info('Job %s waiting up to %s seconds for more hosts.',
3341 self.id, delay)
showard77182562009-06-10 00:16:05 +00003342 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3343 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003344 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003345
3346
3347 def run(self, queue_entry):
3348 """
3349 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003350 """
3351 if queue_entry.atomic_group and self._atomic_and_has_started():
3352 logging.error('Job.run() called on running atomic Job %d '
3353 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003354 return
3355 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003356 if queue_entries:
3357 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003358
3359
showard8cc058f2009-09-08 16:26:33 +00003360 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003361 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003362 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003363 self.abort_delay_ready_task()
3364
3365
3366 def abort_delay_ready_task(self):
3367 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003368 if self._delay_ready_task:
3369 # Cancel any pending callback that would try to run again
3370 # as we are already running.
3371 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003372
showardd2014822009-10-12 20:26:58 +00003373
showardb000a8d2009-07-28 20:02:07 +00003374 def __str__(self):
3375 return '%s-%s' % (self.id, self.owner)
3376
3377
mbligh36768f02008-02-22 18:28:33 +00003378if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003379 main()