blob: 9b5b1ce7a4f398f12b33935ddb1c637382e64c11 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
# Pid-file prefixes used with utils.write_pid()/program_is_alive() to detect
# an already-running scheduler (or babysitter wrapper) instance.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the installation root to be overridden from the environment.
# NOTE: dict.has_key() is deprecated (and gone in Python 3); use 'in'.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Names of the pidfiles dropped into an execution directory by the various
# phases of a job (autoserv run, crashinfo collection, results parsing).
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state; filled in by main()/init().
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile, in seconds."""
    timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * timeout_mins
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    """Top-level entry point: run the scheduler, log any escaping exception,
    and always remove our pid file on the way out."""
    try:
        main_without_exception_handling()
    except SystemExit:
        # Deliberate exits pass through untouched.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000085
86
def main_without_exception_handling():
    """Parse arguments, load configuration and run the dispatcher loop.

    Expects one positional argument: the results directory.  Sets up the
    module globals (RESULTS_DIR, _notify_email_statuses, _autoserv_path,
    _testing_mode, _base_url), starts the status server, then ticks the
    Dispatcher until _shutdown is set or an exception escapes the loop.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # Fixed message typo: "to enabled it" -> "to enable it".
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook; falls back to the dummy.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Flush queued email and release resources even after an uncaught
    # exception, so the process exits cleanly.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
def setup_logging():
    """Configure scheduler logging.

    Log directory and file name may be overridden through the
    AUTOTEST_SCHEDULER_LOG_DIR / AUTOTEST_SCHEDULER_LOG_NAME environment
    variables; otherwise the SchedulerLoggingConfig defaults apply.
    """
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR')
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME')
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=log_dir, logfile_name=log_name)
182
183
def handle_sigint(signum, frame):
    """SIGINT handler: flag the dispatcher loop to exit after this tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000188
189
def init():
    """One-time scheduler startup.

    Writes our pid file (aborting if another monitor_db is alive), connects
    the database, puts Django in autocommit, installs the SIGINT handler and
    initializes the drone manager from the global config.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start if another scheduler instance holds the pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Renamed local from 'drones' to 'drone_names': the old name shadowed the
    # 'drones' module imported at the top of this file.
    drone_names = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_names.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv invocation for a set of machines.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - If True, pass --verbose to autoserv.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
248
249
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
252
253
class HostScheduler(object):
    """Matches pending HostQueueEntries to ready hosts.

    refresh() snapshots scheduling state (ready hosts, ACLs, labels,
    dependencies) from the database; find_eligible_host() /
    find_eligible_atomic_group() then consume hosts from that in-memory
    snapshot during a single scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id sequence as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run *query* (with %s filled by id_list) and build an id->set map."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_ids)}; flip swaps
        the columns."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the ACL group ids of the job owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to host ids it may not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to its dependency label ids."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the ACL group ids it belongs to."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label_id -> host_ids, host_id -> label_ids) mappings."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Rebuild the in-memory scheduling snapshot for this cycle.

        @param pending_queue_entries: HostQueueEntries awaiting scheduling;
                only their jobs' ACL/dependency/ineligibility data is loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host has every label the job depends on."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label the job
        neither depends on nor requested via its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise. Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # NOTE: despite the docstring above, this logs rather than raises
            # and returns an arbitrary member of the set.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yeilding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if ACLs, dependencies and atomic-group rules all allow
        scheduling queue_entry on host_id."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True if the host exists in this cycle's snapshot and is invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the specific host requested by queue_entry, or None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still unclaimed this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying queue_entry's metahost label,
        or None if none qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Claim a host for a non-atomic queue_entry (metahost or direct),
        removing it from the available pool.  Returns a Host or None."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
617
618
showard170873e2009-01-07 00:22:26 +0000619class Dispatcher(object):
    def __init__(self):
        # Active Agent objects polled every tick.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Indexes mapping host id / queue entry id -> set of Agents using it.
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
    def initialize(self, recover_hosts=True):
        """One-time startup: prime cleanup tasks and recover leftover state.

        @param recover_hosts: If True, also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000641
642
    def tick(self):
        """Run one scheduler iteration.

        Refreshes drone state, runs cleanups, processes aborts, schedules
        new/pending work, polls agents, then flushes drone actions and any
        queued email.  The call order here is the scheduler's contract.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_new_jobs()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000655
showard97aed502008-11-04 02:01:24 +0000656
    def _run_cleanup(self):
        # each cleanup helper decides internally whether enough time has
        # passed since its last run
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard170873e2009-01-07 00:22:26 +0000662 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
663 for object_id in object_ids:
664 agent_dict.setdefault(object_id, set()).add(agent)
665
666
667 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
668 for object_id in object_ids:
669 assert object_id in agent_dict
670 agent_dict[object_id].remove(agent)
671
672
jadmanski0afbb632008-06-06 21:10:57 +0000673 def add_agent(self, agent):
674 self._agents.append(agent)
675 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000676 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
677 self._register_agent_for_ids(self._queue_entry_agents,
678 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000679
showard170873e2009-01-07 00:22:26 +0000680
681 def get_agents_for_entry(self, queue_entry):
682 """
683 Find agents corresponding to the specified queue_entry.
684 """
showardd3dc1992009-04-22 21:01:40 +0000685 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000686
687
688 def host_has_agent(self, host):
689 """
690 Determine if there is currently an Agent present using this host.
691 """
692 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000693
694
jadmanski0afbb632008-06-06 21:10:57 +0000695 def remove_agent(self, agent):
696 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000697 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
698 agent)
699 self._unregister_agent_for_ids(self._queue_entry_agents,
700 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000701
702
    def _host_has_scheduled_special_task(self, host):
        # True if the host has a special task queued (not yet started and
        # not complete)
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
707
708
    def _recover_processes(self):
        """
        Re-attach to processes left running by a previous scheduler.
        Pidfiles must be registered before the drone refresh so their
        contents are fetched; entry recovery must happen before the
        remaining-host reverify pass.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000720
showard170873e2009-01-07 00:22:26 +0000721
    def _register_pidfiles(self):
        """
        Register every pidfile that recovery may need to read, for both
        active host queue entries and active special tasks.
        """
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
                where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            # register every known pidfile name under the entry's
            # execution path; missing pidfiles are handled downstream
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                        execution_entry.execution_path(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000733
734
showarded2afea2009-07-07 20:54:07 +0000735 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
736 run_monitor = PidfileRunMonitor()
737 run_monitor.attach_to_existing_process(execution_path,
738 pidfile_name=pidfile_name)
739 if run_monitor.has_process():
740 orphans.discard(run_monitor.get_process())
741 return run_monitor, '(process %s)' % run_monitor.get_process()
742 return None, 'without process'
743
744
    def _get_unassigned_entries(self, status):
        """
        Yield queue entries currently in the given status that have no
        agent attached yet.
        """
        for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
            # re-check the status: it can change during iteration, e.g., if
            # job.run() sets a group of queue entries to Starting
            if entry.status == status and not self.get_agents_for_entry(entry):
                yield entry
751
752
    def _recover_entries_with_status(self, status, orphans, pidfile_name,
                                     recover_entries_fn):
        """
        For each agentless queue entry in `status`, try to re-attach to its
        existing process (identified by `pidfile_name`) and hand the whole
        entry group to recover_entries_fn(job, queue_entries, run_monitor).
        Entries without a live process are skipped here.
        """
        for queue_entry in self._get_unassigned_entries(status):
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            run_monitor, process_string = self._get_recovery_run_monitor(
                    queue_entry.execution_path(), pidfile_name, orphans)
            if not run_monitor:
                # _schedule_running_host_queue_entries should schedule and
                # recover these entries
                continue

            logging.info('Recovering %s entry %s %s',status.lower(),
                         ', '.join(str(entry) for entry in queue_entries),
                         process_string)
            recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000768
769
    def _check_for_remaining_orphan_processes(self, orphans):
        """
        Notify (and optionally die, per the die_on_orphans config flag)
        about orphaned autoserv processes that no recovery step claimed.
        """
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000782
showard170873e2009-01-07 00:22:26 +0000783
    def _recover_running_entries(self, orphans):
        """Re-attach QueueTasks to Running entries with live processes."""
        def recover_entries(job, queue_entries, run_monitor):
            queue_task = QueueTask(job=job, queue_entries=queue_entries,
                                   recover_run_monitor=run_monitor)
            self.add_agent(Agent(task=queue_task,
                                 num_processes=len(queue_entries)))

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, _AUTOSERV_PID_FILE,
                                          recover_entries)
794
795
    def _recover_gathering_entries(self, orphans):
        """Re-attach GatherLogsTasks to Gathering entries."""
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent(gather_task))

        self._recover_entries_with_status(
                models.HostQueueEntry.Status.GATHERING,
                orphans, _CRASHINFO_PID_FILE, recover_entries)
805
806
    def _recover_parsing_entries(self, orphans):
        """Re-attach FinalReparseTasks to Parsing entries."""
        def recover_entries(job, queue_entries, run_monitor):
            # parsing consumes no autoserv process slots
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            self.add_agent(Agent(reparse_task, num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
816
817
    def _recover_pending_entries(self):
        """Re-trigger on_pending() for agentless Pending entries."""
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()
823
824
    def _recover_all_recoverable_entries(self):
        """
        Recover every entry/task type.  Each step discards the processes
        it claims from `orphans`; whatever is left at the end is reported.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000832
showard97aed502008-11-04 02:01:24 +0000833
    def _recover_special_tasks(self, orphans):
        """\
        Recovers all special tasks that have started running but have not
        completed.
        """

        tasks = models.SpecialTask.objects.filter(is_active=True,
                                                  is_complete=False)
        # Use ordering to force NULL queue_entry_id's to the end of the list
        for task in tasks.order_by('-queue_entry__id'):
            # a host must not already be owned by another agent
            if self.host_has_agent(task.host):
                raise SchedulerError(
                        "%s already has a host agent %s." % (
                        task, self._host_agents.get(task.host.id)))

            run_monitor, process_string = self._get_recovery_run_monitor(
                    task.execution_path(), _AUTOSERV_PID_FILE, orphans)

            logging.info('Recovering %s %s', task, process_string)
            self._recover_special_task(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000854
855
showard8cc058f2009-09-08 16:26:33 +0000856 def _recover_special_task(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000857 """\
858 Recovers a single special task.
859 """
860 if task.task == models.SpecialTask.Task.VERIFY:
showard8cc058f2009-09-08 16:26:33 +0000861 agent_task = self._recover_verify(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000862 elif task.task == models.SpecialTask.Task.REPAIR:
showard8cc058f2009-09-08 16:26:33 +0000863 agent_task = self._recover_repair(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000864 elif task.task == models.SpecialTask.Task.CLEANUP:
showard8cc058f2009-09-08 16:26:33 +0000865 agent_task = self._recover_cleanup(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000866 else:
867 # Should never happen
868 logging.error(
869 "Special task id %d had invalid task %s", (task.id, task.task))
870
showard8cc058f2009-09-08 16:26:33 +0000871 self.add_agent(Agent(agent_task))
showard2fe3f1d2009-07-06 20:19:11 +0000872
873
    def _recover_verify(self, task, run_monitor):
        """\
        Recovers a verify task.
        No associated queue entry: Verify host
        With associated queue entry: Verify host, and run associated queue
                                     entry
        """
        return VerifyTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000882
883
    def _recover_repair(self, task, run_monitor):
        """\
        Recovers a repair task.
        Always repair host
        """
        return RepairTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000890
891
    def _recover_cleanup(self, task, run_monitor):
        """\
        Recovers a cleanup task.
        No associated queue entry: Clean host
        With associated queue entry: Clean host, verify host if needed, and
                                     run associated queue entry
        """
        return CleanupTask(task=task, recover_run_monitor=run_monitor)
showard6af73ad2009-07-28 20:00:58 +0000900
901
    def _check_for_unrecovered_verifying_entries(self):
        """
        Sanity check: every Verifying queue entry must have an incomplete
        cleanup/verify special task attached; otherwise it would be stuck
        forever, so abort the scheduler with a SchedulerError.
        """
        queue_entries = HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                    '%d unrecovered active host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000920
921
    def _schedule_special_tasks(self):
        """
        Start agents for queued special tasks on unlocked hosts that have
        no agent yet.  Tasks with a queue entry run before those without,
        then by ascending id.
        """
        tasks = models.SpecialTask.objects.filter(is_active=False,
                                                  is_complete=False,
                                                  host__locked=False)
        # We want lower ids to come first, but the NULL queue_entry_ids need to
        # come last
        tasks = tasks.extra(select={'isnull' : 'queue_entry_id IS NULL'})
        tasks = tasks.extra(order_by=['isnull', 'id'])

        for task in tasks:
            if self.host_has_agent(task.host):
                # defer until the host frees up
                continue

            if task.task == models.SpecialTask.Task.CLEANUP:
                agent_task = CleanupTask(task=task)
            elif task.task == models.SpecialTask.Task.VERIFY:
                agent_task = VerifyTask(task=task)
            elif task.task == models.SpecialTask.Task.REPAIR:
                agent_task = RepairTask(task=task)
            else:
                # unknown task type: notify and skip it
                email_manager.manager.enqueue_notify_email(
                        'Special task with invalid task', task)
                continue

            self.add_agent(Agent(agent_task))
showard1ff7b2e2009-05-15 23:17:18 +0000947
948
    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)
mblighbb421852008-03-11 22:36:16 +0000957
958
    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        """
        Schedule a cleanup special task for every unlocked, valid host
        matching `where` that has neither an agent nor a pending special
        task.

        @param where: extra SQL condition on the hosts table.
        @param print_message: log format with one %s for the hostname;
                pass a false value to suppress logging.
        """
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000974
975
    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000981
982
showard04c82c52008-05-29 19:38:12 +0000983
    def _get_pending_queue_entries(self):
        """Return queued, inactive entries in scheduling-priority order."""
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000990
991
showard89f84db2009-03-12 20:39:13 +0000992 def _refresh_pending_queue_entries(self):
993 """
994 Lookup the pending HostQueueEntries and call our HostScheduler
995 refresh() method given that list. Return the list.
996
997 @returns A list of pending HostQueueEntries sorted in priority order.
998 """
showard63a34772008-08-18 19:32:50 +0000999 queue_entries = self._get_pending_queue_entries()
1000 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001001 return []
showardb95b1bd2008-08-15 18:11:04 +00001002
showard63a34772008-08-18 19:32:50 +00001003 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001004
showard89f84db2009-03-12 20:39:13 +00001005 return queue_entries
1006
1007
1008 def _schedule_atomic_group(self, queue_entry):
1009 """
1010 Schedule the given queue_entry on an atomic group of hosts.
1011
1012 Returns immediately if there are insufficient available hosts.
1013
1014 Creates new HostQueueEntries based off of queue_entry for the
1015 scheduled hosts and starts them all running.
1016 """
1017 # This is a virtual host queue entry representing an entire
1018 # atomic group, find a group and schedule their hosts.
1019 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1020 queue_entry)
1021 if not group_hosts:
1022 return
showardcbe6f942009-06-17 19:33:49 +00001023
1024 logging.info('Expanding atomic group entry %s with hosts %s',
1025 queue_entry,
1026 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001027 # The first assigned host uses the original HostQueueEntry
1028 group_queue_entries = [queue_entry]
1029 for assigned_host in group_hosts[1:]:
1030 # Create a new HQE for every additional assigned_host.
1031 new_hqe = HostQueueEntry.clone(queue_entry)
1032 new_hqe.save()
1033 group_queue_entries.append(new_hqe)
1034 assert len(group_queue_entries) == len(group_hosts)
1035 for queue_entry, host in itertools.izip(group_queue_entries,
1036 group_hosts):
1037 self._run_queue_entry(queue_entry, host)
1038
1039
1040 def _schedule_new_jobs(self):
1041 queue_entries = self._refresh_pending_queue_entries()
1042 if not queue_entries:
1043 return
1044
showard63a34772008-08-18 19:32:50 +00001045 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001046 is_unassigned_atomic_group = (
1047 queue_entry.atomic_group_id is not None
1048 and queue_entry.host_id is None)
1049 if is_unassigned_atomic_group:
1050 self._schedule_atomic_group(queue_entry)
1051 else:
showard89f84db2009-03-12 20:39:13 +00001052 assigned_host = self._host_scheduler.find_eligible_host(
1053 queue_entry)
1054 if assigned_host:
1055 self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001056
1057
    def _schedule_running_host_queue_entries(self):
        """
        Attach an agent to every active (Starting/Running/Gathering/
        Parsing) entry group that does not have one, choosing the agent
        task type from the entry's status.  Also sanity-checks that no
        non-Parsing entry targets a host another agent already owns.
        """
        entries = HostQueueEntry.fetch(
                where="status IN "
                      "('Starting', 'Running', 'Gathering', 'Parsing')")
        for entry in entries:
            if self.get_agents_for_entry(entry):
                # already has an agent (e.g. one recovered at startup)
                continue

            task_entries = entry.job.get_group_entries(entry)
            for task_entry in task_entries:
                if (task_entry.status != models.HostQueueEntry.Status.PARSING
                        and self.host_has_agent(task_entry.host)):
                    agent = self._host_agents.get(task_entry.host.id)[0]
                    raise SchedulerError('Attempted to schedule on host that '
                                         'already has agent: %s (previous '
                                         'agent task: %s)'
                                         % (task_entry, agent.task))

            if entry.status in (models.HostQueueEntry.Status.STARTING,
                                models.HostQueueEntry.Status.RUNNING):
                params = entry.job.get_autoserv_params(task_entries)
                # NOTE(review): QueueTask is built with cmd=params here but
                # with recover_run_monitor elsewhere in this file -- confirm
                # QueueTask accepts both keyword forms.
                agent_task = QueueTask(job=entry.job,
                                       queue_entries=task_entries, cmd=params)
            elif entry.status == models.HostQueueEntry.Status.GATHERING:
                agent_task = GatherLogsTask(
                        job=entry.job, queue_entries=task_entries)
            elif entry.status == models.HostQueueEntry.Status.PARSING:
                agent_task = FinalReparseTask(queue_entries=task_entries)
            else:
                raise SchedulerError('_schedule_running_host_queue_entries got '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))

            self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
1092
1093
    def _schedule_delay_tasks(self):
        """Create zero-process agents for Waiting entries' delayed callbacks."""
        for entry in HostQueueEntry.fetch(where='status = "%s"' %
                                          models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent(Agent(task, num_processes=0))
1100
1101
    def _run_queue_entry(self, queue_entry, host):
        # hand the entry its host; pre-job tasks (e.g. verify) run first
        queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001104
1105
    def _find_aborting(self):
        """Abort agents and entries for every aborted, incomplete entry."""
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001112
1113
showard324bf812009-01-20 23:23:38 +00001114 def _can_start_agent(self, agent, num_started_this_cycle,
1115 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001116 # always allow zero-process agents to run
1117 if agent.num_processes == 0:
1118 return True
1119 # don't allow any nonzero-process agents to run after we've reached a
1120 # limit (this avoids starvation of many-process agents)
1121 if have_reached_limit:
1122 return False
1123 # total process throttling
showard324bf812009-01-20 23:23:38 +00001124 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001125 return False
1126 # if a single agent exceeds the per-cycle throttling, still allow it to
1127 # run when it's the first agent in the cycle
1128 if num_started_this_cycle == 0:
1129 return True
1130 # per-cycle throttling
1131 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001132 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001133 return False
1134 return True
1135
1136
    def _handle_agents(self):
        """
        Tick every agent, starting not-yet-started agents subject to the
        throttling policy, and remove agents that report done.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    # once one agent is throttled, throttle all remaining
                    # nonzero-process agents this cycle
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001154
1155
    def _process_recurring_runs(self):
        """
        Instantiate jobs for every RecurringRun whose start date has
        arrived, then delete the run (loop_count 1), reschedule it
        (loop_count > 1) or leave it as-is (loop_count 0 = infinite).
        """
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): 'dependencies' is extracted but never passed to
            # create_new_job below -- confirm whether it should be.
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # last iteration: remove the recurring run entirely
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1195
1196
class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # True once we've given up on the process (see on_lost_process)
        self.lost_process = False
        # set when run()/attach_to_existing_process() is called, used for
        # pidfile-write timeout detection
        self._start_time = None
        self.pidfile_id = None
        # cached parsed pidfile contents (process/exit status/test count)
        self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001215
1216
showard170873e2009-01-07 00:22:26 +00001217 def _add_nice_command(self, command, nice_level):
1218 if not nice_level:
1219 return command
1220 return ['nice', '-n', str(nice_level)] + command
1221
1222
    def _set_start_time(self):
        # remember when we started waiting on the pidfile, for the timeout
        # check in _handle_no_process()
        self._start_time = time.time()
1225
1226
1227 def run(self, command, working_directory, nice_level=None, log_file=None,
1228 pidfile_name=None, paired_with_pidfile=None):
1229 assert command is not None
1230 if nice_level is not None:
1231 command = ['nice', '-n', str(nice_level)] + command
1232 self._set_start_time()
1233 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001234 command, working_directory, pidfile_name=pidfile_name,
1235 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001236
1237
    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Monitor an already-running process via its existing pidfile."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
                execution_path, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001244
1245
    def kill(self):
        # no-op when no process was ever found
        if self.has_process():
            _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001249
mbligh36768f02008-02-22 18:28:33 +00001250
    def has_process(self):
        # refreshes pidfile state before answering
        self._get_pidfile_info()
        return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001254
1255
    def get_process(self):
        # only valid after has_process() returned True
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process
mblighbb421852008-03-11 22:36:16 +00001260
1261
    def _read_pidfile(self, use_second_read=False):
        """
        Refresh self._state from the pidfile; raises _PidfileException (and
        resets state) on unparseable contents.
        """
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001271
1272
    def _handle_pidfile_error(self, error, message=''):
        # notify and mark the process as lost (exit status 1)
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001278
1279
    def _get_pidfile_info_helper(self):
        """
        Refresh self._state, resolving races between the pidfile write
        and process exit.  May raise _PidfileException (handled by
        _get_pidfile_info).
        """
        if self.lost_process:
            # already gave up on this process; state is frozen
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001302
showard21baa452008-10-21 00:08:39 +00001303
    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            # unparseable pidfile: notify and treat the process as lost
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001315
1316
showard170873e2009-01-07 00:22:26 +00001317 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001318 """\
1319 Called when no pidfile is found or no pid is in the pidfile.
1320 """
showard170873e2009-01-07 00:22:26 +00001321 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001322 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001323 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001324 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001325 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001326
1327
showard35162b02009-03-03 02:17:30 +00001328 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001329 """\
1330 Called when autoserv has exited without writing an exit status,
1331 or we've timed out waiting for autoserv to write a pid to the
1332 pidfile. In either case, we just return failure and the caller
1333 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001334
showard170873e2009-01-07 00:22:26 +00001335 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001336 """
1337 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001338 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001339 self._state.exit_status = 1
1340 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001341
1342
jadmanski0afbb632008-06-06 21:10:57 +00001343 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001344 self._get_pidfile_info()
1345 return self._state.exit_status
1346
1347
1348 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001349 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001350 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001351 if self._state.num_tests_failed is None:
1352 return -1
showard21baa452008-10-21 00:08:39 +00001353 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001354
1355
showardcdaeae82009-08-31 18:32:48 +00001356 def try_copy_results_on_drone(self, **kwargs):
1357 if self.has_process():
1358 # copy results logs into the normal place for job results
1359 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1360
1361
1362 def try_copy_to_results_repository(self, source, **kwargs):
1363 if self.has_process():
1364 _drone_manager.copy_to_results_repository(self.get_process(),
1365 source, **kwargs)
1366
1367
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.

    The following attribute is written to all task objects:
        agent - A reference to the Agent instance that the task has been
                added to.
    """


    def __init__(self, task, num_processes=1):
        """
        @param task: A task as described in the class docstring.
        @param num_processes: The number of subprocesses the Agent represents.
                This is used by the Dispatcher for managing the load on the
                system.  Defaults to 1.
        """
        self.task = task
        task.agent = self

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None
        self.num_processes = num_processes

        # mirror the task's ids so the dispatcher can find us by entry/host
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False


    def tick(self):
        self.started = True
        current_task = self.task
        if not current_task:
            return
        current_task.poll()
        if current_task.is_done():
            # drop the finished task; is_done() now reports True
            self.task = None


    def is_done(self):
        return self.task is None


    def abort(self):
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            # tasks can choose to ignore aborts
            self.task = None
showard20f9bdd2009-04-29 19:48:33 +00001429
showardd3dc1992009-04-22 21:01:40 +00001430
class DelayedCallTask(object):
    """
    A task object like AgentTask for an Agent to run that waits until the
    specified amount of time has elapsed, then calls the supplied callback
    exactly once and finishes.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        # The Agent that adopts this task fills this in.
        self.agent = None


    def poll(self):
        if self.is_done():
            return
        if self._now_func() < self.end_time:
            # still waiting for the delay to elapse
            return
        self._callback()
        self.success = True


    def is_done(self):
        return self.aborted or self.success


    def abort(self):
        self.aborted = True
showard77182562009-06-10 00:16:05 +00001483
1484
class AgentTask(object):
    """
    Base class for tasks run by Agents.

    A task optionally wraps a command line (self.cmd) that run() launches
    through a PidfileRunMonitor; the process's exit code determines
    self.success.  Subclasses hook into the lifecycle via prolog() (before
    launch), epilog()/cleanup() (after completion) and run().
    """

    def __init__(self, cmd=None, working_directory=None,
                 pidfile_name=None, paired_with_pidfile=None,
                 recover_run_monitor=None):
        """
        @param cmd: command line (as passed to PidfileRunMonitor.run()), or
                None if no process should be launched.
        @param working_directory: directory the process runs in and where
                its logs/pidfile live.
        @param pidfile_name, paired_with_pidfile: NOTE(review): accepted but
                not stored here -- apparently only meaningful to run() and
                subclasses; confirm before relying on them.
        @param recover_run_monitor: existing PidfileRunMonitor to attach to
                (process recovery after a scheduler restart).  If given, the
                task starts out already "started".
        """
        self.done = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.agent = None
        self.monitor = recover_run_monitor
        # a recovered task already has a running process; skip prolog()/run()
        self.started = bool(recover_run_monitor)
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host/queue-entry ids this task covers (read by the
        dispatcher through the owning Agent)."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Periodic entry point called by the owning Agent."""
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        """Check the monitored process and finish once it has exited.  A
        task without a monitor (no process was launched) finishes as a
        failure."""
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                # process still running
                return
            success = (exit_code == 0)
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task done and run epilog().  Idempotent."""
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task's process is launched."""
        assert not self.monitor


    def cleanup(self):
        """Copy this task's log file to the results repository, if any."""
        if self.monitor and self.log_file:
            self.monitor.try_copy_to_results_repository(self.log_file)


    def epilog(self):
        """Hook run once when the task finishes, success or failure."""
        self.cleanup()


    def start(self):
        """Run prolog() and launch the process on the first call; later
        calls only set the started flag."""
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the process (if any) and finish immediately as aborted."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        """Assert that every entry shares one execution path; return it."""
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                    '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                            execution_entry,
                                            first_execution_path,
                                            execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        Copy the entries' shared results directory to the results repository.

        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: monitor whose process to pair the copy with;
                defaults to self.monitor.
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        """Hand the entries to the results parser by marking them PARSING."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd (if any) under a fresh PidfileRunMonitor."""
        assert not self.monitor
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001619
1620
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render one key/value pair as a keyval-file line (no newline)."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of the keyval file to write.

        Subclasses must override this.
        """
        # Bug fix: this previously did `raise NotImplemented`, which raises
        # a TypeError because NotImplemented is a value, not an exception
        # class.  Raise the proper exception type (matches PostJobTask's
        # abstract-method style elsewhere in this file).
        raise NotImplementedError('subclasses must override _keyval_path()')


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line to the keyval file, paired with the job's
        process so it lands with the results.  No-op if the process never
        started."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
                self._keyval_path(), [self._format_keyval(field, value)],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """@returns the ('job_queued', <queued-time posix timestamp>) pair."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job's finish time as a keyval."""
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict's contents as a file at keyval_path, to be
        created when the job's execution starts."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._working_directory,
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write a host_keyvals/<hostname> file recording the host's
        platform and labels."""
        keyval_path = os.path.join(self._working_directory, 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1670
1671
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    # subclasses must override this with a models.SpecialTask.Task value
    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args, **kwargs):
        """
        @param task: the models.SpecialTask row this AgentTask executes.
        @param extra_command_args: extra autoserv arguments for this task
                type (e.g. ['-v'] for verify).
        """
        # Bug fix: this assert was previously written as
        # `assert (expr, message)`, which asserts a non-empty tuple and is
        # therefore always true -- the check could never fire.
        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = HostQueueEntry(id=task.queue_entry.id)

        self.task = task
        kwargs['working_directory'] = task.execution_path()
        self._extra_command_args = extra_command_args
        super(SpecialAgentTask, self).__init__(**kwargs)


    def _keyval_path(self):
        """Keyvals live directly in this task's working directory."""
        return os.path.join(self._working_directory, self._KEYVAL_FILE)


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.cmd = _autoserv_command_line(self.host.hostname,
                                          self._extra_command_args,
                                          queue_entry=self.queue_entry)
        self._working_directory = self.task.execution_path()
        # mark the SpecialTask row active in the database
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Permanently fail this task's queue entry (used when a pre-job
        task fails for good).  Metahost and already-aborted entries are
        left alone."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        self._copy_results([self.queue_entry])
        self.queue_entry.handle_host_failure()
        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])

        pidfile_id = _drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=_AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()
        # mark the SpecialTask row complete in the database
        self.task.finish()
        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001750
1751
class RepairTask(SpecialAgentTask):
    """Runs autoserv with '-R' to repair a host."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task, recover_run_monitor=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # translate the host's protection level into the attribute name
        # autoserv expects on its command line
        protection_string = host_protections.Protection.get_string(
                task.host.protection)
        protection_name = host_protections.Protection.get_attr_name(
                protection_string)
        extra_args = ['-R', '--host-protection', protection_name]

        super(RepairTask, self).__init__(
                task, extra_args, recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
            return
        self.host.set_status(models.Host.Status.READY)
showardde634ee2009-01-30 01:44:24 +00001788 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001789
1790
class PreJobTask(SpecialAgentTask):
    """
    Base for special tasks that run against a host before a job (e.g.
    verify).  On failure, requeues the entry (if any) and schedules a
    repair of the host.
    """

    def _copy_to_results_repository(self):
        """Copy this task's autoserv debug log into the failed job's results
        directory so the failure is visible next to the job results.  No-op
        for metahost entries or when there's no associated queue entry."""
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
                self.queue_entry.execution_path(), log_name)

        # NOTE(review): assumes self.monitor is set by the time this runs --
        # confirm epilog() cannot be reached before the task has started
        self.monitor.try_copy_to_results_repository(
                source, destination_path=destination)


    def epilog(self):
        """On failure: preserve logs, requeue the entry (if any), and queue
        a repair of the host -- unless the host is protected with
        DO_NOT_VERIFY, or a repair was already tried for this entry (in
        which case the entry is failed outright)."""
        super(PreJobTask, self).epilog()

        if self.success:
            return

        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # do not queue a repair for a protected host
            return

        if self.queue_entry:
            self.queue_entry.requeue()

            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                # a repair was already requested for this entry; give up
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
        else:
            queue_entry = None

        models.SpecialTask.objects.create(
                host=models.Host(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=queue_entry)
showard58721a82009-08-20 23:32:40 +00001836
showard8fe93b52008-11-18 17:53:22 +00001837
class VerifyTask(PreJobTask):
    """Runs autoserv with '-v' to verify a host before it runs a job."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task, recover_run_monitor=None):
        super(VerifyTask, self).__init__(
                task, ['-v'], recover_run_monitor=recover_run_monitor)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any other queued verifies for this host. One verify will do
        # and there's no need to keep records of other requests.
        pending_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        pending_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001874
1875
class CleanupHostsMixin(object):
    """Mixin with post-job logic for rebooting or releasing hosts."""

    def _reboot_hosts(self, job, queue_entries, final_success,
                      num_tests_failed):
        """For each entry, either schedule a cleanup (reboot) of its host or
        release the host back to READY, per the job's reboot_after policy.

        Relies on self._final_status, which the consuming class provides.
        """
        reboot_after = job.reboot_after
        # always reboot after aborted jobs
        was_aborted = (self._final_status
                       == models.HostQueueEntry.Status.ABORTED)
        all_tests_passed = (final_success and num_tests_failed == 0)
        do_reboot = (was_aborted
                     or reboot_after == models.RebootAfter.ALWAYS
                     or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                         and all_tests_passed))

        for queue_entry in queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP)
            else:
                queue_entry.host.set_status(models.Host.Status.READY)
showardb5626452009-06-30 01:57:28 +00001896
1897
1898class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
showard8cc058f2009-09-08 16:26:33 +00001899 def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001900 self.job = job
1901 self.queue_entries = queue_entries
showard8cc058f2009-09-08 16:26:33 +00001902 self.group_name = queue_entries[0].get_group_name()
showarded2afea2009-07-07 20:54:07 +00001903 super(QueueTask, self).__init__(
1904 cmd, self._execution_path(),
1905 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001906 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001907
1908
showard73ec0442009-02-07 02:05:20 +00001909 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001910 return os.path.join(self._execution_path(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001911
1912
showarded2afea2009-07-07 20:54:07 +00001913 def _execution_path(self):
1914 return self.queue_entries[0].execution_path()
mblighbb421852008-03-11 22:36:16 +00001915
1916
jadmanski0afbb632008-06-06 21:10:57 +00001917 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00001918 for entry in self.queue_entries:
1919 if entry.status not in (models.HostQueueEntry.Status.STARTING,
1920 models.HostQueueEntry.Status.RUNNING):
1921 raise SchedulerError('Queue task attempting to start '
1922 'entry with invalid status %s: %s'
1923 % (entry.status, entry))
1924 if entry.host.status not in (models.Host.Status.PENDING,
1925 models.Host.Status.RUNNING):
1926 raise SchedulerError('Queue task attempting to start on queue '
1927 'entry with invalid host status %s: %s'
1928 % (entry.host.status, entry))
1929
showardd9205182009-04-27 20:09:55 +00001930 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001931 keyval_dict = {queued_key: queued_time}
1932 if self.group_name:
1933 keyval_dict['host_group_name'] = self.group_name
1934 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001935 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001936 self._write_host_keyvals(queue_entry.host)
showard8cc058f2009-09-08 16:26:33 +00001937 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showard12f3e322009-05-13 21:27:42 +00001938 queue_entry.update_field('started_on', datetime.datetime.now())
showard8cc058f2009-09-08 16:26:33 +00001939 queue_entry.host.set_status(models.Host.Status.RUNNING)
showard21baa452008-10-21 00:08:39 +00001940 queue_entry.host.update_field('dirty', 1)
showardc6a56872009-07-28 20:11:58 +00001941 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1942 # TODO(gps): Remove this if nothing needs it anymore.
1943 # A potential user is: tko/parser
jadmanski0afbb632008-06-06 21:10:57 +00001944 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001945
1946
showard35162b02009-03-03 02:17:30 +00001947 def _write_lost_process_error_file(self):
showarded2afea2009-07-07 20:54:07 +00001948 error_file_path = os.path.join(self._execution_path(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001949 _drone_manager.write_lines_to_file(error_file_path,
1950 [_LOST_PROCESS_ERROR])
1951
1952
showardd3dc1992009-04-22 21:01:40 +00001953 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001954 if not self.monitor:
1955 return
1956
showardd9205182009-04-27 20:09:55 +00001957 self._write_job_finished()
1958
showard35162b02009-03-03 02:17:30 +00001959 if self.monitor.lost_process:
1960 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001961
showard8cc058f2009-09-08 16:26:33 +00001962 for queue_entry in self.queue_entries:
1963 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001964
1965
showardcbd74612008-11-19 21:42:02 +00001966 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001967 _drone_manager.write_lines_to_file(
showarded2afea2009-07-07 20:54:07 +00001968 os.path.join(self._execution_path(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001969 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001970 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001971
1972
jadmanskif7fa2cc2008-10-01 14:13:23 +00001973 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001974 if not self.monitor or not self.monitor.has_process():
1975 return
1976
jadmanskif7fa2cc2008-10-01 14:13:23 +00001977 # build up sets of all the aborted_by and aborted_on values
1978 aborted_by, aborted_on = set(), set()
1979 for queue_entry in self.queue_entries:
1980 if queue_entry.aborted_by:
1981 aborted_by.add(queue_entry.aborted_by)
1982 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1983 aborted_on.add(t)
1984
1985 # extract some actual, unique aborted by value and write it out
1986 assert len(aborted_by) <= 1
1987 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001988 aborted_by_value = aborted_by.pop()
1989 aborted_on_value = max(aborted_on)
1990 else:
1991 aborted_by_value = 'autotest_system'
1992 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001993
showarda0382352009-02-11 23:36:43 +00001994 self._write_keyval_after_job("aborted_by", aborted_by_value)
1995 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001996
showardcbd74612008-11-19 21:42:02 +00001997 aborted_on_string = str(datetime.datetime.fromtimestamp(
1998 aborted_on_value))
1999 self._write_status_comment('Job aborted by %s on %s' %
2000 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002001
2002
    def abort(self):
        # In addition to the base-class abort handling, record who aborted
        # the job and when into the results, then run the shared finish path.
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00002007
2008
    def epilog(self):
        # Normal (non-abort) completion: run the shared finish logic and log
        # the final success state of the autoserv run.
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00002013
2014
class PostJobTask(AgentTask):
    """
    Base class for tasks that run over a job's results after the main
    autoserv process has exited (e.g. crashinfo collection, final parsing).

    Attaches a PidfileRunMonitor to the finished autoserv process so the
    job's final status can be determined, then runs a follow-up command
    (from _generate_command()) in the same execution directory.
    """
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 recover_run_monitor=None):
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_path = self._get_consistent_execution_path(
                queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_path)
        # Attach to the already-finished autoserv process so we can read its
        # exit status later.
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_path)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(
                cmd=command, working_directory=self._execution_path,
                recover_run_monitor=recover_run_monitor)

        self.log_file = os.path.join(self._execution_path, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        """Return the command (an argv list) to run over results_dir."""
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """
        Return True if this job's queue entries were aborted.

        All entries should agree; if they disagree, a warning email is sent
        and the job is assumed to have been aborted.
        """
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # Bug fix: this previously called ', '.join(...) on a single
                # formatted string, which joins its *characters*.  List every
                # entry with its abort state instead.
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: ' +
                        entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        """Map the job's outcome to a HostQueueEntry status value."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        # Make sure we actually have results to work with.
        # This should never happen in normal operation.
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                    'No results in post-job task',
                    'No results in post-job task at %s' %
                    self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        super(PostJobTask, self).run(
                pidfile_name=self._pidfile_name,
                paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        """Set every queue entry of this job to the given status."""
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
2096
2097
class GatherLogsTask(PostJobTask, CleanupHostsMixin):
    """
    Post-job task that runs after the main autoserv process exits.  It:
      * gathers uncollected logs (if Autoserv crashed hard or was killed)
      * copies logs to the results repository
      * spawns CleanupTasks for hosts, if necessary
      * spawns a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, recover_run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
                queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
                logfile_name='.collect_crashinfo.log',
                recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # Run autoserv in crashinfo-collection mode over every host of this
        # job.
        hostnames = [entry.host.hostname for entry in self._queue_entries]
        return [_autoserv_path, '-p', '--collect-crashinfo', '-m',
                ','.join(hostnames), '-r', results_dir]


    def prolog(self):
        # Sanity-check that every entry and host is in the expected state
        # before gathering starts.
        for entry in self._queue_entries:
            if entry.status != models.HostQueueEntry.Status.GATHERING:
                raise SchedulerError('Gather task attempting to start on '
                                     'non-gathering entry: %s' % entry)
            if entry.host.status != models.Host.Status.RUNNING:
                raise SchedulerError('Gather task attempting to start on queue '
                                     'entry with non-running host status %s: %s'
                                     % (entry.host.status, entry))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()

        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)

        if not self._autoserv_monitor.has_process():
            # no autoserv results at all: treat the job as failed
            job_succeeded = False
            failed_test_count = 0
        else:
            job_succeeded = (self._final_status ==
                             models.HostQueueEntry.Status.COMPLETED)
            failed_test_count = self._autoserv_monitor.num_tests_failed()

        self._reboot_hosts(self._job, self._queue_entries, job_succeeded,
                           failed_test_count)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only collect crashinfo if Autoserv exited due to some signal; if we
        # have no exit code, assume something bad (and signal-like) happened.
        if (autoserv_exit_code is not None
                and not os.WIFSIGNALED(autoserv_exit_code)):
            self.finished(True)
        else:
            super(GatherLogsTask, self).run()
showard0bbfc212009-04-29 21:06:13 +00002161
2162
class CleanupTask(PreJobTask):
    """Pre-job special task that runs 'autoserv --cleanup' against a host."""

    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        super(CleanupTask, self).__init__(
                task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)


    def _finish_epilog(self):
        """Decide the queue entry's next step once cleanup has finished."""
        if not self.queue_entry:
            return

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # verification is forbidden for this host: go straight to pending
            self.queue_entry.on_pending()
            return

        if not self.success:
            return

        if not self.queue_entry.job.run_verify:
            self.queue_entry.on_pending()
            return

        # schedule a verify special task before the entry can go pending
        entry = models.HostQueueEntry(id=self.queue_entry.id)
        models.SpecialTask.objects.create(
                host=models.Host(id=self.host.id),
                queue_entry=entry,
                task=models.SpecialTask.Task.VERIFY)


    def epilog(self):
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002207
showard21baa452008-10-21 00:08:39 +00002208
class FinalReparseTask(PostJobTask):
    """Post-job task that runs the results parser over a finished job."""

    # Count of parser processes currently running, shared across instances;
    # used to enforce scheduler_config's max_parse_processes limit.
    _num_running_parses = 0

    def __init__(self, queue_entries, recover_run_monitor=None):
        super(FinalReparseTask, self).__init__(
                queue_entries, pidfile_name=_PARSER_PID_FILE,
                logfile_name='.parse.log',
                recover_run_monitor=recover_run_monitor)
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = self.started


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        parse_limit = scheduler_config.config.max_parse_processes
        return cls._num_running_parses < parse_limit


    def prolog(self):
        for entry in self._queue_entries:
            if entry.status != models.HostQueueEntry.Status.PARSING:
                raise SchedulerError('Parse task attempting to start on '
                                     'non-parsing entry: %s' % entry)

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._set_all_statuses(self._final_status)


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
                results_dir]


    def tick(self):
        # override tick to keep trying to start until the parse count goes
        # down and we can, at which point we revert to default behavior
        if not self._parse_started:
            self._try_starting_parse()
        else:
            super(FinalReparseTask, self).tick()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        if not self._can_run_new_parse():
            return

        # actually run the parse command
        super(FinalReparseTask, self).run()

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002286
2287
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails.

    For example, when no row exists for the requested id.
    """
2290
2291
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Database id of the row to load (queried if row is None).
        @param row - A pre-fetched row of values matching _fields.
        @param new_record - True if this object represents a row that does
                not yet exist in the database; save() will INSERT it.
        @param always_query - If False, an already-initialized cached
                instance is reused without re-querying the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """
        Fetch and return our table row by id.

        @raises DBError if no row with the given id exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields. Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is never legal to update through update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read our row from the database and refresh all fields."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in `table` matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set a single column both in the database and on this instance."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return  # no-op; avoid a pointless UPDATE

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object's fields as a new row (new_record only)."""
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            # NOTE: values are interpolated with simple quoting, not proper
            # escaping; this is only safe for the internal, trusted values
            # the scheduler writes.
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE our row and drop this instance from the instance cache."""
        # Bug fix: this previously used the key (type(self), id), where `id`
        # referenced the *builtin* function, so the cache entry was never
        # actually removed.  Use our own database id.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return `string` prefixed with `prefix`, or unchanged if empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002497
mbligh36768f02008-02-22 18:28:33 +00002498
class IneligibleHostQueue(DBObject):
    """ORM for the ineligible_host_queues table.

    Each row blocks one host from being scheduled for one job; rows are
    created/removed by HostQueueEntry.block_host() and unblock_host().
    """
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002502
2503
class AtomicGroup(DBObject):
    """ORM for the atomic_groups table."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002508
2509
class Label(DBObject):
    """ORM for the labels table."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        # Identify labels unambiguously in logs and error messages.
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
2519
2520
class Host(DBObject):
    """ORM for the hosts table."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self,status):
        """Update the host's status column, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    # Matches hostnames of the form <alphabetic-prefix><digits>.
    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number. If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes. Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() already ignores leading zeros.  The previous
            # .lstrip('0') approach raised ValueError for an all-zero digit
            # suffix such as "host0" (int('') is invalid).
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality. abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
2590
2591
class HostQueueEntry(DBObject):
    """ORM for the host_queue_entries table."""
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002597
2598
showarda3c58572009-03-12 20:36:59 +00002599 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002600 assert id or row
showarda3c58572009-03-12 20:36:59 +00002601 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002602 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002603
jadmanski0afbb632008-06-06 21:10:57 +00002604 if self.host_id:
2605 self.host = Host(self.host_id)
2606 else:
2607 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002608
showard77182562009-06-10 00:16:05 +00002609 if self.atomic_group_id:
2610 self.atomic_group = AtomicGroup(self.atomic_group_id,
2611 always_query=False)
2612 else:
2613 self.atomic_group = None
2614
showard170873e2009-01-07 00:22:26 +00002615 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002616 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002617
2618
showard89f84db2009-03-12 20:39:13 +00002619 @classmethod
2620 def clone(cls, template):
2621 """
2622 Creates a new row using the values from a template instance.
2623
2624 The new instance will not exist in the database or have a valid
2625 id attribute until its save() method is called.
2626 """
2627 assert isinstance(template, cls)
2628 new_row = [getattr(template, field) for field in cls._fields]
2629 clone = cls(row=new_row, new_record=True)
2630 clone.id = None
2631 return clone
2632
2633
showardc85c21b2008-11-24 22:17:37 +00002634 def _view_job_url(self):
2635 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2636
2637
showardf1ae3542009-05-11 19:26:02 +00002638 def get_labels(self):
2639 """
2640 Get all labels associated with this host queue entry (either via the
2641 meta_host or as a job dependency label). The labels yielded are not
2642 guaranteed to be unique.
2643
2644 @yields Label instances associated with this host_queue_entry.
2645 """
2646 if self.meta_host:
2647 yield Label(id=self.meta_host, always_query=False)
2648 labels = Label.fetch(
2649 joins="JOIN jobs_dependency_labels AS deps "
2650 "ON (labels.id = deps.label_id)",
2651 where="deps.job_id = %d" % self.job.id)
2652 for label in labels:
2653 yield label
2654
2655
    def set_host(self, host):
        # Assign a host to this entry, or release the current one when host
        # is None/falsy.  Assignment marks the entry active and blocks the
        # host from being picked again for this job; release removes the
        # block and clears the host_id column.
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host
mbligh36768f02008-02-22 18:28:33 +00002668
2669
    def get_host(self):
        # Accessor for the Host object set in __init__/set_host; may be None
        # for an unassigned entry.
        return self.host
mbligh36768f02008-02-22 18:28:33 +00002672
2673
jadmanski0afbb632008-06-06 21:10:57 +00002674 def queue_log_record(self, log_line):
2675 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002676 _drone_manager.write_lines_to_file(self.queue_log_path,
2677 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002678
2679
jadmanski0afbb632008-06-06 21:10:57 +00002680 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002681 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002682 row = [0, self.job.id, host_id]
2683 block = IneligibleHostQueue(row=row, new_record=True)
2684 block.save()
mblighe2586682008-02-29 22:45:46 +00002685
2686
jadmanski0afbb632008-06-06 21:10:57 +00002687 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002688 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002689 blocks = IneligibleHostQueue.fetch(
2690 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2691 for block in blocks:
2692 block.delete()
mblighe2586682008-02-29 22:45:46 +00002693
2694
showard2bab8f42008-11-12 18:15:22 +00002695 def set_execution_subdir(self, subdir=None):
2696 if subdir is None:
2697 assert self.get_host()
2698 subdir = self.get_host().hostname
2699 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002700
2701
showard6355f6b2008-12-05 18:52:13 +00002702 def _get_hostname(self):
2703 if self.host:
2704 return self.host.hostname
2705 return 'no host'
2706
2707
showard170873e2009-01-07 00:22:26 +00002708 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002709 flags = []
2710 if self.active:
2711 flags.append('active')
2712 if self.complete:
2713 flags.append('complete')
2714 if self.deleted:
2715 flags.append('deleted')
2716 if self.aborted:
2717 flags.append('aborted')
2718 flags_str = ','.join(flags)
2719 if flags_str:
2720 flags_str = ' [%s]' % flags_str
2721 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2722 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002723
2724
jadmanski0afbb632008-06-06 21:10:57 +00002725 def set_status(self, status):
showard56824072009-10-12 20:30:21 +00002726 logging.info("%s -> %s", self, status)
mblighf8c624d2008-07-03 16:58:45 +00002727
showard56824072009-10-12 20:30:21 +00002728 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002729
showard8cc058f2009-09-08 16:26:33 +00002730 if status in (models.HostQueueEntry.Status.QUEUED,
2731 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002732 self.update_field('complete', False)
2733 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002734
showard8cc058f2009-09-08 16:26:33 +00002735 if status in (models.HostQueueEntry.Status.PENDING,
2736 models.HostQueueEntry.Status.RUNNING,
2737 models.HostQueueEntry.Status.VERIFYING,
2738 models.HostQueueEntry.Status.STARTING,
2739 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002740 self.update_field('complete', False)
2741 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002742
showard8cc058f2009-09-08 16:26:33 +00002743 if status in (models.HostQueueEntry.Status.FAILED,
2744 models.HostQueueEntry.Status.COMPLETED,
2745 models.HostQueueEntry.Status.STOPPED,
2746 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002747 self.update_field('complete', True)
2748 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002749 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002750
2751 should_email_status = (status.lower() in _notify_email_statuses or
2752 'all' in _notify_email_statuses)
2753 if should_email_status:
2754 self._email_on_status(status)
2755
2756 self._email_on_job_complete()
2757
2758
showardf85a0b72009-10-07 20:48:45 +00002759 def _on_complete(self):
2760 if not self.execution_subdir:
2761 return
2762 # unregister any possible pidfiles associated with this queue entry
2763 for pidfile_name in _ALL_PIDFILE_NAMES:
2764 pidfile_id = _drone_manager.get_pidfile_id_from(
2765 self.execution_path(), pidfile_name=pidfile_name)
2766 _drone_manager.unregister_pidfile(pidfile_id)
2767
2768
showardc85c21b2008-11-24 22:17:37 +00002769 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002770 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002771
2772 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2773 self.job.id, self.job.name, hostname, status)
2774 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2775 self.job.id, self.job.name, hostname, status,
2776 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002777 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002778
2779
2780 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002781 if not self.job.is_finished():
2782 return
showard542e8402008-09-19 20:16:18 +00002783
showardc85c21b2008-11-24 22:17:37 +00002784 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002785 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002786 for queue_entry in hosts_queue:
2787 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002788 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002789 queue_entry.status))
2790
2791 summary_text = "\n".join(summary_text)
2792 status_counts = models.Job.objects.get_status_counts(
2793 [self.job.id])[self.job.id]
2794 status = ', '.join('%d %s' % (count, status) for status, count
2795 in status_counts.iteritems())
2796
2797 subject = 'Autotest: Job ID: %s "%s" %s' % (
2798 self.job.id, self.job.name, status)
2799 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2800 self.job.id, self.job.name, status, self._view_job_url(),
2801 summary_text)
showard170873e2009-01-07 00:22:26 +00002802 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002803
2804
showard8cc058f2009-09-08 16:26:33 +00002805 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002806 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002807 assert assigned_host
2808 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002809 if self.host_id is None:
2810 self.set_host(assigned_host)
2811 else:
2812 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002813
showardcfd4a7e2009-07-11 01:47:33 +00002814 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002815 self.job.name, self.meta_host, self.atomic_group_id,
2816 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002817
showard8cc058f2009-09-08 16:26:33 +00002818 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002819
2820
    def _do_schedule_pre_job_tasks(self):
        """Move this entry to Verifying and let the Job schedule its
        pre-job special tasks (Cleanup/Verify) for it."""
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002826
showard6ae5ea92009-02-25 00:11:51 +00002827
    def requeue(self):
        """Return this entry to Queued so the scheduler can run it again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        # A meta-host entry releases its host so any eligible host may be
        # picked on the next scheduling pass.
        if self.meta_host:
            self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002836
2837
    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.  Marks the entry Failed and lets the job stop its remaining
        not-yet-run entries if it can no longer meet synch_count.
        """
        # Only non-meta-host entries fail outright; a meta-host entry would
        # instead be rescheduled onto another host.
        assert not self.meta_host
        self.set_status(models.HostQueueEntry.Status.FAILED)
        self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002846
2847
    @property
    def aborted_by(self):
        """Login of the user who aborted this entry, or None (lazy-loaded)."""
        self._load_abort_info()
        return self._aborted_by
2852
2853
    @property
    def aborted_on(self):
        """Timestamp when this entry was aborted, or None (lazy-loaded)."""
        self._load_abort_info()
        return self._aborted_on
2858
2859
2860 def _load_abort_info(self):
2861 """ Fetch info about who aborted the job. """
2862 if hasattr(self, "_aborted_by"):
2863 return
2864 rows = _db.execute("""
2865 SELECT users.login, aborted_host_queue_entries.aborted_on
2866 FROM aborted_host_queue_entries
2867 INNER JOIN users
2868 ON users.id = aborted_host_queue_entries.aborted_by_id
2869 WHERE aborted_host_queue_entries.queue_entry_id = %s
2870 """, (self.id,))
2871 if rows:
2872 self._aborted_by, self._aborted_on = rows[0]
2873 else:
2874 self._aborted_by = self._aborted_on = None
2875
2876
showardb2e2c322008-10-14 17:33:55 +00002877 def on_pending(self):
2878 """
2879 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00002880 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
2881 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00002882 """
showard8cc058f2009-09-08 16:26:33 +00002883 self.set_status(models.HostQueueEntry.Status.PENDING)
2884 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00002885
2886 # Some debug code here: sends an email if an asynchronous job does not
2887 # immediately enter Starting.
2888 # TODO: Remove this once we figure out why asynchronous jobs are getting
2889 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00002890 self.job.run_if_ready(queue_entry=self)
2891 if (self.job.synch_count == 1 and
2892 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00002893 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2894 message = 'Asynchronous job stuck in Pending'
2895 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002896
2897
    def abort(self, dispatcher):
        """Abort this queue entry and release or clean up its host.

        @param dispatcher: used only to assert no agents are still attached
                to this entry before its host is released.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
            # Host was claimed but the job hasn't left anything behind that
            # needs cleanup; just hand the host back.
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # Interrupted mid-verify: schedule a cleanup to restore the host.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host(id=self.host.id))

        self.set_status(Status.ABORTED)
        # Cancel any atomic-group ready-delay the job may still be running.
        self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00002917
showard8cc058f2009-09-08 16:26:33 +00002918
2919 def get_group_name(self):
2920 atomic_group = self.atomic_group
2921 if not atomic_group:
2922 return ''
2923
2924 # Look at any meta_host and dependency labels and pick the first
2925 # one that also specifies this atomic group. Use that label name
2926 # as the group name if possible (it is more specific).
2927 for label in self.get_labels():
2928 if label.atomic_group_id:
2929 assert label.atomic_group_id == atomic_group.id
2930 return label.name
2931 return atomic_group.name
2932
2933
    def execution_tag(self):
        """Return '<job tag>/<execution subdir>'; requires the subdir be set."""
        assert self.execution_subdir
        return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002937
2938
    def execution_path(self):
        """Results path for this entry; currently identical to its tag."""
        return self.execution_tag()
2941
2942
class Job(DBObject):
    """In-memory wrapper for a row of the `jobs` table.

    Carries the job-level scheduling logic: deciding when enough
    HostQueueEntries are Pending to launch (synch_count / atomic groups),
    grouping entries into a shared results directory, building the autoserv
    command line, and stopping or delaying entries as needed.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        # Must be constructed from either a database id or a pre-fetched row.
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        """True unless this job runs a client-side control file.

        NOTE(review): assumes control_type value 2 is 'Client' in the
        frontend ControlType enum -- confirm against the afe models.
        """
        return self.control_type != 2


    def tag(self):
        """Return the '<id>-<owner>' tag used as this job's results path."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Fetch all HostQueueEntries of this job; asserts at least one."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """Set this job's status; optionally propagate it to every entry."""
        self.update_field('status',status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _hosts_assigned_count(self):
        """The number of HostQueueEntries assigned a Host for this job."""
        entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                       host__isnull=False)
        return entries.count()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def _pending_threshold(self, atomic_group):
        """
        @param atomic_group: The AtomicGroup associated with this job that we
                are using to bound the threshold.
        @returns The minimum number of HostQueueEntries assigned a Host before
                this job can run.
        """
        return min(self._hosts_assigned_count(),
                   atomic_group.max_number_of_machines)


    def is_ready(self):
        """True when enough entries are Pending for this job to launch."""
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready


    def num_machines(self, clause = None):
        """Count this job's queue entries, optionally filtered by a SQL clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of entries that have not yet completed."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of entries currently marked active."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of entries marked complete."""
        return self.num_machines('complete')


    def is_finished(self):
        """True when every entry of this job is complete."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """QuerySet of this job's entries still in pre-run statuses."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark all Queued/Pending entries Stopped and free Pending hosts."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()

    def stop_if_necessary(self):
        """Stop remaining entries when too few are left to meet synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append queue_entry's hostname to this job's .machines results file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


    def _write_control_file(self, execution_path):
        """Attach this job's control file under execution_path via the drone
        manager and return the path it was written to."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """All HostQueueEntries sharing queue_entry_from_group's exec subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def get_autoserv_params(self, queue_entries):
        """Build the autoserv command-line argument list for queue_entries.

        Writes the control file for the group's execution directory and
        appends '-c' for client-side jobs.
        """
        assert queue_entries
        execution_path = queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        execution_tag = queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self, verbose=False)

        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """True when reboot_before policy requires a Cleanup for this entry."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """True when the job wants verify and host protection allows it."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Schedule the pre-job special task (Cleanup or Verify) needed before
        queue_entry may run this Job.  When neither is needed, calls
        queue_entry.on_pending() directly, which continues the flow of the
        job.  Returns None.
        """
        if self._should_run_cleanup(queue_entry):
            task = models.SpecialTask.Task.CLEANUP
        elif self._should_run_verify(queue_entry):
            task = models.SpecialTask.Task.VERIFY
        else:
            queue_entry.on_pending()
            return

        models.SpecialTask.objects.create(
                host=models.Host(id=queue_entry.host_id),
                queue_entry=models.HostQueueEntry(id=queue_entry.id),
                task=task)


    def _assign_new_group(self, queue_entries, group_name=''):
        """Give queue_entries a shared execution subdirectory (hostname for a
        single entry, otherwise the next 'group%d' name)."""
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job, including include_queue_entry, assigned to a new results
                group.  Returns an empty list (after notifying by email) if
                fewer than synch_count entries could be gathered.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job if enough hosts are ready; otherwise stop superfluous
        entries (for synchronous jobs) or start an atomic-group ready delay.
        Returns None.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        elif queue_entry.atomic_group:
            self.run_with_ready_delay(queue_entry)
        else:
            self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        Runs the job immediately when the delay is disabled, the pending
        threshold is already met, or a previously-set delay has expired;
        otherwise marks queue_entry as Waiting.  Returns None.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._pending_threshold(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)


    def schedule_delayed_callback_task(self, queue_entry):
        """Create the DelayedCallTask that re-checks this job after the
        atomic-group wait.

        @returns The new DelayedCallTask, or None if one is already set.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant.  It could have aborted
            # while we were waiting or hosts could have disappearred, etc.
            threshold = self._pending_threshold(queue_entry.atomic_group)
            if self._pending_count() < threshold:
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras.  Not running.', self)
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task


    def run(self, queue_entry):
        """Choose a host group including queue_entry and start it running.

        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        # Guard against double-launching an atomic group job.
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        """Move the chosen entries to Starting and cancel any ready delay."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
        self.abort_delay_ready_task()


    def abort_delay_ready_task(self):
        """Abort the delayed task associated with this job, if any."""
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)
3365
3366
if __name__ == '__main__':
    # Script entry point: main() (defined earlier in this file) starts the
    # scheduler's dispatch loop.
    main()