blob: c4b18d82b3cb6ba07dc5814c00a0a8c1e54f6633 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
# Pid-file name prefixes handed to utils.write_pid()/program_is_alive() to
# identify the scheduler and its babysitter wrapper process.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results directory; overwritten from the command line in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# An AUTOTEST_DIR environment variable overrides the path inferred above.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Pidfile names dropped inside results directories by the processes the
# scheduler launches (autoserv, crashinfo collection, the parser).
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level mutable state, populated during startup.
_db = None          # DatabaseConnection; created and connected in init()
_shutdown = False   # set True by handle_sigint() to stop the dispatcher loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False        # set by --test: dummy autoserv + stress-test DB
_base_url = None             # base URL of the AFE web interface ('.../afe/')
_notify_email_statuses = []  # statuses parsed from notify_email_statuses config
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    # config value is expressed in minutes; callers want seconds
    return 60 * minutes
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    """Top-level entry point: run the scheduler, and no matter how it
    exits, remove the scheduler's pid file on the way out."""
    try:
        _run_with_exception_logging()
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def _run_with_exception_logging():
    # Run the real main loop; log (but re-raise) anything other than a
    # normal SystemExit so the failure reaches the scheduler log.
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
showard27f33872009-04-07 18:20:53 +000085
86
def main_without_exception_handling():
    """
    Parse command-line options, read scheduler configuration, set up global
    state (results dir, email notification statuses, frontend base URL,
    testing mode), then run the dispatcher loop until _shutdown is set.

    Expects exactly one positional argument: the results directory.
    Calls sys.exit(1) when the scheduler is disabled in the global config or
    when no server hostname can be determined for the base URL.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give site-specific code a chance to hook in before startup.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Config value is a list separated by commas, semicolons, colons or
    # whitespace; normalize to lowercase and drop empty entries.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Deliberately broad: record the stacktrace, then fall through to
        # the shutdown path below so resources are released before exit.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
def setup_logging():
    """Configure scheduler logging, honoring optional environment overrides
    for the log directory and log file name."""
    env = os.environ
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
182
183
def handle_sigint(signum, frame):
    """SIGINT handler: flag the dispatcher loop to exit after its current
    tick (see the while-loop in main_without_exception_handling)."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000188
189
def init():
    """
    One-time scheduler startup: guard against a second running instance via
    the pid file, apply testing-mode config overrides, connect the global
    database handle, switch Django to autocommit, disable the read-only
    connection, install the SIGINT handler and initialize the drone manager
    from the configured drone/results hosts.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start if another monitor_db instance owns the pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Named drone_hostnames (not 'drones') to avoid shadowing the imported
    # autotest_lib.scheduler.drones module.
    drone_hostnames = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_hostnames.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - If True (the default), pass --verbose to autoserv.
    """
    autoserv_argv = [_autoserv_path, '-p', '-m', machines,
                     '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        if not job:
            # Fall back to the job associated with the queue entry.
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args
248
249
class SchedulerError(Exception):
    """Error raised by HostScheduler upon detecting an inconsistent state."""
252
253
class HostScheduler(object):
    """
    Caches an in-memory snapshot of scheduling state (ready hosts, job and
    host ACLs, label membership, job dependencies) and answers "which
    host(s) can this queue entry run on?".

    refresh() must be called each scheduling cycle before the find_*
    methods; the find_* methods mutate the cached state as they hand out
    hosts, so a host is only given to one queue entry per cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked hosts with status NULL or
        'Ready' and no active host queue entry against them."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render a sequence of ids as a comma-separated string suitable for
        interpolation into a SQL IN (...) clause."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Execute @query (containing a single %s placeholder for the id list)
        against id_list, and fold the two-column result into a
        {left_id: set(right_ids)} mapping; flip=True reverses the mapping.
        Returns {} without querying when id_list is empty.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_ids)}, or the
        reverse direction when flip=True."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job owner's ACL
        group memberships."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} of hosts marked ineligible for
        each job."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of dependency labels per job."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)} for the given hosts."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) mappings of sets built
        from one hosts_labels query; both empty when host_ids is empty."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Rebuild all cached scheduling state for this cycle, limited to
        the jobs referenced by pending_queue_entries."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True when the job's owner shares at least one ACL group with the
        host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True when the host's labels cover every dependency label of the
        job."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # NOTE: despite the docstring above, the current code logs the
            # conflict and returns an arbitrary member rather than raising.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        # NOTE: 'id' here shadows the builtin of the same name.
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True when the host passes ACL, dependency, only_if_needed and
        atomic-group checks for the entry's job (invalid hosts bypass all
        checks; see comment below)."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """Truthy when the cached host exists and is marked invalid.
        Returns None (falsy) when the host is not in the available cache."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim and return the entry's specific host if eligible, removing
        it from the available pool; None otherwise."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True when the host is still in the available pool and not marked
        invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick, claim and return the first usable + eligible host carrying
        the entry's metahost label; None when no host qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Dispatch to non-metahost or metahost scheduling for the entry;
        returns a claimed Host or None.  Not for atomic group entries."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
617
618
showard170873e2009-01-07 00:22:26 +0000619class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000620 def __init__(self):
621 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000622 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000623 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000624 user_cleanup_time = scheduler_config.config.clean_interval
625 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
626 _db, user_cleanup_time)
627 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000628 self._host_agents = {}
629 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
showard915958d2009-04-22 21:00:58 +0000632 def initialize(self, recover_hosts=True):
633 self._periodic_cleanup.initialize()
634 self._24hr_upkeep.initialize()
635
jadmanski0afbb632008-06-06 21:10:57 +0000636 # always recover processes
637 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000638
jadmanski0afbb632008-06-06 21:10:57 +0000639 if recover_hosts:
640 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000641
642
jadmanski0afbb632008-06-06 21:10:57 +0000643 def tick(self):
showard170873e2009-01-07 00:22:26 +0000644 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000645 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000646 self._find_aborting()
showard29f7cd22009-04-29 21:16:24 +0000647 self._process_recurring_runs()
showard8cc058f2009-09-08 16:26:33 +0000648 self._schedule_delay_tasks()
jadmanski0afbb632008-06-06 21:10:57 +0000649 self._schedule_new_jobs()
showard8cc058f2009-09-08 16:26:33 +0000650 self._schedule_running_host_queue_entries()
651 self._schedule_special_tasks()
jadmanski0afbb632008-06-06 21:10:57 +0000652 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000653 _drone_manager.execute_actions()
654 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000655
showard97aed502008-11-04 02:01:24 +0000656
    def _run_cleanup(self):
        """Give both cleanup objects a chance to run; each applies its own
        interval check ("maybe")."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard170873e2009-01-07 00:22:26 +0000662 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
663 for object_id in object_ids:
664 agent_dict.setdefault(object_id, set()).add(agent)
665
666
667 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
668 for object_id in object_ids:
669 assert object_id in agent_dict
670 agent_dict[object_id].remove(agent)
671
672
    def add_agent(self, agent):
        """Take ownership of *agent*: track it and index it by the host ids
        and queue entry ids it touches, so conflicts can be detected."""
        self._agents.append(agent)
        # Give the agent a back-reference so it can interact with us.
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000679
showard170873e2009-01-07 00:22:26 +0000680
    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.

        @returns a (possibly empty) list of Agent objects.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000686
687
    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.

        @returns True iff at least one registered agent references host.id.
        """
        return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000693
694
    def remove_agent(self, agent):
        """Drop *agent* from the active list and from both id indexes;
        inverse of add_agent()."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000701
702
    def _host_has_scheduled_special_task(self, host):
        """Return True if *host* has a special task queued (not yet active,
        not complete) -- i.e. work will be scheduled for it next cycle."""
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
707
708
    def _recover_processes(self):
        """Reattach to processes left over from a previous scheduler run,
        then reverify any hosts that could not be recovered."""
        self._register_pidfiles()
        # Second refresh picks up contents of the pidfiles just registered.
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._recover_pending_entries()
        self._check_for_remaining_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000720
showard170873e2009-01-07 00:22:26 +0000721
    def _register_pidfiles(self):
        """Register every pidfile that might belong to a live entry or active
        special task, so the drone manager fetches them on the next refresh."""
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
                where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            # We don't know which pidfile each entry used, so register all
            # known pidfile names for its execution path.
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                        execution_entry.execution_path(),
                        pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000733
734
    def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
        """Attach a PidfileRunMonitor to an existing pidfile.

        If a live process is found it is removed from *orphans* (it is
        accounted for) and (monitor, description) is returned; otherwise
        (None, 'without process').
        """
        run_monitor = PidfileRunMonitor()
        run_monitor.attach_to_existing_process(execution_path,
                                               pidfile_name=pidfile_name)
        if run_monitor.has_process():
            orphans.discard(run_monitor.get_process())
            return run_monitor, '(process %s)' % run_monitor.get_process()
        return None, 'without process'
743
744
    def _get_unassigned_entries(self, status):
        """Yield queue entries currently in *status* that have no agent yet."""
        for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry
751
752
    def _recover_entries_with_status(self, status, orphans, pidfile_name,
                                     recover_entries_fn):
        """For each unassigned entry in *status*, reattach to its process (via
        *pidfile_name*) and hand the whole execution group to
        *recover_entries_fn(job, queue_entries, run_monitor)*."""
        for queue_entry in self._get_unassigned_entries(status):
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            run_monitor, process_string = self._get_recovery_run_monitor(
                    queue_entry.execution_path(), pidfile_name, orphans)
            if not run_monitor:
                # _schedule_running_host_queue_entries should schedule and
                # recover these entries
                continue

            logging.info('Recovering %s entry %s %s',status.lower(),
                         ', '.join(str(entry) for entry in queue_entries),
                         process_string)
            recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000768
769
    def _check_for_remaining_orphan_processes(self, orphans):
        """Email about autoserv processes nobody reclaimed; optionally abort
        the scheduler if the die_on_orphans config flag is set."""
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000782
showard170873e2009-01-07 00:22:26 +0000783
    def _recover_running_entries(self, orphans):
        """Reattach QueueTask agents to entries still in Running state."""
        def recover_entries(job, queue_entries, run_monitor):
            queue_task = QueueTask(job=job, queue_entries=queue_entries,
                                   recover_run_monitor=run_monitor)
            # One process per queue entry counts against the throttle.
            self.add_agent(Agent(task=queue_task,
                                 num_processes=len(queue_entries)))

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, _AUTOSERV_PID_FILE,
                                          recover_entries)
794
795
    def _recover_gathering_entries(self, orphans):
        """Reattach GatherLogsTask agents to entries in Gathering state."""
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent(gather_task))

        self._recover_entries_with_status(
                models.HostQueueEntry.Status.GATHERING,
                orphans, _CRASHINFO_PID_FILE, recover_entries)
805
806
    def _recover_parsing_entries(self, orphans):
        """Reattach FinalReparseTask agents to entries in Parsing state."""
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            # num_processes=0: parsing doesn't count against process limits.
            self.add_agent(Agent(reparse_task, num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
816
817
    def _recover_pending_entries(self):
        """Re-trigger on_pending() for agentless entries stuck in Pending."""
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            entry.on_pending()
822
823
    def _recover_all_recoverable_entries(self):
        """Recover every entry/task category; each step removes the processes
        it reclaims from the orphan set, and leftovers are reported."""
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000831
showard97aed502008-11-04 02:01:24 +0000832
    def _recover_special_tasks(self, orphans):
        """\
        Recovers all special tasks that have started running but have not
        completed.

        Raises SchedulerError if a task's host already has an agent, since
        that indicates conflicting recovery.
        """

        tasks = models.SpecialTask.objects.filter(is_active=True,
                                                  is_complete=False)
        # Use ordering to force NULL queue_entry_id's to the end of the list
        for task in tasks.order_by('-queue_entry__id'):
            if self.host_has_agent(task.host):
                raise SchedulerError(
                        "%s already has a host agent %s." % (
                            task, self._host_agents.get(task.host.id)))

            run_monitor, process_string = self._get_recovery_run_monitor(
                    task.execution_path(), _AUTOSERV_PID_FILE, orphans)

            logging.info('Recovering %s %s', task, process_string)
            self._recover_special_task(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000853
854
showard8cc058f2009-09-08 16:26:33 +0000855 def _recover_special_task(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000856 """\
857 Recovers a single special task.
858 """
859 if task.task == models.SpecialTask.Task.VERIFY:
showard8cc058f2009-09-08 16:26:33 +0000860 agent_task = self._recover_verify(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000861 elif task.task == models.SpecialTask.Task.REPAIR:
showard8cc058f2009-09-08 16:26:33 +0000862 agent_task = self._recover_repair(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000863 elif task.task == models.SpecialTask.Task.CLEANUP:
showard8cc058f2009-09-08 16:26:33 +0000864 agent_task = self._recover_cleanup(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000865 else:
866 # Should never happen
867 logging.error(
868 "Special task id %d had invalid task %s", (task.id, task.task))
869
showard8cc058f2009-09-08 16:26:33 +0000870 self.add_agent(Agent(agent_task))
showard2fe3f1d2009-07-06 20:19:11 +0000871
872
showard8cc058f2009-09-08 16:26:33 +0000873 def _recover_verify(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000874 """\
875 Recovers a verify task.
876 No associated queue entry: Verify host
877 With associated queue entry: Verify host, and run associated queue
878 entry
879 """
showard8cc058f2009-09-08 16:26:33 +0000880 return VerifyTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000881
882
showard8cc058f2009-09-08 16:26:33 +0000883 def _recover_repair(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000884 """\
885 Recovers a repair task.
886 Always repair host
887 """
showard8cc058f2009-09-08 16:26:33 +0000888 return RepairTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000889
890
showard8cc058f2009-09-08 16:26:33 +0000891 def _recover_cleanup(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000892 """\
893 Recovers a cleanup task.
894 No associated queue entry: Clean host
895 With associated queue entry: Clean host, verify host if needed, and
896 run associated queue entry
897 """
showard8cc058f2009-09-08 16:26:33 +0000898 return CleanupTask(task=task, recover_run_monitor=run_monitor)
showard6af73ad2009-07-28 20:00:58 +0000899
900
    def _check_for_remaining_active_entries(self):
        """Sanity check after recovery: any still-active entry with no agent
        and no scheduled special task is unrecoverable -- fail loudly."""
        queue_entries = HostQueueEntry.fetch(
                where='active AND NOT complete AND status NOT IN '
                      '("Starting", "Gathering", "Pending")')

        unrecovered_active_hqes = [entry for entry in queue_entries
                                   if not self.get_agents_for_entry(entry) and
                                   not self._host_has_scheduled_special_task(
                                           entry.host)]
        if unrecovered_active_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes)
            raise SchedulerError(
                    '%d unrecovered active host queue entries:\n%s' %
                    (len(unrecovered_active_hqes), message))
showard170873e2009-01-07 00:22:26 +0000915
916
showard8cc058f2009-09-08 16:26:33 +0000917 def _schedule_special_tasks(self):
918 tasks = models.SpecialTask.objects.filter(is_active=False,
919 is_complete=False,
920 host__locked=False)
921 # We want lower ids to come first, but the NULL queue_entry_ids need to
922 # come last
923 tasks = tasks.extra(select={'isnull' : 'queue_entry_id IS NULL'})
924 tasks = tasks.extra(order_by=['isnull', 'id'])
showard6d7b2ff2009-06-10 00:16:47 +0000925
showard2fe3f1d2009-07-06 20:19:11 +0000926 for task in tasks:
showard8cc058f2009-09-08 16:26:33 +0000927 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000928 continue
showard6d7b2ff2009-06-10 00:16:47 +0000929
showard8cc058f2009-09-08 16:26:33 +0000930 if task.task == models.SpecialTask.Task.CLEANUP:
931 agent_task = CleanupTask(task=task)
932 elif task.task == models.SpecialTask.Task.VERIFY:
933 agent_task = VerifyTask(task=task)
934 elif task.task == models.SpecialTask.Task.REPAIR:
935 agent_task = RepairTask(task=task)
936 else:
937 email_manager.manager.enqueue_notify_email(
938 'Special task with invalid task', task)
939 continue
940
941 self.add_agent(Agent(agent_task))
showard1ff7b2e2009-05-15 23:17:18 +0000942
943
    def _reverify_remaining_hosts(self):
        """Schedule cleanup for active hosts recovery didn't reclaim."""
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)
mblighbb421852008-03-11 22:36:16 +0000952
953
    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        """Queue a CLEANUP special task for each unlocked, valid host matching
        *where* that has neither an agent nor a pending special task.

        @param where: SQL condition appended to the base host filter.
        @param print_message: log format with one %s (hostname); falsy skips
                logging.
        """
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000969
970
jadmanski0afbb632008-06-06 21:10:57 +0000971 def _recover_hosts(self):
972 # recover "Repair Failed" hosts
973 message = 'Reverifying dead host %s'
974 self._reverify_hosts_where("status = 'Repair Failed'",
975 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000976
977
showard04c82c52008-05-29 19:38:12 +0000978
    def _get_pending_queue_entries(self):
        """Return schedulable (queued, inactive) entries in priority order."""
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000985
986
    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order,
                or an empty list when nothing is pending (refresh is skipped).
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries
1001
1002
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        # Pair each entry with its host and start them (izip: Python 2).
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)
1033
1034
1035 def _schedule_new_jobs(self):
1036 queue_entries = self._refresh_pending_queue_entries()
1037 if not queue_entries:
1038 return
1039
showard63a34772008-08-18 19:32:50 +00001040 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001041 is_unassigned_atomic_group = (
1042 queue_entry.atomic_group_id is not None
1043 and queue_entry.host_id is None)
1044 if is_unassigned_atomic_group:
1045 self._schedule_atomic_group(queue_entry)
1046 else:
showard89f84db2009-03-12 20:39:13 +00001047 assigned_host = self._host_scheduler.find_eligible_host(
1048 queue_entry)
1049 if assigned_host:
1050 self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001051
1052
    def _schedule_running_host_queue_entries(self):
        """Create the appropriate agent task for every agentless entry in an
        in-flight state (Starting/Running/Gathering/Parsing).

        Raises SchedulerError on host-agent conflicts or invalid statuses.
        """
        entries = HostQueueEntry.fetch(
                where="status IN "
                      "('Starting', 'Running', 'Gathering', 'Parsing')")
        for entry in entries:
            if self.get_agents_for_entry(entry):
                continue

            task_entries = entry.job.get_group_entries(entry)
            # Parsing entries don't hold the host, so only non-Parsing
            # entries conflict with an existing host agent.
            for task_entry in task_entries:
                if (task_entry.status != models.HostQueueEntry.Status.PARSING
                        and self.host_has_agent(task_entry.host)):
                    agent = self._host_agents.get(task_entry.host.id)[0]
                    raise SchedulerError('Attempted to schedule on host that '
                                         'already has agent: %s (previous '
                                         'agent task: %s)'
                                         % (task_entry, agent.task))

            if entry.status in (models.HostQueueEntry.Status.STARTING,
                                models.HostQueueEntry.Status.RUNNING):
                params = entry.job.get_autoserv_params(task_entries)
                agent_task = QueueTask(job=entry.job,
                                       queue_entries=task_entries, cmd=params)
            elif entry.status == models.HostQueueEntry.Status.GATHERING:
                agent_task = GatherLogsTask(
                        job=entry.job, queue_entries=task_entries)
            elif entry.status == models.HostQueueEntry.Status.PARSING:
                agent_task = FinalReparseTask(queue_entries=task_entries)
            else:
                raise SchedulerError('_schedule_running_host_queue_entries got '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))

            self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
1087
1088
    def _schedule_delay_tasks(self):
        """For each entry in Waiting status, schedule its job's delayed
        callback task (if the job provides one) as a zero-process agent."""
        for entry in HostQueueEntry.fetch(where='status = "%s"' %
                                          models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent(Agent(task, num_processes=0))
1095
1096
    def _run_queue_entry(self, queue_entry, host):
        """Start *queue_entry* on *host* by scheduling its pre-job tasks."""
        queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001099
1100
    def _find_aborting(self):
        """Abort the agents and entry itself for every entry flagged aborted
        but not yet complete."""
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001107
1108
    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        """Apply process-throttling policy; return True if *agent* may start.

        @param num_started_this_cycle: processes already started this tick.
        @param have_reached_limit: whether some agent was already refused.
        """
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
1130
1131
    def _handle_agents(self):
        """Tick every agent, starting unstarted ones subject to throttling and
        removing finished ones."""
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    # Refuse all later nonzero-process agents this cycle.
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001149
1150
    def _process_recurring_runs(self):
        """Instantiate jobs for due RecurringRun templates and advance or
        delete each run according to its loop_count."""
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): dependencies is fetched but never passed to
            # create_new_job below -- confirm whether that's intentional.
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # Creation failure shouldn't kill the scheduler cycle.
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # Last iteration: retire the recurring run.
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1190
1191
showard170873e2009-01-07 00:22:26 +00001192class PidfileRunMonitor(object):
1193 """
1194 Client must call either run() to start a new process or
1195 attach_to_existing_process().
1196 """
mbligh36768f02008-02-22 18:28:33 +00001197
    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """
mbligh36768f02008-02-22 18:28:33 +00001203
1204
    def __init__(self):
        """Initialize monitor state; no process is attached yet."""
        # True once we've given up on the process (see on_lost_process()).
        self.lost_process = False
        # Set when run()/attach_to_existing_process() is called; used for the
        # pidfile-write timeout in _handle_no_process().
        self._start_time = None
        self.pidfile_id = None
        # Last-read pidfile contents (process/exit_status/num_tests_failed).
        self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001210
1211
showard170873e2009-01-07 00:22:26 +00001212 def _add_nice_command(self, command, nice_level):
1213 if not nice_level:
1214 return command
1215 return ['nice', '-n', str(nice_level)] + command
1216
1217
    def _set_start_time(self):
        """Record when the process was started/attached, for timeout checks."""
        self._start_time = time.time()
1220
1221
1222 def run(self, command, working_directory, nice_level=None, log_file=None,
1223 pidfile_name=None, paired_with_pidfile=None):
1224 assert command is not None
1225 if nice_level is not None:
1226 command = ['nice', '-n', str(nice_level)] + command
1227 self._set_start_time()
1228 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001229 command, working_directory, pidfile_name=pidfile_name,
1230 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001231
1232
    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Attach this monitor to a pidfile from a previous run (recovery);
        registers the pidfile so the drone manager keeps refreshing it."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
                execution_path, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001239
1240
jadmanski0afbb632008-06-06 21:10:57 +00001241 def kill(self):
showard170873e2009-01-07 00:22:26 +00001242 if self.has_process():
1243 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001244
mbligh36768f02008-02-22 18:28:33 +00001245
    def has_process(self):
        """Return True if a process has been identified from the pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001249
1250
    def get_process(self):
        """Return the monitored process; caller must ensure has_process()."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process
mblighbb421852008-03-11 22:36:16 +00001255
1256
    def _read_pidfile(self, use_second_read=False):
        """Refresh self._state from the pidfile.

        @param use_second_read: ask the drone manager for a re-read (used to
                double-check right after a process appears to have exited).
        @raises _PidfileException: if the pidfile contents are invalid
                (state is reset to empty first).
        """
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001266
1267
    def _handle_pidfile_error(self, error, message=''):
        """Email about a pidfile problem and write the process off as lost."""
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001273
1274
    def _get_pidfile_info_helper(self):
        """Core of _get_pidfile_info(); may raise _PidfileException."""
        # Once lost, stay lost -- don't re-read the pidfile.
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001297
showard21baa452008-10-21 00:08:39 +00001298
    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed

        _PidfileException is swallowed here and reported via email plus
        on_lost_process(); it never escapes this class.
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001310
1311
    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        Only gives up (and emails) once the pidfile-write timeout has elapsed
        since the process was started/attached.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001321
1322
showard35162b02009-03-03 02:17:30 +00001323 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001324 """\
1325 Called when autoserv has exited without writing an exit status,
1326 or we've timed out waiting for autoserv to write a pid to the
1327 pidfile. In either case, we just return failure and the caller
1328 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001329
showard170873e2009-01-07 00:22:26 +00001330 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001331 """
1332 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001333 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001334 self._state.exit_status = 1
1335 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001336
1337
jadmanski0afbb632008-06-06 21:10:57 +00001338 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001339 self._get_pidfile_info()
1340 return self._state.exit_status
1341
1342
1343 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001344 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001345 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001346 if self._state.num_tests_failed is None:
1347 return -1
showard21baa452008-10-21 00:08:39 +00001348 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001349
1350
showardcdaeae82009-08-31 18:32:48 +00001351 def try_copy_results_on_drone(self, **kwargs):
1352 if self.has_process():
1353 # copy results logs into the normal place for job results
1354 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1355
1356
1357 def try_copy_to_results_repository(self, source, **kwargs):
1358 if self.has_process():
1359 _drone_manager.copy_to_results_repository(self.get_process(),
1360 source, **kwargs)
1361
1362
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The wrapped task object must provide these methods:
      poll()    - called periodically so the task can check its status and
                  update its internal state.
      is_done() - True once the task has finished.
      abort()   - request an abort; the task sets its `aborted` attribute to
                  True only if it actually aborted (tasks may ignore aborts).

    The task must also carry these attributes:
      aborted         - bool, True if the task was aborted.
      success         - bool, True if the task succeeded.
      queue_entry_ids - sequence of HostQueueEntry ids the task handles.
      host_ids        - sequence of Host ids the task represents.

    This class writes one attribute onto the task:
      agent - a reference back to this Agent instance.
    """


    def __init__(self, task, num_processes=1):
        """
        @param task: A task as described in the class docstring.
        @param num_processes: Number of subprocesses this Agent stands for;
                the Dispatcher uses it to manage system load.  Defaults to 1.
        """
        self.task = task
        task.agent = self

        # Dispatcher.add_agent() fills this in.
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False


    def tick(self):
        self.started = True
        if not self.task:
            return
        self.task.poll()
        if self.task.is_done():
            self.task = None


    def is_done(self):
        return self.task is None


    def abort(self):
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            # tasks can choose to ignore aborts
            self.task = None
showard20f9bdd2009-04-29 19:48:33 +00001424
showardd3dc1992009-04-22 21:01:40 +00001425
class DelayedCallTask(object):
    """
    An Agent-compatible task that waits for a fixed delay, then calls the
    supplied callback exactly once and finishes.  The callback's return
    value is discarded by this class.

    @attribute end_time: absolute posix time; the first poll() at or after
            this time fires the callback and marks the task done.

    Also exposes every attribute the Agent class requires.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: positive delay, in seconds from now, before
                the callback fires.
        @param callback: zero-argument callable invoked once after at least
                delay_seconds have elapsed.
        @param now_func: time.time-like function, injectable for testing.
                Defaults to time.time.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        # Filled in when this task is wrapped by an Agent.
        self.agent = None


    def poll(self):
        if self.is_done():
            return
        if self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        return self.success or self.aborted


    def abort(self):
        self.aborted = True
showard77182562009-06-10 00:16:05 +00001478
1479
class AgentTask(object):
    """Base class for tasks run by an Agent, driving one autoserv-style
    subprocess via a PidfileRunMonitor.

    Lifecycle: poll() lazily calls start() (prolog + run) the first time,
    then tick() watches the monitor until an exit code appears, at which
    point finished() records success and runs epilog()/cleanup().
    """
    def __init__(self, cmd=None, working_directory=None,
                 pidfile_name=None, paired_with_pidfile=None,
                 recover_run_monitor=None):
        """
        @param cmd: command line to execute; if None, run() does nothing.
        @param working_directory: execution directory for the command.
        @param pidfile_name: NOTE(review): accepted but not stored here --
                presumably for subclass use; confirm.
        @param paired_with_pidfile: NOTE(review): accepted but not stored
                here either; confirm.
        @param recover_run_monitor: an existing PidfileRunMonitor to adopt
                (scheduler recovery); when given, the task is considered
                already started.
        """
        self.done = False
        self.cmd = cmd
        self._working_directory = working_directory
        # Filled in by Agent.__init__() when the task is wrapped.
        self.agent = None
        self.monitor = recover_run_monitor
        # A recovered monitor means the process is already running.
        self.started = bool(recover_run_monitor)
        self.success = None
        self.aborted = False
        # Ids this task covers, used by the Dispatcher for scheduling.
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        """Populate host_ids/queue_entry_ids from queue entries, or from a
        bare host when no (real) queue entries are involved."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Advance the task: start it on first poll, then check progress."""
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        """Check the monitored process; finish the task once it has exited.

        With no monitor at all (e.g. no cmd was run), the task finishes
        immediately as a failure.
        """
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                # still running
                return
            success = (exit_code == 0)
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Record the final result and run epilog(); idempotent."""
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the command starts; must not already have a
        monitor attached."""
        assert not self.monitor


    def cleanup(self):
        """Ship the task's log file to the results repository, if any."""
        if self.monitor and self.log_file:
            self.monitor.try_copy_to_results_repository(self.log_file)


    def epilog(self):
        """Hook run once after the task finishes (success or failure)."""
        self.cleanup()


    def start(self):
        """Run prolog() and launch the command the first time through."""
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the running process (if any) and finish as aborted.

        Note: bypasses finished()/epilog(); only cleanup() runs.
        """
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        """Assert all entries share one execution path and return it."""
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        Copy the entries' shared results directory to the results repository.

        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: monitor to copy through; defaults to self.monitor.
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        # trailing slash copies directory contents rather than the directory
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        """Hand the entries over to the parser by setting status PARSING."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd under a fresh PidfileRunMonitor; no-op if there
        is no command to run."""
        assert not self.monitor
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001614
1615
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files.

    Keyvals are written through the drone manager, either attached to the
    execution (before the job runs) or paired with the monitored process
    (after the job has run).
    """
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        """Render one keyval line in 'key=value' form."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """@returns the path of the keyval file; subclasses must override."""
        # Bug fix: this previously read `raise NotImplemented`, which raises
        # a TypeError (NotImplemented is a constant, not an exception class).
        # Use NotImplementedError, matching PostJobTask._generate_command().
        raise NotImplementedError('Subclasses must override this')


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line, paired with the finished job's process."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """@returns the ('job_queued', <posix timestamp>) keyval pair for
        when the job was created."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict to the execution as a file at keyval_path."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._working_directory,
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Record the host's platform and labels under host_keyvals/<name>."""
        keyval_path = os.path.join(self._working_directory, 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1665
1666
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args, **kwargs):
        """
        @param task: a models.SpecialTask instance this agent task executes.
        @param extra_command_args: extra autoserv command-line arguments
                identifying the kind of task (e.g. ['-v'] for verify).
        """
        # Bug fix: this check used to be written as
        #     assert (self.TASK_TYPE is not None, 'message')
        # i.e. an assert on a two-element tuple, which is always truthy and
        # so could never fire.
        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = HostQueueEntry(id=task.queue_entry.id)

        self.task = task
        kwargs['working_directory'] = task.execution_path()
        self._extra_command_args = extra_command_args
        super(SpecialAgentTask, self).__init__(**kwargs)


    def _keyval_path(self):
        """@returns the keyval file path inside this task's execution dir."""
        return os.path.join(self._working_directory, self._KEYVAL_FILE)


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        # Build the autoserv command line only now, when the task is really
        # about to run.
        self.cmd = _autoserv_command_line(self.host.hostname,
                                          self._extra_command_args,
                                          queue_entry=self.queue_entry)
        self._working_directory = self.task.execution_path()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Fail our queue entry and ship its logs and keyvals.

        Metahost entries are left alone (they get reassigned), as are
        entries that were aborted in the meantime.
        """
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
            source_path=self._working_directory + '/',
            destination_path=self.queue_entry.execution_path() + '/')

        self._copy_results([self.queue_entry])
        self.queue_entry.handle_host_failure()
        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])

        pidfile_id = _drone_manager.get_pidfile_id_from(
            self.queue_entry.execution_path(),
            pidfile_name=_AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()
        # Mark the SpecialTask row complete and copy its own results.
        self.task.finish()
        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001745
1746
class RepairTask(SpecialAgentTask):
    """Runs autoserv -R to repair a host.

    If the driving SpecialTask has a queue entry attached, that entry is
    marked failed when the repair itself fails.
    """
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task, recover_run_monitor=None):
        """
        @param task: the models.SpecialTask driving this repair.
        """
        # autoserv expects the host protection level by (normalized) name.
        protection_name = host_protections.Protection.get_string(
                task.host.protection)
        protection_name = host_protections.Protection.get_attr_name(
                protection_name)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection_name],
                recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
            return

        self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001784
1785
class PreJobTask(SpecialAgentTask):
    """Base for special tasks run before a job on a host (e.g. verify).

    On failure it ships the autoserv debug log to the queue entry's results
    directory, requeues the entry and schedules a repair of the host --
    unless the host's protection level says not to verify at all.
    """
    def _copy_to_results_repository(self):
        # Metahost entries get reassigned on failure, so their logs stay
        # with the special task only.
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        # name the copied log after this task's execution directory
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
                self.queue_entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
                source, destination_path=destination)


    def epilog(self):
        super(PreJobTask, self).epilog()

        if self.success:
            return

        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # effectively ignore the failure; don't requeue or repair
            return

        if self.queue_entry:
            self.queue_entry.requeue()

            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                # NOTE(review): a repair tied to this entry already exists --
                # presumably it already ran and didn't help, so give up on
                # the host and fail the entry; confirm against RepairTask.
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
        else:
            queue_entry = None

        # schedule a repair of the host, tied to the entry (if any) so the
        # entry can be failed if the repair also fails
        models.SpecialTask.objects.create(
                host=models.Host(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=queue_entry)
showard58721a82009-08-20 23:32:40 +00001831
showard8fe93b52008-11-18 17:53:22 +00001832
class VerifyTask(PreJobTask):
    """Runs autoserv -v to verify a host before a job (or standalone)."""
    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task, recover_run_monitor=None):
        """
        @param task: the models.SpecialTask driving this verify.
        """
        super(VerifyTask, self).__init__(
                task, ['-v'], recover_run_monitor=recover_run_monitor)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any other queued verifies for this host. One verify will do
        # and there's no need to keep records of other requests.
        redundant = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        redundant.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001869
1870
class CleanupHostsMixin(object):
    """Mixin deciding, once a job ends, whether each host gets a cleanup
    (reboot) scheduled or is simply returned to READY.

    NOTE(review): _reboot_hosts() reads self._final_status, which this mixin
    does not define -- it must be supplied by the class this is mixed into;
    confirm against each user of the mixin.
    """
    def _reboot_hosts(self, job, queue_entries, final_success,
                      num_tests_failed):
        """Schedule cleanups or free the hosts of the given queue entries.

        @param job: the job the entries belong to; its reboot_after setting
                drives the decision.
        @param queue_entries: entries whose hosts are being released.
        @param final_success: whether the job ultimately succeeded.
        @param num_tests_failed: number of failed tests in the job.
        """
        reboot_after = job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status == models.HostQueueEntry.Status.ABORTED
                or reboot_after == models.RebootAfter.ALWAYS
                or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP)
            else:
                queue_entry.host.set_status(models.Host.Status.READY)
showardb5626452009-06-30 01:57:28 +00001891
1892
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """Runs the main autoserv job process for a group of queue entries."""
    def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
        """
        @param job: the Job being run.
        @param queue_entries: the HostQueueEntries this run covers; they must
                all share one execution path.
        @param cmd: autoserv command line; None when recovering.
        @param recover_run_monitor: existing monitor to adopt on recovery.
        """
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = queue_entries[0].get_group_name()
        super(QueueTask, self).__init__(
            cmd, self._execution_path(),
            recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        """@returns the job-level keyval file path (TaskWithJobKeyvals)."""
        return os.path.join(self._execution_path(), self._KEYVAL_FILE)


    def _execution_path(self):
        # all entries share one path; the first is representative
        return self.queue_entries[0].execution_path()


    def prolog(self):
        """Sanity-check entry/host statuses, write pre-job keyvals, and move
        everything into the RUNNING state.

        @raises SchedulerError if any entry or host is in a state from which
                a queue task must not be started.
        """
        for entry in self.queue_entries:
            if entry.status not in (models.HostQueueEntry.Status.STARTING,
                                    models.HostQueueEntry.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))
            if entry.host.status not in (models.Host.Status.PENDING,
                                         models.Host.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start on queue '
                                     'entry with invalid host status %s: %s'
                                     % (entry.host.status, entry))

        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            # mark the host dirty so a cleanup is known to be needed later
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        """Drop a job_failure marker file when autoserv's process was lost."""
        error_file_path = os.path.join(self._execution_path(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Write end-of-job keyvals and push entries on to GATHERING."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)


    def _write_status_comment(self, comment):
        """Append an INFO line to the job's status.log via the drone."""
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_path(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            # no recorded aborter -- attribute it to the system
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00002008
2009
showardd3dc1992009-04-22 21:01:40 +00002010class PostJobTask(AgentTask):
2011 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00002012 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002013 self._queue_entries = queue_entries
2014 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00002015
showarded2afea2009-07-07 20:54:07 +00002016 self._execution_path = self._get_consistent_execution_path(
2017 queue_entries)
2018 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002019 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00002020 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002021 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
2022
2023 if _testing_mode:
2024 command = 'true'
2025 else:
2026 command = self._generate_command(self._results_dir)
2027
showarded2afea2009-07-07 20:54:07 +00002028 super(PostJobTask, self).__init__(
2029 cmd=command, working_directory=self._execution_path,
2030 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002031
showarded2afea2009-07-07 20:54:07 +00002032 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00002033 self._final_status = self._determine_final_status()
2034
2035
2036 def _generate_command(self, results_dir):
2037 raise NotImplementedError('Subclasses must override this')
2038
2039
2040 def _job_was_aborted(self):
2041 was_aborted = None
2042 for queue_entry in self._queue_entries:
2043 queue_entry.update_from_database()
2044 if was_aborted is None: # first queue entry
2045 was_aborted = bool(queue_entry.aborted)
2046 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2047 email_manager.manager.enqueue_notify_email(
2048 'Inconsistent abort state',
2049 'Queue entries have inconsistent abort state: ' +
2050 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2051 # don't crash here, just assume true
2052 return True
2053 return was_aborted
2054
2055
2056 def _determine_final_status(self):
2057 if self._job_was_aborted():
2058 return models.HostQueueEntry.Status.ABORTED
2059
2060 # we'll use a PidfileRunMonitor to read the autoserv exit status
2061 if self._autoserv_monitor.exit_code() == 0:
2062 return models.HostQueueEntry.Status.COMPLETED
2063 return models.HostQueueEntry.Status.FAILED
2064
2065
2066 def run(self):
showard8cc058f2009-09-08 16:26:33 +00002067 # Make sure we actually have results to work with.
2068 # This should never happen in normal operation.
showard5add1c82009-05-26 19:27:46 +00002069 if not self._autoserv_monitor.has_process():
2070 email_manager.manager.enqueue_notify_email(
showard8cc058f2009-09-08 16:26:33 +00002071 'No results in post-job task',
2072 'No results in post-job task at %s' %
2073 self._autoserv_monitor.pidfile_id)
showard5add1c82009-05-26 19:27:46 +00002074 self.finished(False)
2075 return
2076
2077 super(PostJobTask, self).run(
2078 pidfile_name=self._pidfile_name,
2079 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002080
2081
2082 def _set_all_statuses(self, status):
2083 for queue_entry in self._queue_entries:
2084 queue_entry.set_status(status)
2085
2086
2087 def abort(self):
2088 # override AgentTask.abort() to avoid killing the process and ending
2089 # the task. post-job tasks continue when the job is aborted.
2090 pass
2091
2092
showardb5626452009-06-30 01:57:28 +00002093class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002094 """
2095 Task responsible for
2096 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2097 * copying logs to the results repository
2098 * spawning CleanupTasks for hosts, if necessary
2099 * spawning a FinalReparseTask for the job
2100 """
showarded2afea2009-07-07 20:54:07 +00002101 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002102 self._job = job
2103 super(GatherLogsTask, self).__init__(
2104 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002105 logfile_name='.collect_crashinfo.log',
2106 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002107 self._set_ids(queue_entries=queue_entries)
2108
2109
2110 def _generate_command(self, results_dir):
2111 host_list = ','.join(queue_entry.host.hostname
2112 for queue_entry in self._queue_entries)
2113 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2114 '-r', results_dir]
2115
2116
2117 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002118 for queue_entry in self._queue_entries:
2119 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2120 raise SchedulerError('Gather task attempting to start on '
2121 'non-gathering entry: %s' % queue_entry)
2122 if queue_entry.host.status != models.Host.Status.RUNNING:
2123 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002124 'entry with non-running host status %s: %s'
2125 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002126
showardd3dc1992009-04-22 21:01:40 +00002127 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002128
2129
showardd3dc1992009-04-22 21:01:40 +00002130 def epilog(self):
2131 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002132
showard6d1c1432009-08-20 23:30:39 +00002133 self._copy_and_parse_results(self._queue_entries,
2134 use_monitor=self._autoserv_monitor)
2135
2136 if self._autoserv_monitor.has_process():
2137 final_success = (self._final_status ==
2138 models.HostQueueEntry.Status.COMPLETED)
2139 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2140 else:
2141 final_success = False
2142 num_tests_failed = 0
2143
showardb5626452009-06-30 01:57:28 +00002144 self._reboot_hosts(self._job, self._queue_entries, final_success,
2145 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002146
2147
showard0bbfc212009-04-29 21:06:13 +00002148 def run(self):
showard597bfd32009-05-08 18:22:50 +00002149 autoserv_exit_code = self._autoserv_monitor.exit_code()
2150 # only run if Autoserv exited due to some signal. if we have no exit
2151 # code, assume something bad (and signal-like) happened.
2152 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002153 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002154 else:
2155 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002156
2157
showard8fe93b52008-11-18 17:53:22 +00002158class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002159 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2160
2161
showard8cc058f2009-09-08 16:26:33 +00002162 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00002163 super(CleanupTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00002164 task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002165
showard8cc058f2009-09-08 16:26:33 +00002166 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002167
mblighd5c95802008-03-05 00:33:46 +00002168
jadmanski0afbb632008-06-06 21:10:57 +00002169 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002170 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002171 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002172 self.host.set_status(models.Host.Status.CLEANING)
2173 if self.queue_entry:
2174 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2175
2176
showard775300b2009-09-09 15:30:50 +00002177 def _finish_epilog(self):
2178 if not self.queue_entry:
2179 return
2180
2181 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
2182 self.queue_entry.on_pending()
2183 elif self.success:
2184 if self.queue_entry.job.run_verify:
2185 entry = models.HostQueueEntry(id=self.queue_entry.id)
2186 models.SpecialTask.objects.create(
2187 host=models.Host(id=self.host.id),
2188 queue_entry=entry,
2189 task=models.SpecialTask.Task.VERIFY)
2190 else:
2191 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002192
mblighd5c95802008-03-05 00:33:46 +00002193
showard21baa452008-10-21 00:08:39 +00002194 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002195 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002196
showard21baa452008-10-21 00:08:39 +00002197 if self.success:
2198 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002199 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002200
showard775300b2009-09-09 15:30:50 +00002201 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002202
showard21baa452008-10-21 00:08:39 +00002203
showardd3dc1992009-04-22 21:01:40 +00002204class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002205 _num_running_parses = 0
2206
showarded2afea2009-07-07 20:54:07 +00002207 def __init__(self, queue_entries, recover_run_monitor=None):
2208 super(FinalReparseTask, self).__init__(
2209 queue_entries, pidfile_name=_PARSER_PID_FILE,
2210 logfile_name='.parse.log',
2211 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002212 # don't use _set_ids, since we don't want to set the host_ids
2213 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002214 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002215
showard97aed502008-11-04 02:01:24 +00002216
2217 @classmethod
2218 def _increment_running_parses(cls):
2219 cls._num_running_parses += 1
2220
2221
2222 @classmethod
2223 def _decrement_running_parses(cls):
2224 cls._num_running_parses -= 1
2225
2226
2227 @classmethod
2228 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002229 return (cls._num_running_parses <
2230 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002231
2232
2233 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002234 for queue_entry in self._queue_entries:
2235 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2236 raise SchedulerError('Parse task attempting to start on '
2237 'non-parsing entry: %s' % queue_entry)
2238
showard97aed502008-11-04 02:01:24 +00002239 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002240
2241
2242 def epilog(self):
2243 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002244 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002245
2246
showardd3dc1992009-04-22 21:01:40 +00002247 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002248 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002249 results_dir]
showard97aed502008-11-04 02:01:24 +00002250
2251
showard08a36412009-05-05 01:01:13 +00002252 def tick(self):
2253 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002254 # and we can, at which point we revert to default behavior
2255 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002256 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002257 else:
2258 self._try_starting_parse()
2259
2260
2261 def run(self):
2262 # override run() to not actually run unless we can
2263 self._try_starting_parse()
2264
2265
2266 def _try_starting_parse(self):
2267 if not self._can_run_new_parse():
2268 return
showard170873e2009-01-07 00:22:26 +00002269
showard97aed502008-11-04 02:01:24 +00002270 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002271 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002272
showard97aed502008-11-04 02:01:24 +00002273 self._increment_running_parses()
2274 self._parse_started = True
2275
2276
2277 def finished(self, success):
2278 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002279 if self._parse_started:
2280 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002281
2282
showarda3c58572009-03-12 20:36:59 +00002283class DBError(Exception):
2284 """Raised by the DBObject constructor when its select fails."""
2285
2286
mbligh36768f02008-02-22 18:28:33 +00002287class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002288 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002289
2290 # Subclasses MUST override these:
2291 _table_name = ''
2292 _fields = ()
2293
showarda3c58572009-03-12 20:36:59 +00002294 # A mapping from (type, id) to the instance of the object for that
2295 # particular id. This prevents us from creating new Job() and Host()
2296 # instances for every HostQueueEntry object that we instantiate as
2297 # multiple HQEs often share the same Job.
2298 _instances_by_type_and_id = weakref.WeakValueDictionary()
2299 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002300
showarda3c58572009-03-12 20:36:59 +00002301
2302 def __new__(cls, id=None, **kwargs):
2303 """
2304 Look to see if we already have an instance for this particular type
2305 and id. If so, use it instead of creating a duplicate instance.
2306 """
2307 if id is not None:
2308 instance = cls._instances_by_type_and_id.get((cls, id))
2309 if instance:
2310 return instance
2311 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2312
2313
2314 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002315 assert bool(id) or bool(row)
2316 if id is not None and row is not None:
2317 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002318 assert self._table_name, '_table_name must be defined in your class'
2319 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002320 if not new_record:
2321 if self._initialized and not always_query:
2322 return # We've already been initialized.
2323 if id is None:
2324 id = row[0]
2325 # Tell future constructors to use us instead of re-querying while
2326 # this instance is still around.
2327 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002328
showard6ae5ea92009-02-25 00:11:51 +00002329 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002330
jadmanski0afbb632008-06-06 21:10:57 +00002331 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002332
jadmanski0afbb632008-06-06 21:10:57 +00002333 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002334 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002335
showarda3c58572009-03-12 20:36:59 +00002336 if self._initialized:
2337 differences = self._compare_fields_in_row(row)
2338 if differences:
showard7629f142009-03-27 21:02:02 +00002339 logging.warn(
2340 'initialized %s %s instance requery is updating: %s',
2341 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002342 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002343 self._initialized = True
2344
2345
2346 @classmethod
2347 def _clear_instance_cache(cls):
2348 """Used for testing, clear the internal instance cache."""
2349 cls._instances_by_type_and_id.clear()
2350
2351
showardccbd6c52009-03-21 00:10:21 +00002352 def _fetch_row_from_db(self, row_id):
2353 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2354 rows = _db.execute(sql, (row_id,))
2355 if not rows:
showard76e29d12009-04-15 21:53:10 +00002356 raise DBError("row not found (table=%s, row id=%s)"
2357 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002358 return rows[0]
2359
2360
showarda3c58572009-03-12 20:36:59 +00002361 def _assert_row_length(self, row):
2362 assert len(row) == len(self._fields), (
2363 "table = %s, row = %s/%d, fields = %s/%d" % (
2364 self.__table, row, len(row), self._fields, len(self._fields)))
2365
2366
2367 def _compare_fields_in_row(self, row):
2368 """
showarddae680a2009-10-12 20:26:43 +00002369 Given a row as returned by a SELECT query, compare it to our existing in
2370 memory fields. Fractional seconds are stripped from datetime values
2371 before comparison.
showarda3c58572009-03-12 20:36:59 +00002372
2373 @param row - A sequence of values corresponding to fields named in
2374 The class attribute _fields.
2375
2376 @returns A dictionary listing the differences keyed by field name
2377 containing tuples of (current_value, row_value).
2378 """
2379 self._assert_row_length(row)
2380 differences = {}
showarddae680a2009-10-12 20:26:43 +00002381 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002382 for field, row_value in itertools.izip(self._fields, row):
2383 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002384 if (isinstance(current_value, datetime.datetime)
2385 and isinstance(row_value, datetime.datetime)):
2386 current_value = current_value.strftime(datetime_cmp_fmt)
2387 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002388 if current_value != row_value:
2389 differences[field] = (current_value, row_value)
2390 return differences
showard2bab8f42008-11-12 18:15:22 +00002391
2392
2393 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002394 """
2395 Update our field attributes using a single row returned by SELECT.
2396
2397 @param row - A sequence of values corresponding to fields named in
2398 the class fields list.
2399 """
2400 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002401
showard2bab8f42008-11-12 18:15:22 +00002402 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002403 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002404 setattr(self, field, value)
2405 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002406
showard2bab8f42008-11-12 18:15:22 +00002407 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002408
mblighe2586682008-02-29 22:45:46 +00002409
showardccbd6c52009-03-21 00:10:21 +00002410 def update_from_database(self):
2411 assert self.id is not None
2412 row = self._fetch_row_from_db(self.id)
2413 self._update_fields_from_row(row)
2414
2415
jadmanski0afbb632008-06-06 21:10:57 +00002416 def count(self, where, table = None):
2417 if not table:
2418 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002419
jadmanski0afbb632008-06-06 21:10:57 +00002420 rows = _db.execute("""
2421 SELECT count(*) FROM %s
2422 WHERE %s
2423 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002424
jadmanski0afbb632008-06-06 21:10:57 +00002425 assert len(rows) == 1
2426
2427 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002428
2429
showardd3dc1992009-04-22 21:01:40 +00002430 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002431 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002432
showard2bab8f42008-11-12 18:15:22 +00002433 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002434 return
mbligh36768f02008-02-22 18:28:33 +00002435
mblighf8c624d2008-07-03 16:58:45 +00002436 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002437 _db.execute(query, (value, self.id))
2438
showard2bab8f42008-11-12 18:15:22 +00002439 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002440
2441
jadmanski0afbb632008-06-06 21:10:57 +00002442 def save(self):
2443 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002444 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002445 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002446 values = []
2447 for key in keys:
2448 value = getattr(self, key)
2449 if value is None:
2450 values.append('NULL')
2451 else:
2452 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002453 values_str = ','.join(values)
2454 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2455 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002456 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002457 # Update our id to the one the database just assigned to us.
2458 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002459
2460
jadmanski0afbb632008-06-06 21:10:57 +00002461 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002462 self._instances_by_type_and_id.pop((type(self), id), None)
2463 self._initialized = False
2464 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002465 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2466 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002467
2468
showard63a34772008-08-18 19:32:50 +00002469 @staticmethod
2470 def _prefix_with(string, prefix):
2471 if string:
2472 string = prefix + string
2473 return string
2474
2475
jadmanski0afbb632008-06-06 21:10:57 +00002476 @classmethod
showard989f25d2008-10-01 11:38:11 +00002477 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002478 """
2479 Construct instances of our class based on the given database query.
2480
2481 @yields One class instance for each row fetched.
2482 """
showard63a34772008-08-18 19:32:50 +00002483 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2484 where = cls._prefix_with(where, 'WHERE ')
2485 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002486 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002487 'joins' : joins,
2488 'where' : where,
2489 'order_by' : order_by})
2490 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002491 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002492
mbligh36768f02008-02-22 18:28:33 +00002493
class IneligibleHostQueue(DBObject):
    """A job/host pairing that must not be scheduled (see block_host)."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002497
2498
showard89f84db2009-03-12 20:39:13 +00002499class AtomicGroup(DBObject):
2500 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002501 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2502 'invalid')
showard89f84db2009-03-12 20:39:13 +00002503
2504
showard989f25d2008-10-01 11:38:11 +00002505class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002506 _table_name = 'labels'
2507 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002508 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002509
2510
showard6157c632009-07-06 20:19:31 +00002511 def __repr__(self):
2512 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2513 self.name, self.id, self.atomic_group_id)
2514
2515
mbligh36768f02008-02-22 18:28:33 +00002516class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002517 _table_name = 'hosts'
2518 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2519 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2520
2521
jadmanski0afbb632008-06-06 21:10:57 +00002522 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002523 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002524 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002525
2526
showard170873e2009-01-07 00:22:26 +00002527 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002528 """
showard170873e2009-01-07 00:22:26 +00002529 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002530 """
2531 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002532 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002533 FROM labels
2534 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002535 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002536 ORDER BY labels.name
2537 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002538 platform = None
2539 all_labels = []
2540 for label_name, is_platform in rows:
2541 if is_platform:
2542 platform = label_name
2543 all_labels.append(label_name)
2544 return platform, all_labels
2545
2546
showard54c1ea92009-05-20 00:32:58 +00002547 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2548
2549
2550 @classmethod
2551 def cmp_for_sort(cls, a, b):
2552 """
2553 A comparison function for sorting Host objects by hostname.
2554
2555 This strips any trailing numeric digits, ignores leading 0s and
2556 compares hostnames by the leading name and the trailing digits as a
2557 number. If both hostnames do not match this pattern, they are simply
2558 compared as lower case strings.
2559
2560 Example of how hostnames will be sorted:
2561
2562 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2563
2564 This hopefully satisfy most people's hostname sorting needs regardless
2565 of their exact naming schemes. Nobody sane should have both a host10
2566 and host010 (but the algorithm works regardless).
2567 """
2568 lower_a = a.hostname.lower()
2569 lower_b = b.hostname.lower()
2570 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2571 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2572 if match_a and match_b:
2573 name_a, number_a_str = match_a.groups()
2574 name_b, number_b_str = match_b.groups()
2575 number_a = int(number_a_str.lstrip('0'))
2576 number_b = int(number_b_str.lstrip('0'))
2577 result = cmp((name_a, number_a), (name_b, number_b))
2578 if result == 0 and lower_a != lower_b:
2579 # If they compared equal above but the lower case names are
2580 # indeed different, don't report equality. abc012 != abc12.
2581 return cmp(lower_a, lower_b)
2582 return result
2583 else:
2584 return cmp(lower_a, lower_b)
2585
2586
mbligh36768f02008-02-22 18:28:33 +00002587class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002588 _table_name = 'host_queue_entries'
2589 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002590 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002591 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002592
2593
showarda3c58572009-03-12 20:36:59 +00002594 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002595 assert id or row
showarda3c58572009-03-12 20:36:59 +00002596 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002597 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002598
jadmanski0afbb632008-06-06 21:10:57 +00002599 if self.host_id:
2600 self.host = Host(self.host_id)
2601 else:
2602 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002603
showard77182562009-06-10 00:16:05 +00002604 if self.atomic_group_id:
2605 self.atomic_group = AtomicGroup(self.atomic_group_id,
2606 always_query=False)
2607 else:
2608 self.atomic_group = None
2609
showard170873e2009-01-07 00:22:26 +00002610 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002611 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002612
2613
showard89f84db2009-03-12 20:39:13 +00002614 @classmethod
2615 def clone(cls, template):
2616 """
2617 Creates a new row using the values from a template instance.
2618
2619 The new instance will not exist in the database or have a valid
2620 id attribute until its save() method is called.
2621 """
2622 assert isinstance(template, cls)
2623 new_row = [getattr(template, field) for field in cls._fields]
2624 clone = cls(row=new_row, new_record=True)
2625 clone.id = None
2626 return clone
2627
2628
showardc85c21b2008-11-24 22:17:37 +00002629 def _view_job_url(self):
2630 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2631
2632
showardf1ae3542009-05-11 19:26:02 +00002633 def get_labels(self):
2634 """
2635 Get all labels associated with this host queue entry (either via the
2636 meta_host or as a job dependency label). The labels yielded are not
2637 guaranteed to be unique.
2638
2639 @yields Label instances associated with this host_queue_entry.
2640 """
2641 if self.meta_host:
2642 yield Label(id=self.meta_host, always_query=False)
2643 labels = Label.fetch(
2644 joins="JOIN jobs_dependency_labels AS deps "
2645 "ON (labels.id = deps.label_id)",
2646 where="deps.job_id = %d" % self.job.id)
2647 for label in labels:
2648 yield label
2649
2650
jadmanski0afbb632008-06-06 21:10:57 +00002651 def set_host(self, host):
2652 if host:
2653 self.queue_log_record('Assigning host ' + host.hostname)
2654 self.update_field('host_id', host.id)
2655 self.update_field('active', True)
2656 self.block_host(host.id)
2657 else:
2658 self.queue_log_record('Releasing host')
2659 self.unblock_host(self.host.id)
2660 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002661
jadmanski0afbb632008-06-06 21:10:57 +00002662 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002663
2664
jadmanski0afbb632008-06-06 21:10:57 +00002665 def get_host(self):
2666 return self.host
mbligh36768f02008-02-22 18:28:33 +00002667
2668
jadmanski0afbb632008-06-06 21:10:57 +00002669 def queue_log_record(self, log_line):
2670 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002671 _drone_manager.write_lines_to_file(self.queue_log_path,
2672 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002673
2674
jadmanski0afbb632008-06-06 21:10:57 +00002675 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002676 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002677 row = [0, self.job.id, host_id]
2678 block = IneligibleHostQueue(row=row, new_record=True)
2679 block.save()
mblighe2586682008-02-29 22:45:46 +00002680
2681
jadmanski0afbb632008-06-06 21:10:57 +00002682 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002683 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002684 blocks = IneligibleHostQueue.fetch(
2685 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2686 for block in blocks:
2687 block.delete()
mblighe2586682008-02-29 22:45:46 +00002688
2689
showard2bab8f42008-11-12 18:15:22 +00002690 def set_execution_subdir(self, subdir=None):
2691 if subdir is None:
2692 assert self.get_host()
2693 subdir = self.get_host().hostname
2694 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002695
2696
showard6355f6b2008-12-05 18:52:13 +00002697 def _get_hostname(self):
2698 if self.host:
2699 return self.host.hostname
2700 return 'no host'
2701
2702
showard170873e2009-01-07 00:22:26 +00002703 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002704 flags = []
2705 if self.active:
2706 flags.append('active')
2707 if self.complete:
2708 flags.append('complete')
2709 if self.deleted:
2710 flags.append('deleted')
2711 if self.aborted:
2712 flags.append('aborted')
2713 flags_str = ','.join(flags)
2714 if flags_str:
2715 flags_str = ' [%s]' % flags_str
2716 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2717 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002718
2719
jadmanski0afbb632008-06-06 21:10:57 +00002720 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002721 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002722
showardb18134f2009-03-20 20:52:18 +00002723 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002724
showard8cc058f2009-09-08 16:26:33 +00002725 if status in (models.HostQueueEntry.Status.QUEUED,
2726 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002727 self.update_field('complete', False)
2728 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002729
showard8cc058f2009-09-08 16:26:33 +00002730 if status in (models.HostQueueEntry.Status.PENDING,
2731 models.HostQueueEntry.Status.RUNNING,
2732 models.HostQueueEntry.Status.VERIFYING,
2733 models.HostQueueEntry.Status.STARTING,
2734 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002735 self.update_field('complete', False)
2736 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002737
showard8cc058f2009-09-08 16:26:33 +00002738 if status in (models.HostQueueEntry.Status.FAILED,
2739 models.HostQueueEntry.Status.COMPLETED,
2740 models.HostQueueEntry.Status.STOPPED,
2741 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002742 self.update_field('complete', True)
2743 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002744 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002745
2746 should_email_status = (status.lower() in _notify_email_statuses or
2747 'all' in _notify_email_statuses)
2748 if should_email_status:
2749 self._email_on_status(status)
2750
2751 self._email_on_job_complete()
2752
2753
showardf85a0b72009-10-07 20:48:45 +00002754 def _on_complete(self):
2755 if not self.execution_subdir:
2756 return
2757 # unregister any possible pidfiles associated with this queue entry
2758 for pidfile_name in _ALL_PIDFILE_NAMES:
2759 pidfile_id = _drone_manager.get_pidfile_id_from(
2760 self.execution_path(), pidfile_name=pidfile_name)
2761 _drone_manager.unregister_pidfile(pidfile_id)
2762
2763
showardc85c21b2008-11-24 22:17:37 +00002764 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002765 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002766
2767 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2768 self.job.id, self.job.name, hostname, status)
2769 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2770 self.job.id, self.job.name, hostname, status,
2771 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002772 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002773
2774
2775 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002776 if not self.job.is_finished():
2777 return
showard542e8402008-09-19 20:16:18 +00002778
showardc85c21b2008-11-24 22:17:37 +00002779 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002780 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002781 for queue_entry in hosts_queue:
2782 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002783 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002784 queue_entry.status))
2785
2786 summary_text = "\n".join(summary_text)
2787 status_counts = models.Job.objects.get_status_counts(
2788 [self.job.id])[self.job.id]
2789 status = ', '.join('%d %s' % (count, status) for status, count
2790 in status_counts.iteritems())
2791
2792 subject = 'Autotest: Job ID: %s "%s" %s' % (
2793 self.job.id, self.job.name, status)
2794 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2795 self.job.id, self.job.name, status, self._view_job_url(),
2796 summary_text)
showard170873e2009-01-07 00:22:26 +00002797 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002798
2799
showard8cc058f2009-09-08 16:26:33 +00002800 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002801 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002802 assert assigned_host
2803 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002804 if self.host_id is None:
2805 self.set_host(assigned_host)
2806 else:
2807 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002808
showardcfd4a7e2009-07-11 01:47:33 +00002809 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002810 self.job.name, self.meta_host, self.atomic_group_id,
2811 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002812
showard8cc058f2009-09-08 16:26:33 +00002813 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002814
2815
showard8cc058f2009-09-08 16:26:33 +00002816 def _do_schedule_pre_job_tasks(self):
showard77182562009-06-10 00:16:05 +00002817 # Every host goes thru the Verifying stage (which may or may not
2818 # actually do anything as determined by get_pre_job_tasks).
2819 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00002820 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002821
showard6ae5ea92009-02-25 00:11:51 +00002822
jadmanski0afbb632008-06-06 21:10:57 +00002823 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002824 assert self.host
showard8cc058f2009-09-08 16:26:33 +00002825 self.set_status(models.HostQueueEntry.Status.QUEUED)
showard12f3e322009-05-13 21:27:42 +00002826 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002827 # verify/cleanup failure sets the execution subdir, so reset it here
2828 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002829 if self.meta_host:
2830 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002831
2832
jadmanski0afbb632008-06-06 21:10:57 +00002833 def handle_host_failure(self):
2834 """\
2835 Called when this queue entry's host has failed verification and
2836 repair.
2837 """
2838 assert not self.meta_host
showard8cc058f2009-09-08 16:26:33 +00002839 self.set_status(models.HostQueueEntry.Status.FAILED)
showard2bab8f42008-11-12 18:15:22 +00002840 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002841
2842
jadmanskif7fa2cc2008-10-01 14:13:23 +00002843 @property
2844 def aborted_by(self):
2845 self._load_abort_info()
2846 return self._aborted_by
2847
2848
    @property
    def aborted_on(self):
        """Timestamp of when this entry was aborted, or None (lazy-loaded)."""
        self._load_abort_info()
        return self._aborted_on
2853
2854
    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Only hit the database once; results are cached on the instance.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            # Entry was never aborted (no matching abort record).
            self._aborted_by = self._aborted_on = None
2870
2871
showardb2e2c322008-10-14 17:33:55 +00002872 def on_pending(self):
2873 """
2874 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00002875 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
2876 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00002877 """
showard8cc058f2009-09-08 16:26:33 +00002878 self.set_status(models.HostQueueEntry.Status.PENDING)
2879 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00002880
2881 # Some debug code here: sends an email if an asynchronous job does not
2882 # immediately enter Starting.
2883 # TODO: Remove this once we figure out why asynchronous jobs are getting
2884 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00002885 self.job.run_if_ready(queue_entry=self)
2886 if (self.job.synch_count == 1 and
2887 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00002888 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2889 message = 'Asynchronous job stuck in Pending'
2890 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002891
2892
showardd3dc1992009-04-22 21:01:40 +00002893 def abort(self, dispatcher):
2894 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002895
showardd3dc1992009-04-22 21:01:40 +00002896 Status = models.HostQueueEntry.Status
showard8cc058f2009-09-08 16:26:33 +00002897 if self.status in (Status.GATHERING, Status.PARSING):
showardd3dc1992009-04-22 21:01:40 +00002898 # do nothing; post-job tasks will finish and then mark this entry
2899 # with status "Aborted" and take care of the host
2900 return
2901
showard8cc058f2009-09-08 16:26:33 +00002902 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
2903 assert not dispatcher.get_agents_for_entry(self)
showardd3dc1992009-04-22 21:01:40 +00002904 self.host.set_status(models.Host.Status.READY)
2905 elif self.status == Status.VERIFYING:
showard8cc058f2009-09-08 16:26:33 +00002906 models.SpecialTask.objects.create(
2907 task=models.SpecialTask.Task.CLEANUP,
2908 host=models.Host(id=self.host.id))
showardd3dc1992009-04-22 21:01:40 +00002909
2910 self.set_status(Status.ABORTED)
showardd2014822009-10-12 20:26:58 +00002911 self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00002912
showard8cc058f2009-09-08 16:26:33 +00002913
2914 def get_group_name(self):
2915 atomic_group = self.atomic_group
2916 if not atomic_group:
2917 return ''
2918
2919 # Look at any meta_host and dependency labels and pick the first
2920 # one that also specifies this atomic group. Use that label name
2921 # as the group name if possible (it is more specific).
2922 for label in self.get_labels():
2923 if label.atomic_group_id:
2924 assert label.atomic_group_id == atomic_group.id
2925 return label.name
2926 return atomic_group.name
2927
2928
showard170873e2009-01-07 00:22:26 +00002929 def execution_tag(self):
2930 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002931 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002932
2933
showarded2afea2009-07-07 20:54:07 +00002934 def execution_path(self):
2935 return self.execution_tag()
2936
2937
mbligh36768f02008-02-22 18:28:33 +00002938class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002939 _table_name = 'jobs'
2940 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2941 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002942 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002943 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002944
showard77182562009-06-10 00:16:05 +00002945 # This does not need to be a column in the DB. The delays are likely to
2946 # be configured short. If the scheduler is stopped and restarted in
2947 # the middle of a job's delay cycle, the delay cycle will either be
2948 # repeated or skipped depending on the number of Pending machines found
2949 # when the restarted scheduler recovers to track it. Not a problem.
2950 #
2951 # A reference to the DelayedCallTask that will wake up the job should
2952 # no other HQEs change state in time. Its end_time attribute is used
2953 # by our run_with_ready_delay() method to determine if the wait is over.
2954 _delay_ready_task = None
2955
2956 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2957 # all status='Pending' atomic group HQEs incase a delay was running when the
2958 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002959
showarda3c58572009-03-12 20:36:59 +00002960 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002961 assert id or row
showarda3c58572009-03-12 20:36:59 +00002962 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002963
mblighe2586682008-02-29 22:45:46 +00002964
jadmanski0afbb632008-06-06 21:10:57 +00002965 def is_server_job(self):
2966 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002967
2968
showard170873e2009-01-07 00:22:26 +00002969 def tag(self):
2970 return "%s-%s" % (self.id, self.owner)
2971
2972
jadmanski0afbb632008-06-06 21:10:57 +00002973 def get_host_queue_entries(self):
2974 rows = _db.execute("""
2975 SELECT * FROM host_queue_entries
2976 WHERE job_id= %s
2977 """, (self.id,))
2978 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002979
jadmanski0afbb632008-06-06 21:10:57 +00002980 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002981
jadmanski0afbb632008-06-06 21:10:57 +00002982 return entries
mbligh36768f02008-02-22 18:28:33 +00002983
2984
jadmanski0afbb632008-06-06 21:10:57 +00002985 def set_status(self, status, update_queues=False):
2986 self.update_field('status',status)
2987
2988 if update_queues:
2989 for queue_entry in self.get_host_queue_entries():
2990 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002991
2992
showard77182562009-06-10 00:16:05 +00002993 def _atomic_and_has_started(self):
2994 """
2995 @returns True if any of the HostQueueEntries associated with this job
2996 have entered the Status.STARTING state or beyond.
2997 """
2998 atomic_entries = models.HostQueueEntry.objects.filter(
2999 job=self.id, atomic_group__isnull=False)
3000 if atomic_entries.count() <= 0:
3001 return False
3002
showardaf8b4ca2009-06-16 18:47:26 +00003003 # These states may *only* be reached if Job.run() has been called.
3004 started_statuses = (models.HostQueueEntry.Status.STARTING,
3005 models.HostQueueEntry.Status.RUNNING,
3006 models.HostQueueEntry.Status.COMPLETED)
3007
3008 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003009 return started_entries.count() > 0
3010
3011
showard708b3522009-08-20 23:26:15 +00003012 def _hosts_assigned_count(self):
3013 """The number of HostQueueEntries assigned a Host for this job."""
3014 entries = models.HostQueueEntry.objects.filter(job=self.id,
3015 host__isnull=False)
3016 return entries.count()
3017
3018
showard77182562009-06-10 00:16:05 +00003019 def _pending_count(self):
3020 """The number of HostQueueEntries for this job in the Pending state."""
3021 pending_entries = models.HostQueueEntry.objects.filter(
3022 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3023 return pending_entries.count()
3024
3025
showardd2014822009-10-12 20:26:58 +00003026 def _pending_threshold(self, atomic_group):
3027 """
3028 @param atomic_group: The AtomicGroup associated with this job that we
3029 are using to bound the threshold.
3030 @returns The minimum number of HostQueueEntries assigned a Host before
3031 this job can run.
3032 """
3033 return min(self._hosts_assigned_count(),
3034 atomic_group.max_number_of_machines)
3035
3036
jadmanski0afbb632008-06-06 21:10:57 +00003037 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003038 # NOTE: Atomic group jobs stop reporting ready after they have been
3039 # started to avoid launching multiple copies of one atomic job.
3040 # Only possible if synch_count is less than than half the number of
3041 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003042 pending_count = self._pending_count()
3043 atomic_and_has_started = self._atomic_and_has_started()
3044 ready = (pending_count >= self.synch_count
showardd2014822009-10-12 20:26:58 +00003045 and not atomic_and_has_started)
showardb000a8d2009-07-28 20:02:07 +00003046
3047 if not ready:
3048 logging.info(
3049 'Job %s not ready: %s pending, %s required '
3050 '(Atomic and started: %s)',
3051 self, pending_count, self.synch_count,
3052 atomic_and_has_started)
3053
3054 return ready
mbligh36768f02008-02-22 18:28:33 +00003055
3056
jadmanski0afbb632008-06-06 21:10:57 +00003057 def num_machines(self, clause = None):
3058 sql = "job_id=%s" % self.id
3059 if clause:
3060 sql += " AND (%s)" % clause
3061 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003062
3063
jadmanski0afbb632008-06-06 21:10:57 +00003064 def num_queued(self):
3065 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003066
3067
jadmanski0afbb632008-06-06 21:10:57 +00003068 def num_active(self):
3069 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003070
3071
jadmanski0afbb632008-06-06 21:10:57 +00003072 def num_complete(self):
3073 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003074
3075
jadmanski0afbb632008-06-06 21:10:57 +00003076 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003077 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003078
mbligh36768f02008-02-22 18:28:33 +00003079
showard6bb7c292009-01-30 01:44:51 +00003080 def _not_yet_run_entries(self, include_verifying=True):
3081 statuses = [models.HostQueueEntry.Status.QUEUED,
3082 models.HostQueueEntry.Status.PENDING]
3083 if include_verifying:
3084 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3085 return models.HostQueueEntry.objects.filter(job=self.id,
3086 status__in=statuses)
3087
3088
3089 def _stop_all_entries(self):
3090 entries_to_stop = self._not_yet_run_entries(
3091 include_verifying=False)
3092 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003093 assert not child_entry.complete, (
3094 '%s status=%s, active=%s, complete=%s' %
3095 (child_entry.id, child_entry.status, child_entry.active,
3096 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003097 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3098 child_entry.host.status = models.Host.Status.READY
3099 child_entry.host.save()
3100 child_entry.status = models.HostQueueEntry.Status.STOPPED
3101 child_entry.save()
3102
showard2bab8f42008-11-12 18:15:22 +00003103 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003104 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003105 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003106 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003107
3108
jadmanski0afbb632008-06-06 21:10:57 +00003109 def write_to_machines_file(self, queue_entry):
3110 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00003111 file_path = os.path.join(self.tag(), '.machines')
3112 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003113
3114
showardf1ae3542009-05-11 19:26:02 +00003115 def _next_group_name(self, group_name=''):
3116 """@returns a directory name to use for the next host group results."""
3117 if group_name:
3118 # Sanitize for use as a pathname.
3119 group_name = group_name.replace(os.path.sep, '_')
3120 if group_name.startswith('.'):
3121 group_name = '_' + group_name[1:]
3122 # Add a separator between the group name and 'group%d'.
3123 group_name += '.'
3124 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003125 query = models.HostQueueEntry.objects.filter(
3126 job=self.id).values('execution_subdir').distinct()
3127 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003128 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3129 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003130 if ids:
3131 next_id = max(ids) + 1
3132 else:
3133 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003134 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003135
3136
showarddb502762009-09-09 15:31:20 +00003137 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003138 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003139 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003140 return control_path
mbligh36768f02008-02-22 18:28:33 +00003141
showardb2e2c322008-10-14 17:33:55 +00003142
showard2bab8f42008-11-12 18:15:22 +00003143 def get_group_entries(self, queue_entry_from_group):
3144 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003145 return list(HostQueueEntry.fetch(
3146 where='job_id=%s AND execution_subdir=%s',
3147 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003148
3149
showard8cc058f2009-09-08 16:26:33 +00003150 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003151 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003152 execution_path = queue_entries[0].execution_path()
3153 control_path = self._write_control_file(execution_path)
jadmanski0afbb632008-06-06 21:10:57 +00003154 hostnames = ','.join([entry.get_host().hostname
3155 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003156
showarddb502762009-09-09 15:31:20 +00003157 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003158 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003159 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003160 ['-P', execution_tag, '-n',
3161 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003162 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003163
jadmanski0afbb632008-06-06 21:10:57 +00003164 if not self.is_server_job():
3165 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003166
showardb2e2c322008-10-14 17:33:55 +00003167 return params
mblighe2586682008-02-29 22:45:46 +00003168
mbligh36768f02008-02-22 18:28:33 +00003169
showardc9ae1782009-01-30 01:42:37 +00003170 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003171 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003172 return True
showard0fc38302008-10-23 00:44:07 +00003173 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003174 return queue_entry.get_host().dirty
3175 return False
showard21baa452008-10-21 00:08:39 +00003176
showardc9ae1782009-01-30 01:42:37 +00003177
showard8cc058f2009-09-08 16:26:33 +00003178 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003179 do_not_verify = (queue_entry.host.protection ==
3180 host_protections.Protection.DO_NOT_VERIFY)
3181 if do_not_verify:
3182 return False
3183 return self.run_verify
3184
3185
showard8cc058f2009-09-08 16:26:33 +00003186 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003187 """
3188 Get a list of tasks to perform before the host_queue_entry
3189 may be used to run this Job (such as Cleanup & Verify).
3190
3191 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003192 it should be considered be ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003193 task in the list calls HostQueueEntry.on_pending(), which
3194 continues the flow of the job.
3195 """
showardc9ae1782009-01-30 01:42:37 +00003196 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003197 task = models.SpecialTask.Task.CLEANUP
3198 elif self._should_run_verify(queue_entry):
3199 task = models.SpecialTask.Task.VERIFY
3200 else:
3201 queue_entry.on_pending()
3202 return
3203
3204 models.SpecialTask.objects.create(
3205 host=models.Host(id=queue_entry.host_id),
3206 queue_entry=models.HostQueueEntry(id=queue_entry.id),
3207 task=task)
showard21baa452008-10-21 00:08:39 +00003208
3209
showardf1ae3542009-05-11 19:26:02 +00003210 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003211 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003212 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003213 else:
showardf1ae3542009-05-11 19:26:02 +00003214 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003215 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003216 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003217 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003218
3219 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003220 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003221
3222
3223 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003224 """
3225 @returns A tuple containing a list of HostQueueEntry instances to be
3226 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00003227 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00003228 """
showard77182562009-06-10 00:16:05 +00003229 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003230 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003231 if atomic_group:
3232 num_entries_wanted = atomic_group.max_number_of_machines
3233 else:
3234 num_entries_wanted = self.synch_count
3235 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003236
showardf1ae3542009-05-11 19:26:02 +00003237 if num_entries_wanted > 0:
3238 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003239 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003240 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003241 params=(self.id, include_queue_entry.id)))
3242
3243 # Sort the chosen hosts by hostname before slicing.
3244 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3245 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3246 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3247 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003248
showardf1ae3542009-05-11 19:26:02 +00003249 # Sanity check. We'll only ever be called if this can be met.
showard828fc4c2009-09-14 20:31:00 +00003250 if len(chosen_entries) < self.synch_count:
3251 message = ('job %s got less than %s chosen entries: %s' % (
3252 self.id, self.synch_count, chosen_entries))
3253 logging.error(message)
3254 email_manager.manager.enqueue_notify_email(
3255 'Job not started, too few chosen entries', message)
3256 return []
showardf1ae3542009-05-11 19:26:02 +00003257
showard8cc058f2009-09-08 16:26:33 +00003258 group_name = include_queue_entry.get_group_name()
showardf1ae3542009-05-11 19:26:02 +00003259
3260 self._assign_new_group(chosen_entries, group_name=group_name)
showard8cc058f2009-09-08 16:26:33 +00003261 return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003262
3263
showard77182562009-06-10 00:16:05 +00003264 def run_if_ready(self, queue_entry):
3265 """
3266 @returns An Agent instance to ultimately run this job if enough hosts
3267 are ready for it to run.
3268 @returns None and potentially cleans up excess hosts if this Job
3269 is not ready to run.
3270 """
showardb2e2c322008-10-14 17:33:55 +00003271 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003272 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003273 elif queue_entry.atomic_group:
3274 self.run_with_ready_delay(queue_entry)
3275 else:
3276 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003277
3278
3279 def run_with_ready_delay(self, queue_entry):
3280 """
3281 Start a delay to wait for more hosts to enter Pending state before
3282 launching an atomic group job. Once set, the a delay cannot be reset.
3283
3284 @param queue_entry: The HostQueueEntry object to get atomic group
3285 info from and pass to run_if_ready when the delay is up.
3286
3287 @returns An Agent to run the job as appropriate or None if a delay
3288 has already been set.
3289 """
3290 assert queue_entry.job_id == self.id
3291 assert queue_entry.atomic_group
3292 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showardd2014822009-10-12 20:26:58 +00003293 over_max_threshold = (self._pending_count() >=
3294 self._pending_threshold(queue_entry.atomic_group))
showard77182562009-06-10 00:16:05 +00003295 delay_expired = (self._delay_ready_task and
3296 time.time() >= self._delay_ready_task.end_time)
3297
3298 # Delay is disabled or we already have enough? Do not wait to run.
3299 if not delay or over_max_threshold or delay_expired:
showard8cc058f2009-09-08 16:26:33 +00003300 self.run(queue_entry)
3301 else:
3302 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003303
showard8cc058f2009-09-08 16:26:33 +00003304
3305 def schedule_delayed_callback_task(self, queue_entry):
3306 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3307
showard77182562009-06-10 00:16:05 +00003308 if self._delay_ready_task:
3309 return None
3310
showard8cc058f2009-09-08 16:26:33 +00003311 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3312
showard77182562009-06-10 00:16:05 +00003313 def run_job_after_delay():
showardd2014822009-10-12 20:26:58 +00003314 logging.info('Job %s done waiting for extra hosts.', self)
3315 # Check to see if the job is still relevant. It could have aborted
3316 # while we were waiting or hosts could have disappearred, etc.
3317 threshold = self._pending_threshold(queue_entry.atomic_group)
3318 if self._pending_count() < threshold:
3319 logging.info('Job %s had too few Pending hosts after waiting '
3320 'for extras. Not running.', self)
3321 return
showard77182562009-06-10 00:16:05 +00003322 return self.run(queue_entry)
3323
showard708b3522009-08-20 23:26:15 +00003324 logging.info('Job %s waiting up to %s seconds for more hosts.',
3325 self.id, delay)
showard77182562009-06-10 00:16:05 +00003326 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3327 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003328 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003329
3330
3331 def run(self, queue_entry):
3332 """
3333 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003334 """
3335 if queue_entry.atomic_group and self._atomic_and_has_started():
3336 logging.error('Job.run() called on running atomic Job %d '
3337 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003338 return
3339 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003340 if queue_entries:
3341 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003342
3343
showard8cc058f2009-09-08 16:26:33 +00003344 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003345 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003346 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003347 self.abort_delay_ready_task()
3348
3349
3350 def abort_delay_ready_task(self):
3351 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003352 if self._delay_ready_task:
3353 # Cancel any pending callback that would try to run again
3354 # as we are already running.
3355 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003356
showardd2014822009-10-12 20:26:58 +00003357
showardb000a8d2009-07-28 20:02:07 +00003358 def __str__(self):
3359 return '%s-%s' % (self.id, self.owner)
3360
3361
mbligh36768f02008-02-22 18:28:33 +00003362if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003363 main()