blob: cf0ec2ef9c57dbbfaed9b8bee040eae54bf88fa1 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
# pidfile prefixes used by utils.write_pid()/program_is_alive() for the
# scheduler process and its babysitter wrapper
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# AUTOTEST_DIR in the environment overrides the install-relative default.
# ('in' replaces the deprecated dict.has_key(); identical behavior.)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# pidfile basenames written inside an execution's results directory by the
# processes the scheduler spawns
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# module-level state, populated by init()/main_without_exception_handling()
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
62
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    minutes = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    """Entry point: run the scheduler, always cleaning up the pidfile.

    SystemExit propagates untouched; any other escaping exception is
    logged before being re-raised.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000085
86
87def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000088 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000089
showard136e6dc2009-06-10 19:38:49 +000090 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000091 parser = optparse.OptionParser(usage)
92 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
93 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +000094 parser.add_option('--test', help='Indicate that scheduler is under ' +
95 'test and should use dummy autoserv and no parsing',
96 action='store_true')
97 (options, args) = parser.parse_args()
98 if len(args) != 1:
99 parser.print_usage()
100 return
mbligh36768f02008-02-22 18:28:33 +0000101
showard5613c662009-06-08 23:30:33 +0000102 scheduler_enabled = global_config.global_config.get_config_value(
103 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
104
105 if not scheduler_enabled:
106 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
107 "global_config's SCHEDULER section to enabled it. Exiting.")
mbligh6fbdb802009-08-03 16:42:55 +0000108 logging.error(msg)
showard5613c662009-06-08 23:30:33 +0000109 sys.exit(1)
110
jadmanski0afbb632008-06-06 21:10:57 +0000111 global RESULTS_DIR
112 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000113
mbligh83c1e9e2009-05-01 23:10:41 +0000114 site_init = utils.import_site_function(__file__,
115 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
116 _site_init_monitor_db_dummy)
117 site_init()
118
showardcca334f2009-03-12 20:38:34 +0000119 # Change the cwd while running to avoid issues incase we were launched from
120 # somewhere odd (such as a random NFS home directory of the person running
121 # sudo to launch us as the appropriate user).
122 os.chdir(RESULTS_DIR)
123
jadmanski0afbb632008-06-06 21:10:57 +0000124 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000125 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
126 "notify_email_statuses",
127 default='')
showardc85c21b2008-11-24 22:17:37 +0000128 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000129 _notify_email_statuses = [status for status in
130 re.split(r'[\s,;:]', notify_statuses_list.lower())
131 if status]
showardc85c21b2008-11-24 22:17:37 +0000132
jadmanski0afbb632008-06-06 21:10:57 +0000133 if options.test:
134 global _autoserv_path
135 _autoserv_path = 'autoserv_dummy'
136 global _testing_mode
137 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000138
mbligh37eceaa2008-12-15 22:56:37 +0000139 # AUTOTEST_WEB.base_url is still a supported config option as some people
140 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000141 global _base_url
showard170873e2009-01-07 00:22:26 +0000142 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
143 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000144 if config_base_url:
145 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000146 else:
mbligh37eceaa2008-12-15 22:56:37 +0000147 # For the common case of everything running on a single server you
148 # can just set the hostname in a single place in the config file.
149 server_name = c.get_config_value('SERVER', 'hostname')
150 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000151 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000152 sys.exit(1)
153 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000154
showardc5afc462009-01-13 00:09:39 +0000155 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000156 server.start()
157
jadmanski0afbb632008-06-06 21:10:57 +0000158 try:
showard136e6dc2009-06-10 19:38:49 +0000159 init()
showardc5afc462009-01-13 00:09:39 +0000160 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000161 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000162
jadmanski0afbb632008-06-06 21:10:57 +0000163 while not _shutdown:
164 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000165 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000166 except:
showard170873e2009-01-07 00:22:26 +0000167 email_manager.manager.log_stacktrace(
168 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000169
showard170873e2009-01-07 00:22:26 +0000170 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000171 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000172 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000173 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
def setup_logging():
    """Configure scheduler logging, honoring optional environment overrides
    for the log directory and log file name."""
    env = os.environ
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(),
        log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
        logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
182
183
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean exit after the current tick.

    Installed by init(); flips the module-level _shutdown flag that the
    main loop in main_without_exception_handling() polls between ticks.

    @param signum: signal number (unused).
    @param frame: interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000188
189
def init():
    """One-time scheduler initialization.

    Aborts if another monitor_db instance holds the pidfile, then writes
    our own pid, connects the global _db, puts Django in autocommit,
    disables the readonly connection, installs the SIGINT handler, and
    initializes the global drone manager from config.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # refuse to run two schedulers against the same results/pidfile
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # point at the stress-test database instead of the real one
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE: this local 'drones' (a config string) shadows the imported
    # 'drones' module for the rest of this function.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Pass --verbose to autoserv when True.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        # fall back to the queue entry's job when no job was given
        job = job or queue_entry.job
        command += ['-u', job.owner, '-l', job.name]
    if verbose:
        command.append('--verbose')
    return command + extra_args
248
249
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
    pass
252
253
class HostScheduler(object):
    """
    Matches pending host queue entries (HQEs) against the pool of Ready
    hosts.

    refresh() snapshots scheduling state from the database once per cycle
    (available hosts, job ACLs, ineligible hosts, dependency labels,
    host/label relations); the find_eligible_* methods then consume that
    cached state, popping hosts out of _hosts_available as they are claimed
    so a host is never handed out twice in one cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # render ids as "1,2,3" for interpolation into an SQL IN (...) clause
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Run query (a template with a single %s taking an id list) and build
        a dict mapping first column -> set of second column values.

        @param flip: when True, map second column -> set of first column.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_ids)} (or flipped)."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job's owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} that each job may not use."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) id-set dicts."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Re-snapshot all cached scheduling state for this cycle.

        @param pending_queue_entries: HQEs whose jobs' ACLs, ineligible
                hosts and dependencies should be cached.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True when the job's owner shares at least one ACL with the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True when the host carries every dependency label of the job."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        True unless the host has an only_if_needed label that the job
        neither depends on nor requested as its metahost.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # NOTE: despite the docstring above, this logs rather than
            # raises, and returns an arbitrary member of the set.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yeilding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True when the host passes ACL, dependency, only_if_needed and
        atomic group checks for this HQE."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True when the host is known this cycle and marked invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible; else None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True when the host is still unclaimed this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label,
        or None when none qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a claimed Host for this (non-atomic) HQE, or None."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
617
618
showard170873e2009-01-07 00:22:26 +0000619class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000620 def __init__(self):
621 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000622 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000623 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000624 user_cleanup_time = scheduler_config.config.clean_interval
625 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
626 _db, user_cleanup_time)
627 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000628 self._host_agents = {}
629 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
    def initialize(self, recover_hosts=True):
        """One-time startup: prime cleanup timers and recover prior state.

        @param recover_hosts: when True, also attempt host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000641
642
    def tick(self):
        """Run one scheduler iteration; called in a loop by the main entry.

        Order matters: drone state is refreshed first, then cleanup/abort
        handling, then scheduling, and finally queued drone actions and
        emails are flushed.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000655
showard97aed502008-11-04 02:01:24 +0000656
mblighf3294cc2009-04-08 21:17:38 +0000657 def _run_cleanup(self):
658 self._periodic_cleanup.run_cleanup_maybe()
659 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard170873e2009-01-07 00:22:26 +0000662 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
663 for object_id in object_ids:
664 agent_dict.setdefault(object_id, set()).add(agent)
665
666
667 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
668 for object_id in object_ids:
669 assert object_id in agent_dict
670 agent_dict[object_id].remove(agent)
671
672
jadmanski0afbb632008-06-06 21:10:57 +0000673 def add_agent(self, agent):
674 self._agents.append(agent)
675 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000676 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
677 self._register_agent_for_ids(self._queue_entry_agents,
678 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000679
showard170873e2009-01-07 00:22:26 +0000680
681 def get_agents_for_entry(self, queue_entry):
682 """
683 Find agents corresponding to the specified queue_entry.
684 """
showardd3dc1992009-04-22 21:01:40 +0000685 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000686
687
688 def host_has_agent(self, host):
689 """
690 Determine if there is currently an Agent present using this host.
691 """
692 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000693
694
jadmanski0afbb632008-06-06 21:10:57 +0000695 def remove_agent(self, agent):
696 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000697 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
698 agent)
699 self._unregister_agent_for_ids(self._queue_entry_agents,
700 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000701
702
showard8cc058f2009-09-08 16:26:33 +0000703 def _host_has_scheduled_special_task(self, host):
704 return bool(models.SpecialTask.objects.filter(host__id=host.id,
705 is_active=False,
706 is_complete=False))
707
708
jadmanski0afbb632008-06-06 21:10:57 +0000709 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000710 self._register_pidfiles()
711 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000712 self._recover_all_recoverable_entries()
showard8cc058f2009-09-08 16:26:33 +0000713 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000714 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000715 self._reverify_remaining_hosts()
716 # reinitialize drones after killing orphaned processes, since they can
717 # leave around files when they die
718 _drone_manager.execute_actions()
719 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000720
showard170873e2009-01-07 00:22:26 +0000721
722 def _register_pidfiles(self):
723 # during recovery we may need to read pidfiles for both running and
724 # parsing entries
725 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000726 where="status IN ('Running', 'Gathering', 'Parsing')")
showarded2afea2009-07-07 20:54:07 +0000727 special_tasks = models.SpecialTask.objects.filter(is_active=True)
728 for execution_entry in itertools.chain(queue_entries, special_tasks):
showardd3dc1992009-04-22 21:01:40 +0000729 for pidfile_name in _ALL_PIDFILE_NAMES:
730 pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +0000731 execution_entry.execution_path(), pidfile_name=pidfile_name)
showardd3dc1992009-04-22 21:01:40 +0000732 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000733
734
showarded2afea2009-07-07 20:54:07 +0000735 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
736 run_monitor = PidfileRunMonitor()
737 run_monitor.attach_to_existing_process(execution_path,
738 pidfile_name=pidfile_name)
739 if run_monitor.has_process():
740 orphans.discard(run_monitor.get_process())
741 return run_monitor, '(process %s)' % run_monitor.get_process()
742 return None, 'without process'
743
744
showard8cc058f2009-09-08 16:26:33 +0000745 def _get_unassigned_entries(self, status):
746 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
showard0db3d432009-10-12 20:29:15 +0000747 if entry.status == status and not self.get_agents_for_entry(entry):
748 # The status can change during iteration, e.g., if job.run()
749 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000750 yield entry
751
752
showardd3dc1992009-04-22 21:01:40 +0000753 def _recover_entries_with_status(self, status, orphans, pidfile_name,
754 recover_entries_fn):
showard8cc058f2009-09-08 16:26:33 +0000755 for queue_entry in self._get_unassigned_entries(status):
showardd3dc1992009-04-22 21:01:40 +0000756 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000757 run_monitor, process_string = self._get_recovery_run_monitor(
758 queue_entry.execution_path(), pidfile_name, orphans)
showard8cc058f2009-09-08 16:26:33 +0000759 if not run_monitor:
760 # _schedule_running_host_queue_entries should schedule and
761 # recover these entries
762 continue
showard597bfd32009-05-08 18:22:50 +0000763
showarded2afea2009-07-07 20:54:07 +0000764 logging.info('Recovering %s entry %s %s',status.lower(),
765 ', '.join(str(entry) for entry in queue_entries),
766 process_string)
showardd3dc1992009-04-22 21:01:40 +0000767 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000768
769
showard6878e8b2009-07-20 22:37:45 +0000770 def _check_for_remaining_orphan_processes(self, orphans):
771 if not orphans:
772 return
773 subject = 'Unrecovered orphan autoserv processes remain'
774 message = '\n'.join(str(process) for process in orphans)
775 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000776
777 die_on_orphans = global_config.global_config.get_config_value(
778 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
779
780 if die_on_orphans:
781 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000782
showard170873e2009-01-07 00:22:26 +0000783
showardd3dc1992009-04-22 21:01:40 +0000784 def _recover_running_entries(self, orphans):
785 def recover_entries(job, queue_entries, run_monitor):
showard8cc058f2009-09-08 16:26:33 +0000786 queue_task = QueueTask(job=job, queue_entries=queue_entries,
787 recover_run_monitor=run_monitor)
788 self.add_agent(Agent(task=queue_task,
789 num_processes=len(queue_entries)))
showardd3dc1992009-04-22 21:01:40 +0000790
791 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
showarded2afea2009-07-07 20:54:07 +0000792 orphans, _AUTOSERV_PID_FILE,
showardd3dc1992009-04-22 21:01:40 +0000793 recover_entries)
794
795
796 def _recover_gathering_entries(self, orphans):
797 def recover_entries(job, queue_entries, run_monitor):
798 gather_task = GatherLogsTask(job, queue_entries,
showarded2afea2009-07-07 20:54:07 +0000799 recover_run_monitor=run_monitor)
showard8cc058f2009-09-08 16:26:33 +0000800 self.add_agent(Agent(gather_task))
showardd3dc1992009-04-22 21:01:40 +0000801
802 self._recover_entries_with_status(
803 models.HostQueueEntry.Status.GATHERING,
804 orphans, _CRASHINFO_PID_FILE, recover_entries)
805
806
807 def _recover_parsing_entries(self, orphans):
808 def recover_entries(job, queue_entries, run_monitor):
809 reparse_task = FinalReparseTask(queue_entries,
showarded2afea2009-07-07 20:54:07 +0000810 recover_run_monitor=run_monitor)
showard8cc058f2009-09-08 16:26:33 +0000811 self.add_agent(Agent(reparse_task, num_processes=0))
showardd3dc1992009-04-22 21:01:40 +0000812
813 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
814 orphans, _PARSER_PID_FILE,
815 recover_entries)
816
817
showard8cc058f2009-09-08 16:26:33 +0000818 def _recover_pending_entries(self):
819 for entry in self._get_unassigned_entries(
820 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000821 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000822 entry.on_pending()
823
824
showardd3dc1992009-04-22 21:01:40 +0000825 def _recover_all_recoverable_entries(self):
826 orphans = _drone_manager.get_orphaned_autoserv_processes()
827 self._recover_running_entries(orphans)
828 self._recover_gathering_entries(orphans)
829 self._recover_parsing_entries(orphans)
showarded2afea2009-07-07 20:54:07 +0000830 self._recover_special_tasks(orphans)
showard6878e8b2009-07-20 22:37:45 +0000831 self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000832
showard97aed502008-11-04 02:01:24 +0000833
showarded2afea2009-07-07 20:54:07 +0000834 def _recover_special_tasks(self, orphans):
showard2fe3f1d2009-07-06 20:19:11 +0000835 """\
836 Recovers all special tasks that have started running but have not
837 completed.
838 """
showard2fe3f1d2009-07-06 20:19:11 +0000839 tasks = models.SpecialTask.objects.filter(is_active=True,
840 is_complete=False)
showard65db3932009-10-28 19:54:35 +0000841 for task in tasks:
showard9b6ec502009-08-20 23:25:17 +0000842 if self.host_has_agent(task.host):
843 raise SchedulerError(
844 "%s already has a host agent %s." % (
showard8cc058f2009-09-08 16:26:33 +0000845 task, self._host_agents.get(task.host.id)))
showard2fe3f1d2009-07-06 20:19:11 +0000846
showarded2afea2009-07-07 20:54:07 +0000847 run_monitor, process_string = self._get_recovery_run_monitor(
848 task.execution_path(), _AUTOSERV_PID_FILE, orphans)
849
850 logging.info('Recovering %s %s', task, process_string)
showard65db3932009-10-28 19:54:35 +0000851 self._run_or_recover_special_task(task, run_monitor=run_monitor)
showard6af73ad2009-07-28 20:00:58 +0000852
853
showardb8900452009-10-12 20:31:01 +0000854 def _check_for_unrecovered_verifying_entries(self):
showard170873e2009-01-07 00:22:26 +0000855 queue_entries = HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000856 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
857 unrecovered_hqes = []
858 for queue_entry in queue_entries:
859 special_tasks = models.SpecialTask.objects.filter(
860 task__in=(models.SpecialTask.Task.CLEANUP,
861 models.SpecialTask.Task.VERIFY),
862 queue_entry__id=queue_entry.id,
863 is_complete=False)
864 if special_tasks.count() == 0:
865 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000866
showardb8900452009-10-12 20:31:01 +0000867 if unrecovered_hqes:
868 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000869 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000870 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000871 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000872
873
showard65db3932009-10-28 19:54:35 +0000874 def _get_prioritized_special_tasks(self):
875 """
876 Returns all queued SpecialTasks prioritized for repair first, then
877 cleanup, then verify.
878 """
879 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
880 is_complete=False,
881 host__locked=False)
882 # exclude hosts with active queue entries unless the SpecialTask is for
883 # that queue entry
884 queued_tasks = models.Host.objects.add_join(
885 queued_tasks, 'host_queue_entries', 'host_id',
886 join_condition='host_queue_entries.active',
887 force_left_join=True)
888 queued_tasks = queued_tasks.extra(
889 where=['(host_queue_entries.id IS NULL OR '
890 'host_queue_entries.id = special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000891
showard65db3932009-10-28 19:54:35 +0000892 # reorder tasks by priority
893 task_priority_order = [models.SpecialTask.Task.REPAIR,
894 models.SpecialTask.Task.CLEANUP,
895 models.SpecialTask.Task.VERIFY]
896 def task_priority_key(task):
897 return task_priority_order.index(task.task)
898 return sorted(queued_tasks, key=task_priority_key)
899
900
901 def _run_or_recover_special_task(self, special_task, run_monitor=None):
902 """
903 Construct an AgentTask class to run the given SpecialTask and add it
904 to this dispatcher.
905 @param special_task: a models.SpecialTask instance
906 @run_monitor: if given, a running SpecialTask will be recovered with
907 this monitor.
908 """
909 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
910 for agent_task_class in special_agent_task_classes:
911 if agent_task_class.TASK_TYPE == special_task.task:
912 agent_task = agent_task_class(task=special_task,
913 recover_run_monitor=run_monitor)
914 self.add_agent(Agent(agent_task))
915 return
916
917 email_manager.manager.enqueue_notify_email(
918 'No AgentTask class for task', str(special_task))
919
920
921 def _schedule_special_tasks(self):
922 """
923 Execute queued SpecialTasks that are ready to run on idle hosts.
924 """
925 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000926 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000927 continue
showard65db3932009-10-28 19:54:35 +0000928 self._run_or_recover_special_task(task)
showard1ff7b2e2009-05-15 23:17:18 +0000929
930
showard170873e2009-01-07 00:22:26 +0000931 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000932 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000933 # should never happen
showarded2afea2009-07-07 20:54:07 +0000934 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000935 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000936 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000937 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000938 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000939
940
jadmanski0afbb632008-06-06 21:10:57 +0000941 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000942 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000943 full_where='locked = 0 AND invalid = 0 AND ' + where
944 for host in Host.fetch(where=full_where):
945 if self.host_has_agent(host):
946 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000947 continue
showard8cc058f2009-09-08 16:26:33 +0000948 if self._host_has_scheduled_special_task(host):
949 # host will have a special task scheduled on the next cycle
950 continue
showard170873e2009-01-07 00:22:26 +0000951 if print_message:
showardb18134f2009-03-20 20:52:18 +0000952 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000953 models.SpecialTask.objects.create(
954 task=models.SpecialTask.Task.CLEANUP,
955 host=models.Host(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000956
957
jadmanski0afbb632008-06-06 21:10:57 +0000958 def _recover_hosts(self):
959 # recover "Repair Failed" hosts
960 message = 'Reverifying dead host %s'
961 self._reverify_hosts_where("status = 'Repair Failed'",
962 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000963
964
showard04c82c52008-05-29 19:38:12 +0000965
showardb95b1bd2008-08-15 18:11:04 +0000966 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000967 # prioritize by job priority, then non-metahost over metahost, then FIFO
968 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000969 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000970 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000971 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000972
973
showard89f84db2009-03-12 20:39:13 +0000974 def _refresh_pending_queue_entries(self):
975 """
976 Lookup the pending HostQueueEntries and call our HostScheduler
977 refresh() method given that list. Return the list.
978
979 @returns A list of pending HostQueueEntries sorted in priority order.
980 """
showard63a34772008-08-18 19:32:50 +0000981 queue_entries = self._get_pending_queue_entries()
982 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000983 return []
showardb95b1bd2008-08-15 18:11:04 +0000984
showard63a34772008-08-18 19:32:50 +0000985 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000986
showard89f84db2009-03-12 20:39:13 +0000987 return queue_entries
988
989
990 def _schedule_atomic_group(self, queue_entry):
991 """
992 Schedule the given queue_entry on an atomic group of hosts.
993
994 Returns immediately if there are insufficient available hosts.
995
996 Creates new HostQueueEntries based off of queue_entry for the
997 scheduled hosts and starts them all running.
998 """
999 # This is a virtual host queue entry representing an entire
1000 # atomic group, find a group and schedule their hosts.
1001 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1002 queue_entry)
1003 if not group_hosts:
1004 return
showardcbe6f942009-06-17 19:33:49 +00001005
1006 logging.info('Expanding atomic group entry %s with hosts %s',
1007 queue_entry,
1008 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001009 # The first assigned host uses the original HostQueueEntry
1010 group_queue_entries = [queue_entry]
1011 for assigned_host in group_hosts[1:]:
1012 # Create a new HQE for every additional assigned_host.
1013 new_hqe = HostQueueEntry.clone(queue_entry)
1014 new_hqe.save()
1015 group_queue_entries.append(new_hqe)
1016 assert len(group_queue_entries) == len(group_hosts)
1017 for queue_entry, host in itertools.izip(group_queue_entries,
1018 group_hosts):
1019 self._run_queue_entry(queue_entry, host)
1020
1021
1022 def _schedule_new_jobs(self):
1023 queue_entries = self._refresh_pending_queue_entries()
1024 if not queue_entries:
1025 return
1026
showard63a34772008-08-18 19:32:50 +00001027 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001028 is_unassigned_atomic_group = (
1029 queue_entry.atomic_group_id is not None
1030 and queue_entry.host_id is None)
1031 if is_unassigned_atomic_group:
1032 self._schedule_atomic_group(queue_entry)
1033 else:
showard89f84db2009-03-12 20:39:13 +00001034 assigned_host = self._host_scheduler.find_eligible_host(
1035 queue_entry)
showard65db3932009-10-28 19:54:35 +00001036 if assigned_host and not self.host_has_agent(assigned_host):
showard89f84db2009-03-12 20:39:13 +00001037 self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001038
1039
showard8cc058f2009-09-08 16:26:33 +00001040 def _schedule_running_host_queue_entries(self):
showard8375ce02009-10-12 20:35:13 +00001041 status_enum = models.HostQueueEntry.Status
1042 running_statuses = (status_enum.STARTING, status_enum.RUNNING,
1043 status_enum.GATHERING, status_enum.PARSING)
1044 sql_statuses = ', '.join(('"%s"' % s for s in running_statuses))
1045 entries = HostQueueEntry.fetch(where="status IN (%s)" % sql_statuses)
showard8cc058f2009-09-08 16:26:33 +00001046 for entry in entries:
1047 if self.get_agents_for_entry(entry):
1048 continue
1049
1050 task_entries = entry.job.get_group_entries(entry)
1051 for task_entry in task_entries:
1052 if (task_entry.status != models.HostQueueEntry.Status.PARSING
1053 and self.host_has_agent(task_entry.host)):
showard8375ce02009-10-12 20:35:13 +00001054 agent = tuple(self._host_agents.get(task_entry.host.id))[0]
showard8cc058f2009-09-08 16:26:33 +00001055 raise SchedulerError('Attempted to schedule on host that '
1056 'already has agent: %s (previous '
1057 'agent task: %s)'
1058 % (task_entry, agent.task))
1059
1060 if entry.status in (models.HostQueueEntry.Status.STARTING,
1061 models.HostQueueEntry.Status.RUNNING):
1062 params = entry.job.get_autoserv_params(task_entries)
1063 agent_task = QueueTask(job=entry.job,
1064 queue_entries=task_entries, cmd=params)
1065 elif entry.status == models.HostQueueEntry.Status.GATHERING:
1066 agent_task = GatherLogsTask(
1067 job=entry.job, queue_entries=task_entries)
1068 elif entry.status == models.HostQueueEntry.Status.PARSING:
1069 agent_task = FinalReparseTask(queue_entries=task_entries)
1070 else:
1071 raise SchedulerError('_schedule_running_host_queue_entries got '
1072 'entry with invalid status %s: %s'
1073 % (entry.status, entry))
1074
1075 self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
1076
1077
1078 def _schedule_delay_tasks(self):
showardd2014822009-10-12 20:26:58 +00001079 for entry in HostQueueEntry.fetch(where='status = "%s"' %
1080 models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001081 task = entry.job.schedule_delayed_callback_task(entry)
1082 if task:
1083 self.add_agent(Agent(task, num_processes=0))
1084
1085
showardb95b1bd2008-08-15 18:11:04 +00001086 def _run_queue_entry(self, queue_entry, host):
showard8cc058f2009-09-08 16:26:33 +00001087 queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001088
1089
jadmanski0afbb632008-06-06 21:10:57 +00001090 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001091 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001092 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001093 for agent in self.get_agents_for_entry(entry):
1094 agent.abort()
1095 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001096
1097
showard324bf812009-01-20 23:23:38 +00001098 def _can_start_agent(self, agent, num_started_this_cycle,
1099 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001100 # always allow zero-process agents to run
1101 if agent.num_processes == 0:
1102 return True
1103 # don't allow any nonzero-process agents to run after we've reached a
1104 # limit (this avoids starvation of many-process agents)
1105 if have_reached_limit:
1106 return False
1107 # total process throttling
showard324bf812009-01-20 23:23:38 +00001108 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001109 return False
1110 # if a single agent exceeds the per-cycle throttling, still allow it to
1111 # run when it's the first agent in the cycle
1112 if num_started_this_cycle == 0:
1113 return True
1114 # per-cycle throttling
1115 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001116 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001117 return False
1118 return True
1119
1120
jadmanski0afbb632008-06-06 21:10:57 +00001121 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001122 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001123 have_reached_limit = False
1124 # iterate over copy, so we can remove agents during iteration
1125 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001126 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001127 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001128 have_reached_limit):
1129 have_reached_limit = True
1130 continue
showard4c5374f2008-09-04 17:02:56 +00001131 num_started_this_cycle += agent.num_processes
1132 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001133 if agent.is_done():
1134 logging.info("agent finished")
1135 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001136 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001137 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001138
1139
showard29f7cd22009-04-29 21:16:24 +00001140 def _process_recurring_runs(self):
1141 recurring_runs = models.RecurringRun.objects.filter(
1142 start_date__lte=datetime.datetime.now())
1143 for rrun in recurring_runs:
1144 # Create job from template
1145 job = rrun.job
1146 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001147 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001148
1149 host_objects = info['hosts']
1150 one_time_hosts = info['one_time_hosts']
1151 metahost_objects = info['meta_hosts']
1152 dependencies = info['dependencies']
1153 atomic_group = info['atomic_group']
1154
1155 for host in one_time_hosts or []:
1156 this_host = models.Host.create_one_time_host(host.hostname)
1157 host_objects.append(this_host)
1158
1159 try:
1160 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001161 options=options,
showard29f7cd22009-04-29 21:16:24 +00001162 host_objects=host_objects,
1163 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001164 atomic_group=atomic_group)
1165
1166 except Exception, ex:
1167 logging.exception(ex)
1168 #TODO send email
1169
1170 if rrun.loop_count == 1:
1171 rrun.delete()
1172 else:
1173 if rrun.loop_count != 0: # if not infinite loop
1174 # calculate new start_date
1175 difference = datetime.timedelta(seconds=rrun.loop_period)
1176 rrun.start_date = rrun.start_date + difference
1177 rrun.loop_count -= 1
1178 rrun.save()
1179
1180
showard170873e2009-01-07 00:22:26 +00001181class PidfileRunMonitor(object):
1182 """
1183 Client must call either run() to start a new process or
1184 attach_to_existing_process().
1185 """
mbligh36768f02008-02-22 18:28:33 +00001186
showard170873e2009-01-07 00:22:26 +00001187 class _PidfileException(Exception):
1188 """
1189 Raised when there's some unexpected behavior with the pid file, but only
1190 used internally (never allowed to escape this class).
1191 """
mbligh36768f02008-02-22 18:28:33 +00001192
1193
showard170873e2009-01-07 00:22:26 +00001194 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001195 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001196 self._start_time = None
1197 self.pidfile_id = None
1198 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001199
1200
showard170873e2009-01-07 00:22:26 +00001201 def _add_nice_command(self, command, nice_level):
1202 if not nice_level:
1203 return command
1204 return ['nice', '-n', str(nice_level)] + command
1205
1206
1207 def _set_start_time(self):
1208 self._start_time = time.time()
1209
1210
1211 def run(self, command, working_directory, nice_level=None, log_file=None,
1212 pidfile_name=None, paired_with_pidfile=None):
1213 assert command is not None
1214 if nice_level is not None:
1215 command = ['nice', '-n', str(nice_level)] + command
1216 self._set_start_time()
1217 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001218 command, working_directory, pidfile_name=pidfile_name,
1219 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001220
1221
showarded2afea2009-07-07 20:54:07 +00001222 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001223 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001224 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001225 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001226 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001227 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001228
1229
jadmanski0afbb632008-06-06 21:10:57 +00001230 def kill(self):
showard170873e2009-01-07 00:22:26 +00001231 if self.has_process():
1232 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001233
mbligh36768f02008-02-22 18:28:33 +00001234
showard170873e2009-01-07 00:22:26 +00001235 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001236 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001237 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001238
1239
showard170873e2009-01-07 00:22:26 +00001240 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001241 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001242 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001243 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001244
1245
showard170873e2009-01-07 00:22:26 +00001246 def _read_pidfile(self, use_second_read=False):
1247 assert self.pidfile_id is not None, (
1248 'You must call run() or attach_to_existing_process()')
1249 contents = _drone_manager.get_pidfile_contents(
1250 self.pidfile_id, use_second_read=use_second_read)
1251 if contents.is_invalid():
1252 self._state = drone_manager.PidfileContents()
1253 raise self._PidfileException(contents)
1254 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001255
1256
showard21baa452008-10-21 00:08:39 +00001257 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001258 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1259 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001260 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001261 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001262
1263
1264 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001265 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001266 return
mblighbb421852008-03-11 22:36:16 +00001267
showard21baa452008-10-21 00:08:39 +00001268 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001269
showard170873e2009-01-07 00:22:26 +00001270 if self._state.process is None:
1271 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001272 return
mbligh90a549d2008-03-25 23:52:34 +00001273
showard21baa452008-10-21 00:08:39 +00001274 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001275 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001276 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001277 return
mbligh90a549d2008-03-25 23:52:34 +00001278
showard170873e2009-01-07 00:22:26 +00001279 # pid but no running process - maybe process *just* exited
1280 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001281 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001282 # autoserv exited without writing an exit code
1283 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001284 self._handle_pidfile_error(
1285 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001286
showard21baa452008-10-21 00:08:39 +00001287
1288 def _get_pidfile_info(self):
1289 """\
1290 After completion, self._state will contain:
1291 pid=None, exit_status=None if autoserv has not yet run
1292 pid!=None, exit_status=None if autoserv is running
1293 pid!=None, exit_status!=None if autoserv has completed
1294 """
1295 try:
1296 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001297 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001298 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001299
1300
showard170873e2009-01-07 00:22:26 +00001301 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001302 """\
1303 Called when no pidfile is found or no pid is in the pidfile.
1304 """
showard170873e2009-01-07 00:22:26 +00001305 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001306 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001307 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001308 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001309 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001310
1311
showard35162b02009-03-03 02:17:30 +00001312 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001313 """\
1314 Called when autoserv has exited without writing an exit status,
1315 or we've timed out waiting for autoserv to write a pid to the
1316 pidfile. In either case, we just return failure and the caller
1317 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001318
showard170873e2009-01-07 00:22:26 +00001319 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001320 """
1321 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001322 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001323 self._state.exit_status = 1
1324 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001325
1326
jadmanski0afbb632008-06-06 21:10:57 +00001327 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001328 self._get_pidfile_info()
1329 return self._state.exit_status
1330
1331
1332 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001333 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001334 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001335 if self._state.num_tests_failed is None:
1336 return -1
showard21baa452008-10-21 00:08:39 +00001337 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001338
1339
showardcdaeae82009-08-31 18:32:48 +00001340 def try_copy_results_on_drone(self, **kwargs):
1341 if self.has_process():
1342 # copy results logs into the normal place for job results
1343 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1344
1345
1346 def try_copy_to_results_repository(self, source, **kwargs):
1347 if self.has_process():
1348 _drone_manager.copy_to_results_repository(self.get_process(),
1349 source, **kwargs)
1350
1351
mbligh36768f02008-02-22 18:28:33 +00001352class Agent(object):
showard77182562009-06-10 00:16:05 +00001353 """
showard8cc058f2009-09-08 16:26:33 +00001354 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001355
1356 The following methods are required on all task objects:
1357 poll() - Called periodically to let the task check its status and
1358 update its internal state. If the task succeeded.
1359 is_done() - Returns True if the task is finished.
1360 abort() - Called when an abort has been requested. The task must
1361 set its aborted attribute to True if it actually aborted.
1362
1363 The following attributes are required on all task objects:
1364 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001365 success - bool, True if this task succeeded.
1366 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1367 host_ids - A sequence of Host ids this task represents.
1368
1369 The following attribute is written to all task objects:
1370 agent - A reference to the Agent instance that the task has been
1371 added to.
1372 """
1373
1374
showard8cc058f2009-09-08 16:26:33 +00001375 def __init__(self, task, num_processes=1):
showard77182562009-06-10 00:16:05 +00001376 """
showard8cc058f2009-09-08 16:26:33 +00001377 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001378 @param num_processes: The number of subprocesses the Agent represents.
1379 This is used by the Dispatcher for managing the load on the
1380 system. Defaults to 1.
1381 """
showard8cc058f2009-09-08 16:26:33 +00001382 self.task = task
1383 task.agent = self
1384
showard77182562009-06-10 00:16:05 +00001385 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001386 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001387 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001388
showard8cc058f2009-09-08 16:26:33 +00001389 self.queue_entry_ids = task.queue_entry_ids
1390 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001391
showard8cc058f2009-09-08 16:26:33 +00001392 self.started = False
mbligh36768f02008-02-22 18:28:33 +00001393
1394
jadmanski0afbb632008-06-06 21:10:57 +00001395 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001396 self.started = True
1397 if self.task:
1398 self.task.poll()
1399 if self.task.is_done():
1400 self.task = None
showardec113162008-05-08 00:52:49 +00001401
1402
jadmanski0afbb632008-06-06 21:10:57 +00001403 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001404 return self.task is None
mbligh36768f02008-02-22 18:28:33 +00001405
1406
showardd3dc1992009-04-22 21:01:40 +00001407 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001408 if self.task:
1409 self.task.abort()
1410 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001411 # tasks can choose to ignore aborts
showard8cc058f2009-09-08 16:26:33 +00001412 self.task = None
showard20f9bdd2009-04-29 19:48:33 +00001413
showardd3dc1992009-04-22 21:01:40 +00001414
showard77182562009-06-10 00:16:05 +00001415class DelayedCallTask(object):
1416 """
1417 A task object like AgentTask for an Agent to run that waits for the
1418 specified amount of time to have elapsed before calling the supplied
1419 callback once and finishing. If the callback returns anything, it is
1420 assumed to be a new Agent instance and will be added to the dispatcher.
1421
1422 @attribute end_time: The absolute posix time after which this task will
1423 call its callback when it is polled and be finished.
1424
1425 Also has all attributes required by the Agent class.
1426 """
1427 def __init__(self, delay_seconds, callback, now_func=None):
1428 """
1429 @param delay_seconds: The delay in seconds from now that this task
1430 will call the supplied callback and be done.
1431 @param callback: A callable to be called by this task once after at
1432 least delay_seconds time has elapsed. It must return None
1433 or a new Agent instance.
1434 @param now_func: A time.time like function. Default: time.time.
1435 Used for testing.
1436 """
1437 assert delay_seconds > 0
1438 assert callable(callback)
1439 if not now_func:
1440 now_func = time.time
1441 self._now_func = now_func
1442 self._callback = callback
1443
1444 self.end_time = self._now_func() + delay_seconds
1445
1446 # These attributes are required by Agent.
1447 self.aborted = False
showard77182562009-06-10 00:16:05 +00001448 self.host_ids = ()
1449 self.success = False
1450 self.queue_entry_ids = ()
1451 # This is filled in by Agent.add_task().
1452 self.agent = None
1453
1454
1455 def poll(self):
showard8cc058f2009-09-08 16:26:33 +00001456 if not self.is_done() and self._now_func() >= self.end_time:
1457 self._callback()
showard77182562009-06-10 00:16:05 +00001458 self.success = True
1459
1460
1461 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001462 return self.success or self.aborted
showard77182562009-06-10 00:16:05 +00001463
1464
1465 def abort(self):
1466 self.aborted = True
showard77182562009-06-10 00:16:05 +00001467
1468
mbligh36768f02008-02-22 18:28:33 +00001469class AgentTask(object):
showard8cc058f2009-09-08 16:26:33 +00001470 def __init__(self, cmd=None, working_directory=None,
showarded2afea2009-07-07 20:54:07 +00001471 pidfile_name=None, paired_with_pidfile=None,
1472 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001473 self.done = False
jadmanski0afbb632008-06-06 21:10:57 +00001474 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001475 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001476 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001477 self.monitor = recover_run_monitor
1478 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001479 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001480 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001481 self.queue_entry_ids = []
1482 self.host_ids = []
1483 self.log_file = None
1484
1485
1486 def _set_ids(self, host=None, queue_entries=None):
1487 if queue_entries and queue_entries != [None]:
1488 self.host_ids = [entry.host.id for entry in queue_entries]
1489 self.queue_entry_ids = [entry.id for entry in queue_entries]
1490 else:
1491 assert host
1492 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001493
1494
jadmanski0afbb632008-06-06 21:10:57 +00001495 def poll(self):
showard08a36412009-05-05 01:01:13 +00001496 if not self.started:
1497 self.start()
1498 self.tick()
1499
1500
1501 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001502 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001503 exit_code = self.monitor.exit_code()
1504 if exit_code is None:
1505 return
1506 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001507 else:
1508 success = False
mbligh36768f02008-02-22 18:28:33 +00001509
jadmanski0afbb632008-06-06 21:10:57 +00001510 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001511
1512
jadmanski0afbb632008-06-06 21:10:57 +00001513 def is_done(self):
1514 return self.done
mbligh36768f02008-02-22 18:28:33 +00001515
1516
jadmanski0afbb632008-06-06 21:10:57 +00001517 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001518 if self.done:
1519 return
jadmanski0afbb632008-06-06 21:10:57 +00001520 self.done = True
1521 self.success = success
1522 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001523
1524
jadmanski0afbb632008-06-06 21:10:57 +00001525 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001526 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001527
mbligh36768f02008-02-22 18:28:33 +00001528
jadmanski0afbb632008-06-06 21:10:57 +00001529 def cleanup(self):
showardcdaeae82009-08-31 18:32:48 +00001530 if self.monitor and self.log_file:
1531 self.monitor.try_copy_to_results_repository(self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001532
1533
jadmanski0afbb632008-06-06 21:10:57 +00001534 def epilog(self):
1535 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001536
1537
jadmanski0afbb632008-06-06 21:10:57 +00001538 def start(self):
1539 assert self.agent
1540
1541 if not self.started:
1542 self.prolog()
1543 self.run()
1544
1545 self.started = True
1546
1547
1548 def abort(self):
1549 if self.monitor:
1550 self.monitor.kill()
1551 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001552 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001553 self.cleanup()
1554
1555
showarded2afea2009-07-07 20:54:07 +00001556 def _get_consistent_execution_path(self, execution_entries):
1557 first_execution_path = execution_entries[0].execution_path()
1558 for execution_entry in execution_entries[1:]:
1559 assert execution_entry.execution_path() == first_execution_path, (
1560 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1561 execution_entry,
1562 first_execution_path,
1563 execution_entries[0]))
1564 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001565
1566
showarded2afea2009-07-07 20:54:07 +00001567 def _copy_results(self, execution_entries, use_monitor=None):
1568 """
1569 @param execution_entries: list of objects with execution_path() method
1570 """
showard6d1c1432009-08-20 23:30:39 +00001571 if use_monitor is not None and not use_monitor.has_process():
1572 return
1573
showarded2afea2009-07-07 20:54:07 +00001574 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001575 if use_monitor is None:
1576 assert self.monitor
1577 use_monitor = self.monitor
1578 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001579 execution_path = self._get_consistent_execution_path(execution_entries)
1580 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001581 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001582
showarda1e74b32009-05-12 17:32:04 +00001583
1584 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001585 for queue_entry in queue_entries:
1586 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001587
1588
showarda1e74b32009-05-12 17:32:04 +00001589 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1590 self._copy_results(queue_entries, use_monitor)
1591 self._parse_results(queue_entries)
1592
1593
showardd3dc1992009-04-22 21:01:40 +00001594 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001595 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001596 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001597 self.monitor = PidfileRunMonitor()
1598 self.monitor.run(self.cmd, self._working_directory,
1599 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001600 log_file=self.log_file,
1601 pidfile_name=pidfile_name,
1602 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001603
1604
showardd9205182009-04-27 20:09:55 +00001605class TaskWithJobKeyvals(object):
1606 """AgentTask mixin providing functionality to help with job keyval files."""
1607 _KEYVAL_FILE = 'keyval'
1608 def _format_keyval(self, key, value):
1609 return '%s=%s' % (key, value)
1610
1611
1612 def _keyval_path(self):
1613 """Subclasses must override this"""
1614 raise NotImplemented
1615
1616
1617 def _write_keyval_after_job(self, field, value):
1618 assert self.monitor
1619 if not self.monitor.has_process():
1620 return
1621 _drone_manager.write_lines_to_file(
1622 self._keyval_path(), [self._format_keyval(field, value)],
1623 paired_with_process=self.monitor.get_process())
1624
1625
1626 def _job_queued_keyval(self, job):
1627 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1628
1629
1630 def _write_job_finished(self):
1631 self._write_keyval_after_job("job_finished", int(time.time()))
1632
1633
showarddb502762009-09-09 15:31:20 +00001634 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1635 keyval_contents = '\n'.join(self._format_keyval(key, value)
1636 for key, value in keyval_dict.iteritems())
1637 # always end with a newline to allow additional keyvals to be written
1638 keyval_contents += '\n'
1639 _drone_manager.attach_file_to_execution(self._working_directory,
1640 keyval_contents,
1641 file_path=keyval_path)
1642
1643
1644 def _write_keyvals_before_job(self, keyval_dict):
1645 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1646
1647
1648 def _write_host_keyvals(self, host):
1649 keyval_path = os.path.join(self._working_directory, 'host_keyvals',
1650 host.hostname)
1651 platform, all_labels = host.platform_and_labels()
1652 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1653 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1654
1655
showard8cc058f2009-09-08 16:26:33 +00001656class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001657 """
1658 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1659 """
1660
1661 TASK_TYPE = None
1662 host = None
1663 queue_entry = None
1664
1665 def __init__(self, task, extra_command_args, **kwargs):
showarded2afea2009-07-07 20:54:07 +00001666 assert (self.TASK_TYPE is not None,
1667 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001668
1669 self.host = Host(id=task.host.id)
1670 self.queue_entry = None
1671 if task.queue_entry:
1672 self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
1673
showarded2afea2009-07-07 20:54:07 +00001674 self.task = task
showarddb502762009-09-09 15:31:20 +00001675 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001676 self._extra_command_args = extra_command_args
1677 super(SpecialAgentTask, self).__init__(**kwargs)
1678
1679
showard8cc058f2009-09-08 16:26:33 +00001680 def _keyval_path(self):
1681 return os.path.join(self._working_directory, self._KEYVAL_FILE)
1682
1683
showarded2afea2009-07-07 20:54:07 +00001684 def prolog(self):
1685 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001686 self.cmd = _autoserv_command_line(self.host.hostname,
1687 self._extra_command_args,
1688 queue_entry=self.queue_entry)
1689 self._working_directory = self.task.execution_path()
1690 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001691 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001692
1693
showardde634ee2009-01-30 01:44:24 +00001694 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001695 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001696
showard2fe3f1d2009-07-06 20:19:11 +00001697 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001698 return # don't fail metahost entries, they'll be reassigned
1699
showard2fe3f1d2009-07-06 20:19:11 +00001700 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001701 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001702 return # entry has been aborted
1703
showard2fe3f1d2009-07-06 20:19:11 +00001704 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001705 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001706 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001707 self._write_keyval_after_job(queued_key, queued_time)
1708 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001709
showard8cc058f2009-09-08 16:26:33 +00001710 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001711 self.monitor.try_copy_results_on_drone(
1712 source_path=self._working_directory + '/',
1713 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001714
showard2fe3f1d2009-07-06 20:19:11 +00001715 self._copy_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001716 self.queue_entry.handle_host_failure()
showard2fe3f1d2009-07-06 20:19:11 +00001717 if self.queue_entry.job.parse_failed_repair:
1718 self._parse_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001719
1720 pidfile_id = _drone_manager.get_pidfile_id_from(
1721 self.queue_entry.execution_path(),
1722 pidfile_name=_AUTOSERV_PID_FILE)
1723 _drone_manager.register_pidfile(pidfile_id)
1724
1725
1726 def cleanup(self):
1727 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001728
1729 # We will consider an aborted task to be "Failed"
1730 self.task.finish(bool(self.success))
1731
showardf85a0b72009-10-07 20:48:45 +00001732 if self.monitor:
1733 if self.monitor.has_process():
1734 self._copy_results([self.task])
1735 if self.monitor.pidfile_id is not None:
1736 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001737
1738
class RepairTask(SpecialAgentTask):
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task, recover_run_monitor=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # look up the host's protection level and normalize it to the
        # attribute-name form that autoserv expects
        protection_string = host_protections.Protection.get_string(
                task.host.protection)
        protection = host_protections.Protection.get_attr_name(
                protection_string)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection],
                recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status(models.Host.Status.READY)
            return

        self.host.set_status(models.Host.Status.REPAIR_FAILED)
        if self.queue_entry:
            self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001776
1777
showarded2afea2009-07-07 20:54:07 +00001778class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001779 def _copy_to_results_repository(self):
1780 if not self.queue_entry or self.queue_entry.meta_host:
1781 return
1782
1783 self.queue_entry.set_execution_subdir()
1784 log_name = os.path.basename(self.task.execution_path())
1785 source = os.path.join(self.task.execution_path(), 'debug',
1786 'autoserv.DEBUG')
1787 destination = os.path.join(
1788 self.queue_entry.execution_path(), log_name)
1789
1790 self.monitor.try_copy_to_results_repository(
1791 source, destination_path=destination)
1792
1793
showard170873e2009-01-07 00:22:26 +00001794 def epilog(self):
1795 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001796
showard775300b2009-09-09 15:30:50 +00001797 if self.success:
1798 return
showard8fe93b52008-11-18 17:53:22 +00001799
showard775300b2009-09-09 15:30:50 +00001800 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001801
showard775300b2009-09-09 15:30:50 +00001802 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001803 # effectively ignore failure for these hosts
1804 self.success = True
showard775300b2009-09-09 15:30:50 +00001805 return
1806
1807 if self.queue_entry:
1808 self.queue_entry.requeue()
1809
1810 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001811 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001812 queue_entry__id=self.queue_entry.id):
1813 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1814 self._fail_queue_entry()
1815 return
1816
1817 queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
1818 else:
1819 queue_entry = None
1820
1821 models.SpecialTask.objects.create(
1822 host=models.Host(id=self.host.id),
1823 task=models.SpecialTask.Task.REPAIR,
1824 queue_entry=queue_entry)
showard58721a82009-08-20 23:32:40 +00001825
showard8fe93b52008-11-18 17:53:22 +00001826
class VerifyTask(PreJobTask):
    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task, recover_run_monitor=None):
        super(VerifyTask, self).__init__(
                task, ['-v'], recover_run_monitor=recover_run_monitor)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # One verify per host is plenty; drop any other verify requests
        # still sitting inactive in the queue for this host.
        redundant_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        redundant_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001863
1864
showardb5626452009-06-30 01:57:28 +00001865class CleanupHostsMixin(object):
1866 def _reboot_hosts(self, job, queue_entries, final_success,
1867 num_tests_failed):
1868 reboot_after = job.reboot_after
1869 do_reboot = (
1870 # always reboot after aborted jobs
1871 self._final_status == models.HostQueueEntry.Status.ABORTED
1872 or reboot_after == models.RebootAfter.ALWAYS
1873 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1874 and final_success and num_tests_failed == 0))
1875
1876 for queue_entry in queue_entries:
1877 if do_reboot:
1878 # don't pass the queue entry to the CleanupTask. if the cleanup
1879 # fails, the job doesn't care -- it's over.
showard8cc058f2009-09-08 16:26:33 +00001880 models.SpecialTask.objects.create(
1881 host=models.Host(id=queue_entry.host.id),
1882 task=models.SpecialTask.Task.CLEANUP)
showardb5626452009-06-30 01:57:28 +00001883 else:
showard8cc058f2009-09-08 16:26:33 +00001884 queue_entry.host.set_status(models.Host.Status.READY)
showardb5626452009-06-30 01:57:28 +00001885
1886
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """
    AgentTask that runs the main autoserv process for a job across a group
    of host queue entries.
    """
    def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = queue_entries[0].get_group_name()
        super(QueueTask, self).__init__(
            cmd, self._execution_path(),
            recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        """Job-level keyval file inside the execution directory."""
        return os.path.join(self._execution_path(), self._KEYVAL_FILE)


    def _execution_path(self):
        # all entries share one path; the first entry's is representative
        return self.queue_entries[0].execution_path()


    def prolog(self):
        """Sanity-check entry/host statuses, write pre-job keyvals, and move
        entries and hosts into their RUNNING states.

        @raises SchedulerError if any entry or host is in a state that a
                starting queue task should never see.
        """
        for entry in self.queue_entries:
            if entry.status not in (models.HostQueueEntry.Status.STARTING,
                                    models.HostQueueEntry.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))
            if entry.host.status not in (models.Host.Status.PENDING,
                                         models.Host.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start on queue '
                                     'entry with invalid host status %s: %s'
                                     % (entry.host.status, entry))

        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        # leave a marker file so the lost process is visible in the results
        error_file_path = os.path.join(self._execution_path(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Record job completion and move all entries on to GATHERING."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log on the drone
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_path(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Write aborted_by/aborted_on keyvals and a status.log comment."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00002002
2003
class PostJobTask(AgentTask):
    """
    Base class for tasks spawned after Autoserv finishes for a group of queue
    entries (e.g. crashinfo collection, final reparse).  Attaches to the
    original Autoserv process to determine the entries' final status.
    """
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 recover_run_monitor=None):
        """
        @param queue_entries: queue entries this task covers; they must all
                share a single execution path.
        @param pidfile_name: name of the pidfile the spawned process writes.
        @param logfile_name: name of the log file within the execution path.
        @param recover_run_monitor: optional monitor for an already-running
                process, used when recovering after a scheduler restart.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_path = self._get_consistent_execution_path(
                queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_path)
        # attach to the original autoserv process so we can read its exit
        # status and pair our new process with it
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_path)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(
                cmd=command, working_directory=self._execution_path,
                recover_run_monitor=recover_run_monitor)

        self.log_file = os.path.join(self._execution_path, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        """Return the command (argv list) to run.  Subclasses must override."""
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """
        Check against the database whether our queue entries were aborted.

        All entries should agree; if they do not, a warning email is sent
        and True is assumed.
        """
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # Bug fix: previously a single formatted string was passed to
                # join(), which joined its individual *characters* with ', '.
                # Describe every entry instead.
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: ' +
                        entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        """Map abort state and Autoserv's exit code to a final HQE status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        # Make sure we actually have results to work with.
        # This should never happen in normal operation.
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                    'No results in post-job task',
                    'No results in post-job task at %s' %
                    self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        super(PostJobTask, self).run(
                pidfile_name=self._pidfile_name,
                paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        """Set every covered queue entry to the given status."""
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
2085
2086
class GatherLogsTask(PostJobTask, CleanupHostsMixin):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, recover_run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
                queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
                logfile_name='.collect_crashinfo.log',
                recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # collect-crashinfo runs against all of the job's hosts at once
        hosts = ','.join(entry.host.hostname
                         for entry in self._queue_entries)
        return [_autoserv_path, '-p', '--collect-crashinfo', '-m', hosts,
                '-r', results_dir]


    def prolog(self):
        # sanity-check the entry and host states we expect to start from
        for entry in self._queue_entries:
            if entry.status != models.HostQueueEntry.Status.GATHERING:
                raise SchedulerError(
                        'Gather task attempting to start on '
                        'non-gathering entry: %s' % entry)
            if entry.host.status != models.Host.Status.RUNNING:
                raise SchedulerError(
                        'Gather task attempting to start on queue '
                        'entry with non-running host status %s: %s'
                        % (entry.host.status, entry))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()

        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)

        if self._autoserv_monitor.has_process():
            succeeded = (self._final_status ==
                         models.HostQueueEntry.Status.COMPLETED)
            failed_test_count = self._autoserv_monitor.num_tests_failed()
        else:
            # no Autoserv process means there are no results to inspect
            succeeded = False
            failed_test_count = 0

        self._reboot_hosts(self._job, self._queue_entries, succeeded,
                           failed_test_count)


    def run(self):
        """Only spawn crashinfo collection if Autoserv died abnormally."""
        exit_code = self._autoserv_monitor.exit_code()
        # a missing exit code is treated like a signal death
        if exit_code is not None and not os.WIFSIGNALED(exit_code):
            self.finished(True)
        else:
            super(GatherLogsTask, self).run()
showard0bbfc212009-04-29 21:06:13 +00002150
2151
class CleanupTask(PreJobTask):
    """Special task running 'autoserv --cleanup' on a host before a job."""
    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        super(CleanupTask, self).__init__(
                task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)


    def _finish_epilog(self):
        """After a successful cleanup with an attached queue entry, either
        schedule a verify task or move the entry straight to pending."""
        if not (self.queue_entry and self.success):
            return

        no_verify = host_protections.Protection.DO_NOT_VERIFY
        needs_verify = (self.queue_entry.job.run_verify
                        and self.host.protection != no_verify)
        if not needs_verify:
            self.queue_entry.on_pending()
        else:
            models.SpecialTask.objects.create(
                    host=models.Host(id=self.host.id),
                    queue_entry=models.HostQueueEntry(id=self.queue_entry.id),
                    task=models.SpecialTask.Task.VERIFY)


    def epilog(self):
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002197
showard21baa452008-10-21 00:08:39 +00002198
class FinalReparseTask(PostJobTask):
    """Runs the results parser over a finished job's results directory,
    throttled by a class-wide limit on concurrent parse processes."""
    _num_running_parses = 0

    def __init__(self, queue_entries, recover_run_monitor=None):
        super(FinalReparseTask, self).__init__(
                queue_entries, pidfile_name=_PARSER_PID_FILE,
                logfile_name='.parse.log',
                recover_run_monitor=recover_run_monitor)
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = self.started


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        """True while we're under the configured parse-process limit."""
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def prolog(self):
        # sanity-check that every entry is actually in the PARSING state
        for entry in self._queue_entries:
            if entry.status != models.HostQueueEntry.Status.PARSING:
                raise SchedulerError('Parse task attempting to start on '
                                     'non-parsing entry: %s' % entry)

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._set_all_statuses(self._final_status)


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
                results_dir]


    def tick(self):
        # override tick to keep trying to start until the parse count goes
        # down and we can, at which point we revert to default behavior
        if not self._parse_started:
            self._try_starting_parse()
        else:
            super(FinalReparseTask, self).tick()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        if not self._can_run_new_parse():
            return

        # actually run the parse command
        super(FinalReparseTask, self).run()

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release our slot in the parse count if we actually took one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002276
2277
class DBError(Exception):
    """Error raised when a DBObject's initial SELECT finds no matching row."""
2280
2281
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - database id of the row to represent; queried when row is
                not supplied.
        @param row - a pre-fetched SELECT * row for this object.
        @param new_record - True if this row does not yet exist in the DB.
        @param always_query - when False, a cached and already-initialized
                instance is returned as-is without re-querying the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the row with the given id, raising DBError if missing."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        # Guard against schema/_fields drift.
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields. Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is not a modifiable field, so exclude it from _valid_fields.
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read all of this object's fields from the database."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in table matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set a single field both on this object and in the database,
        skipping the UPDATE when the value is unchanged."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object if it was created with new_record=True, and
        pick up the id the database assigned."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the database and evict this instance from
        the shared instance cache."""
        # Bug fix: this previously passed the id() builtin instead of
        # self.id, so the cache entry was never actually removed.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return string with prefix prepended, or unchanged if falsy."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002487
mbligh36768f02008-02-22 18:28:33 +00002488
class IneligibleHostQueue(DBObject):
    """ORM row marking a host as ineligible for a particular job."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002492
2493
class AtomicGroup(DBObject):
    """ORM row for an atomic group of hosts that is scheduled as a unit."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002498
2499
class Label(DBObject):
    """ORM row for a host/job label."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        # Include the id and atomic group for easier log debugging.
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
2509
2510
class Host(DBObject):
    """ORM row for a machine in the hosts table."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self,status):
        """Update this host's status field, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number. If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes. Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() already ignores leading zeros; the previous
            # int(number_str.lstrip('0')) raised ValueError on an all-zero
            # suffix such as "host0" or "host000" (int('') is invalid).
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
2580
2581
mbligh36768f02008-02-22 18:28:33 +00002582class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002583 _table_name = 'host_queue_entries'
2584 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002585 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002586 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002587
2588
showarda3c58572009-03-12 20:36:59 +00002589 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002590 assert id or row
showarda3c58572009-03-12 20:36:59 +00002591 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002592 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002593
jadmanski0afbb632008-06-06 21:10:57 +00002594 if self.host_id:
2595 self.host = Host(self.host_id)
2596 else:
2597 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002598
showard77182562009-06-10 00:16:05 +00002599 if self.atomic_group_id:
2600 self.atomic_group = AtomicGroup(self.atomic_group_id,
2601 always_query=False)
2602 else:
2603 self.atomic_group = None
2604
showard170873e2009-01-07 00:22:26 +00002605 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002606 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002607
2608
showard89f84db2009-03-12 20:39:13 +00002609 @classmethod
2610 def clone(cls, template):
2611 """
2612 Creates a new row using the values from a template instance.
2613
2614 The new instance will not exist in the database or have a valid
2615 id attribute until its save() method is called.
2616 """
2617 assert isinstance(template, cls)
2618 new_row = [getattr(template, field) for field in cls._fields]
2619 clone = cls(row=new_row, new_record=True)
2620 clone.id = None
2621 return clone
2622
2623
showardc85c21b2008-11-24 22:17:37 +00002624 def _view_job_url(self):
2625 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2626
2627
showardf1ae3542009-05-11 19:26:02 +00002628 def get_labels(self):
2629 """
2630 Get all labels associated with this host queue entry (either via the
2631 meta_host or as a job dependency label). The labels yielded are not
2632 guaranteed to be unique.
2633
2634 @yields Label instances associated with this host_queue_entry.
2635 """
2636 if self.meta_host:
2637 yield Label(id=self.meta_host, always_query=False)
2638 labels = Label.fetch(
2639 joins="JOIN jobs_dependency_labels AS deps "
2640 "ON (labels.id = deps.label_id)",
2641 where="deps.job_id = %d" % self.job.id)
2642 for label in labels:
2643 yield label
2644
2645
jadmanski0afbb632008-06-06 21:10:57 +00002646 def set_host(self, host):
2647 if host:
2648 self.queue_log_record('Assigning host ' + host.hostname)
2649 self.update_field('host_id', host.id)
2650 self.update_field('active', True)
2651 self.block_host(host.id)
2652 else:
2653 self.queue_log_record('Releasing host')
2654 self.unblock_host(self.host.id)
2655 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002656
jadmanski0afbb632008-06-06 21:10:57 +00002657 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002658
2659
jadmanski0afbb632008-06-06 21:10:57 +00002660 def get_host(self):
2661 return self.host
mbligh36768f02008-02-22 18:28:33 +00002662
2663
jadmanski0afbb632008-06-06 21:10:57 +00002664 def queue_log_record(self, log_line):
2665 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002666 _drone_manager.write_lines_to_file(self.queue_log_path,
2667 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002668
2669
jadmanski0afbb632008-06-06 21:10:57 +00002670 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002671 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002672 row = [0, self.job.id, host_id]
2673 block = IneligibleHostQueue(row=row, new_record=True)
2674 block.save()
mblighe2586682008-02-29 22:45:46 +00002675
2676
jadmanski0afbb632008-06-06 21:10:57 +00002677 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002678 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002679 blocks = IneligibleHostQueue.fetch(
2680 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2681 for block in blocks:
2682 block.delete()
mblighe2586682008-02-29 22:45:46 +00002683
2684
showard2bab8f42008-11-12 18:15:22 +00002685 def set_execution_subdir(self, subdir=None):
2686 if subdir is None:
2687 assert self.get_host()
2688 subdir = self.get_host().hostname
2689 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002690
2691
showard6355f6b2008-12-05 18:52:13 +00002692 def _get_hostname(self):
2693 if self.host:
2694 return self.host.hostname
2695 return 'no host'
2696
2697
showard170873e2009-01-07 00:22:26 +00002698 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002699 flags = []
2700 if self.active:
2701 flags.append('active')
2702 if self.complete:
2703 flags.append('complete')
2704 if self.deleted:
2705 flags.append('deleted')
2706 if self.aborted:
2707 flags.append('aborted')
2708 flags_str = ','.join(flags)
2709 if flags_str:
2710 flags_str = ' [%s]' % flags_str
2711 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2712 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002713
2714
jadmanski0afbb632008-06-06 21:10:57 +00002715 def set_status(self, status):
showard56824072009-10-12 20:30:21 +00002716 logging.info("%s -> %s", self, status)
mblighf8c624d2008-07-03 16:58:45 +00002717
showard56824072009-10-12 20:30:21 +00002718 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002719
showard8cc058f2009-09-08 16:26:33 +00002720 if status in (models.HostQueueEntry.Status.QUEUED,
2721 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002722 self.update_field('complete', False)
2723 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002724
showard8cc058f2009-09-08 16:26:33 +00002725 if status in (models.HostQueueEntry.Status.PENDING,
2726 models.HostQueueEntry.Status.RUNNING,
2727 models.HostQueueEntry.Status.VERIFYING,
2728 models.HostQueueEntry.Status.STARTING,
2729 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002730 self.update_field('complete', False)
2731 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002732
showard8cc058f2009-09-08 16:26:33 +00002733 if status in (models.HostQueueEntry.Status.FAILED,
2734 models.HostQueueEntry.Status.COMPLETED,
2735 models.HostQueueEntry.Status.STOPPED,
2736 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002737 self.update_field('complete', True)
2738 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002739 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002740
2741 should_email_status = (status.lower() in _notify_email_statuses or
2742 'all' in _notify_email_statuses)
2743 if should_email_status:
2744 self._email_on_status(status)
2745
2746 self._email_on_job_complete()
2747
2748
showardf85a0b72009-10-07 20:48:45 +00002749 def _on_complete(self):
2750 if not self.execution_subdir:
2751 return
2752 # unregister any possible pidfiles associated with this queue entry
2753 for pidfile_name in _ALL_PIDFILE_NAMES:
2754 pidfile_id = _drone_manager.get_pidfile_id_from(
2755 self.execution_path(), pidfile_name=pidfile_name)
2756 _drone_manager.unregister_pidfile(pidfile_id)
2757
2758
showardc85c21b2008-11-24 22:17:37 +00002759 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002760 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002761
2762 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2763 self.job.id, self.job.name, hostname, status)
2764 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2765 self.job.id, self.job.name, hostname, status,
2766 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002767 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002768
2769
2770 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002771 if not self.job.is_finished():
2772 return
showard542e8402008-09-19 20:16:18 +00002773
showardc85c21b2008-11-24 22:17:37 +00002774 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002775 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002776 for queue_entry in hosts_queue:
2777 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002778 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002779 queue_entry.status))
2780
2781 summary_text = "\n".join(summary_text)
2782 status_counts = models.Job.objects.get_status_counts(
2783 [self.job.id])[self.job.id]
2784 status = ', '.join('%d %s' % (count, status) for status, count
2785 in status_counts.iteritems())
2786
2787 subject = 'Autotest: Job ID: %s "%s" %s' % (
2788 self.job.id, self.job.name, status)
2789 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2790 self.job.id, self.job.name, status, self._view_job_url(),
2791 summary_text)
showard170873e2009-01-07 00:22:26 +00002792 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002793
2794
showard8cc058f2009-09-08 16:26:33 +00002795 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002796 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002797 assert assigned_host
2798 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002799 if self.host_id is None:
2800 self.set_host(assigned_host)
2801 else:
2802 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002803
showardcfd4a7e2009-07-11 01:47:33 +00002804 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002805 self.job.name, self.meta_host, self.atomic_group_id,
2806 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002807
showard8cc058f2009-09-08 16:26:33 +00002808 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002809
2810
    def _do_schedule_pre_job_tasks(self):
        """Move this entry into Verifying and let the job queue its pre-job
        tasks (Cleanup/Verify special tasks, or on_pending if none needed)."""
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002816
showard6ae5ea92009-02-25 00:11:51 +00002817
    def requeue(self):
        """Reset this entry to Queued so the scheduler can pick it up again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            # metahost entries release their host so it can be reassigned
            self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002826
2827
    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status(models.HostQueueEntry.Status.FAILED)
        # This may stop the job's remaining entries if too few unrun
        # entries are left to satisfy synch_count.
        self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002836
2837
    @property
    def aborted_by(self):
        """Login of the user who aborted this entry, or None if not aborted."""
        self._load_abort_info()
        return self._aborted_by
2842
2843
    @property
    def aborted_on(self):
        """Timestamp of the abort for this entry, or None if not aborted."""
        self._load_abort_info()
        return self._aborted_on
2848
2849
    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Memoized: the first call caches results in self._aborted_by and
        # self._aborted_on; later calls return immediately.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            # No abort record: mark both as None so the hasattr cache hits.
            self._aborted_by = self._aborted_on = None
2865
2866
    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        # Mark both the entry and its host Pending before deciding whether
        # the job can actually start.
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002886
2887
    def abort(self, dispatcher):
        """
        Abort this queue entry, handling the host according to the entry's
        current status.

        @param dispatcher: used to assert no agents still operate on this
                entry before releasing its host.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # schedule a Cleanup for the host rather than marking it Ready
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host(id=self.host.id))

        self.set_status(Status.ABORTED)
        # cancel any atomic-group ready delay still pending on the job
        self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00002907
showard8cc058f2009-09-08 16:26:33 +00002908
2909 def get_group_name(self):
2910 atomic_group = self.atomic_group
2911 if not atomic_group:
2912 return ''
2913
2914 # Look at any meta_host and dependency labels and pick the first
2915 # one that also specifies this atomic group. Use that label name
2916 # as the group name if possible (it is more specific).
2917 for label in self.get_labels():
2918 if label.atomic_group_id:
2919 assert label.atomic_group_id == atomic_group.id
2920 return label.name
2921 return atomic_group.name
2922
2923
showard170873e2009-01-07 00:22:26 +00002924 def execution_tag(self):
2925 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002926 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002927
2928
showarded2afea2009-07-07 20:54:07 +00002929 def execution_path(self):
2930 return self.execution_tag()
2931
2932
class Job(DBObject):
    """
    Scheduler-side view of one row in the 'jobs' table.

    Decides when the job's HostQueueEntries may run (synch counts, atomic
    groups, ready delays) and transitions them into the Starting state.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        # One of id or row must identify the existing DB record to load.
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # NOTE(review): control_type 2 appears to denote a client-side job --
        # confirm against the frontend's control type enum.
        return self.control_type != 2


    def tag(self):
        """Return the '<id>-<owner>' tag naming this job's results directory."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects belonging to this job."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        # every job is created with at least one queue entry
        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """Set this job's status; optionally propagate it to all its HQEs."""
        self.update_field('status',status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _hosts_assigned_count(self):
        """The number of HostQueueEntries assigned a Host for this job."""
        entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                       host__isnull=False)
        return entries.count()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def _pending_threshold(self, atomic_group):
        """
        @param atomic_group: The AtomicGroup associated with this job that we
                are using to bound the threshold.
        @returns The minimum number of HostQueueEntries assigned a Host before
                this job can run.
        """
        return min(self._hosts_assigned_count(),
                   atomic_group.max_number_of_machines)


    def is_ready(self):
        """@returns True if enough entries are Pending for the job to start."""
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready


    def num_machines(self, clause = None):
        """Count this job's HQEs, optionally restricted by an SQL clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of entries not yet complete."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of entries currently active."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of entries that have completed."""
        return self.num_machines('complete')


    def is_finished(self):
        """True when every entry of this job has completed."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """Return a Django queryset of this job's entries still awaiting run."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Move Queued/Pending entries to Stopped; free hosts of Pending ones."""
        entries_to_stop = self._not_yet_run_entries(
                include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()

    def stop_if_necessary(self):
        """Stop the job if too few unrun entries remain to meet synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to this job's .machines results file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
                job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


    def _write_control_file(self, execution_path):
        """Attach this job's control file under execution_path on the drone."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance to find other
                group entries on this job for.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def get_autoserv_params(self, queue_entries):
        """Build the autoserv command-line argument list for these entries."""
        assert queue_entries
        execution_path = queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        execution_tag = queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self, verbose=False)

        # client-side jobs need the -c flag
        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """True if a Cleanup task must precede this entry's run."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """True if a Verify task must precede this entry's run."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Schedule any tasks needed before queue_entry may run this job: a
        Cleanup and/or Verify SpecialTask.  If no pre-job task is needed,
        calls HostQueueEntry.on_pending() directly, which continues the
        flow of the job.
        """
        if self._should_run_cleanup(queue_entry):
            task = models.SpecialTask.Task.CLEANUP
        elif self._should_run_verify(queue_entry):
            task = models.SpecialTask.Task.VERIFY
        else:
            queue_entry.on_pending()
            return

        models.SpecialTask.objects.create(
                host=models.Host(id=queue_entry.host_id),
                queue_entry=models.HostQueueEntry(id=queue_entry.id),
                task=task)


    def _assign_new_group(self, queue_entries, group_name=''):
        """Give all queue_entries a common, freshly-chosen execution_subdir."""
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                    self.id, [entry.host.hostname for entry in queue_entries],
                    group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job, always containing include_queue_entry; an empty list if
                fewer than synch_count entries could be gathered.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job by kicking its HQEs into status='Starting' if enough
        hosts are ready for it to run.

        Cleans up by kicking HQEs into status='Stopped' if this Job is not
        ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        elif queue_entry.atomic_group:
            # atomic groups may wait for extra hosts before launching
            self.run_with_ready_delay(queue_entry)
        else:
            self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, the delay cannot be reset.

        Runs the job immediately when the delay is disabled, already expired,
        or enough hosts are assigned; otherwise leaves the entry Waiting.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._pending_threshold(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)


    def schedule_delayed_callback_task(self, queue_entry):
        """
        Put queue_entry into Pending and create the DelayedCallTask that will
        re-check readiness once the wait for extra hosts is over.

        @returns The new DelayedCallTask, or None if a delay is already set.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant.  It could have aborted
            # while we were waiting or hosts could have disappeared, etc.
            threshold = self._pending_threshold(queue_entry.atomic_group)
            if self._pending_count() < threshold:
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras. Not running.', self)
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task


    def run(self, queue_entry):
        """
        Choose a group of entries around queue_entry and start them.

        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        """Move the chosen entries into Starting and cancel any ready delay."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
        self.abort_delay_ready_task()


    def abort_delay_ready_task(self):
        """Abort the delayed task associated with this job, if any."""
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()


    def __str__(self):
        """Jobs are identified as '<id>-<owner>'."""
        return '%s-%s' % (self.id, self.owner)
3364
3365
# Script entry point: main() (defined earlier in this file) runs the scheduler.
if __name__ == '__main__':
    main()