blob: 52f5b960136b2fdb4f7db60ca90fc2599767d2a4 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
# Pid file name prefixes used by this process and its babysitter wrapper.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# NOTE: dict.has_key() is deprecated (and removed in Python 3); the 'in'
# operator is the supported spelling and behaves identically here.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Pidfiles dropped by autoserv-related processes in execution directories.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state, initialized by init()/main().
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
def _get_pidfile_timeout_secs():
    """@returns How long (in seconds) to wait for autoserv to write pidfile."""
    timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * timeout_mins
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    """Run the scheduler, always removing our pid file on the way out.

    SystemExit propagates untouched; any other escaping exception is
    logged before being re-raised.  (Unified try/except/finally requires
    Python 2.5+.)
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000085
86
87def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000088 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000089
showard136e6dc2009-06-10 19:38:49 +000090 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000091 parser = optparse.OptionParser(usage)
92 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
93 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +000094 parser.add_option('--test', help='Indicate that scheduler is under ' +
95 'test and should use dummy autoserv and no parsing',
96 action='store_true')
97 (options, args) = parser.parse_args()
98 if len(args) != 1:
99 parser.print_usage()
100 return
mbligh36768f02008-02-22 18:28:33 +0000101
showard5613c662009-06-08 23:30:33 +0000102 scheduler_enabled = global_config.global_config.get_config_value(
103 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
104
105 if not scheduler_enabled:
106 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
107 "global_config's SCHEDULER section to enabled it. Exiting.")
mbligh6fbdb802009-08-03 16:42:55 +0000108 logging.error(msg)
showard5613c662009-06-08 23:30:33 +0000109 sys.exit(1)
110
jadmanski0afbb632008-06-06 21:10:57 +0000111 global RESULTS_DIR
112 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000113
mbligh83c1e9e2009-05-01 23:10:41 +0000114 site_init = utils.import_site_function(__file__,
115 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
116 _site_init_monitor_db_dummy)
117 site_init()
118
showardcca334f2009-03-12 20:38:34 +0000119 # Change the cwd while running to avoid issues incase we were launched from
120 # somewhere odd (such as a random NFS home directory of the person running
121 # sudo to launch us as the appropriate user).
122 os.chdir(RESULTS_DIR)
123
jadmanski0afbb632008-06-06 21:10:57 +0000124 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000125 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
126 "notify_email_statuses",
127 default='')
showardc85c21b2008-11-24 22:17:37 +0000128 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000129 _notify_email_statuses = [status for status in
130 re.split(r'[\s,;:]', notify_statuses_list.lower())
131 if status]
showardc85c21b2008-11-24 22:17:37 +0000132
jadmanski0afbb632008-06-06 21:10:57 +0000133 if options.test:
134 global _autoserv_path
135 _autoserv_path = 'autoserv_dummy'
136 global _testing_mode
137 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000138
mbligh37eceaa2008-12-15 22:56:37 +0000139 # AUTOTEST_WEB.base_url is still a supported config option as some people
140 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000141 global _base_url
showard170873e2009-01-07 00:22:26 +0000142 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
143 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000144 if config_base_url:
145 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000146 else:
mbligh37eceaa2008-12-15 22:56:37 +0000147 # For the common case of everything running on a single server you
148 # can just set the hostname in a single place in the config file.
149 server_name = c.get_config_value('SERVER', 'hostname')
150 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000151 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000152 sys.exit(1)
153 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000154
showardc5afc462009-01-13 00:09:39 +0000155 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000156 server.start()
157
jadmanski0afbb632008-06-06 21:10:57 +0000158 try:
showard136e6dc2009-06-10 19:38:49 +0000159 init()
showardc5afc462009-01-13 00:09:39 +0000160 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000161 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000162
jadmanski0afbb632008-06-06 21:10:57 +0000163 while not _shutdown:
164 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000165 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000166 except:
showard170873e2009-01-07 00:22:26 +0000167 email_manager.manager.log_stacktrace(
168 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000169
showard170873e2009-01-07 00:22:26 +0000170 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000171 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000172 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000173 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
def setup_logging():
    """Configure scheduler logging, honoring env-var overrides for dir/name."""
    dir_override = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    name_override = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=dir_override, logfile_name=name_override)
182
183
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after the current tick."""
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000188
189
def init():
    """One-time scheduler startup.

    Writes our pid file (exiting if another monitor_db already holds it),
    connects the database, puts Django in autocommit, installs the SIGINT
    handler and initializes the drone manager from configuration.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Renamed local from 'drones' to 'drone_names' so it no longer shadows
    # the imported autotest_lib.scheduler.drones module.
    drone_names = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_names.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv invocation as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of strings.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        # Fall back to the queue entry's job when no Job was given.
        job = job or queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
248
249
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
252
253
class HostScheduler(object):
    """Matches pending host queue entries against available hosts.

    refresh() snapshots scheduling state (ready hosts, ACLs, labels and job
    dependencies) from the database into in-memory dicts; the find_* methods
    then consume that snapshot, popping hosts out of _hosts_available as
    they are handed out so each host is used at most once per cycle.
    """

    def _get_ready_hosts(self):
        """Return {host id: Host} for unlocked, Ready (or status-less) hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render a sequence of ids as a comma-separated SQL IN() list."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run a two-column query (query contains a %s placeholder for the
        id list) and fold the rows into a dict via _process_many2many_dict."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left, right) id rows into {left: set(right)}; flip=True
        swaps the roles of the two columns."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map job id -> set of ACL group ids its owner belongs to."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map job id -> set of host ids marked ineligible for that job."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map job id -> set of label ids the job depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map host id -> set of ACL group ids containing that host."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label id -> host ids, host id -> label ids) mappings."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Map label id -> Label object for every label."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all scheduling state for this cycle from the database.

        Only jobs referenced by pending_queue_entries (and the currently
        ready hosts) are loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # More than one is unexpected; log it and fall through to
            # returning an arbitrary one of them.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if host_id may run queue_entry's job (ACLs, dependencies,
        only_if_needed labels and atomic group settings all pass)."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True if the host is still available this cycle but marked invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible; returns the Host
        (removed from availability) or None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available and valid for metahost use."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick an eligible host from the entry's metahost label; returns
        the Host (removed from availability) or None."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Find a host for a non-atomic-group queue entry.

        Dispatches to the specific-host or metahost path; returns a Host
        or None.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
617
618
showard170873e2009-01-07 00:22:26 +0000619class Dispatcher(object):
    def __init__(self):
        # Agents currently owned by this dispatcher, in insertion order.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Indexes: host id -> set of agents, queue entry id -> set of agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
showard915958d2009-04-22 21:00:58 +0000632 def initialize(self, recover_hosts=True):
633 self._periodic_cleanup.initialize()
634 self._24hr_upkeep.initialize()
635
jadmanski0afbb632008-06-06 21:10:57 +0000636 # always recover processes
637 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000638
jadmanski0afbb632008-06-06 21:10:57 +0000639 if recover_hosts:
640 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000641
642
    def tick(self):
        """Run one scheduler cycle: refresh drone state, perform cleanup
        and scheduling phases in a fixed order, run agents, then flush
        queued drone actions and emails."""
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000655
showard97aed502008-11-04 02:01:24 +0000656
mblighf3294cc2009-04-08 21:17:38 +0000657 def _run_cleanup(self):
658 self._periodic_cleanup.run_cleanup_maybe()
659 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard170873e2009-01-07 00:22:26 +0000662 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
663 for object_id in object_ids:
664 agent_dict.setdefault(object_id, set()).add(agent)
665
666
667 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
668 for object_id in object_ids:
669 assert object_id in agent_dict
670 agent_dict[object_id].remove(agent)
671
672
jadmanski0afbb632008-06-06 21:10:57 +0000673 def add_agent(self, agent):
674 self._agents.append(agent)
675 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000676 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
677 self._register_agent_for_ids(self._queue_entry_agents,
678 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000679
showard170873e2009-01-07 00:22:26 +0000680
681 def get_agents_for_entry(self, queue_entry):
682 """
683 Find agents corresponding to the specified queue_entry.
684 """
showardd3dc1992009-04-22 21:01:40 +0000685 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000686
687
688 def host_has_agent(self, host):
689 """
690 Determine if there is currently an Agent present using this host.
691 """
692 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000693
694
jadmanski0afbb632008-06-06 21:10:57 +0000695 def remove_agent(self, agent):
696 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000697 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
698 agent)
699 self._unregister_agent_for_ids(self._queue_entry_agents,
700 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000701
702
showard8cc058f2009-09-08 16:26:33 +0000703 def _host_has_scheduled_special_task(self, host):
704 return bool(models.SpecialTask.objects.filter(host__id=host.id,
705 is_active=False,
706 is_complete=False))
707
708
jadmanski0afbb632008-06-06 21:10:57 +0000709 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000710 self._register_pidfiles()
711 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000712 self._recover_all_recoverable_entries()
showard8cc058f2009-09-08 16:26:33 +0000713 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000714 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000715 self._reverify_remaining_hosts()
716 # reinitialize drones after killing orphaned processes, since they can
717 # leave around files when they die
718 _drone_manager.execute_actions()
719 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000720
showard170873e2009-01-07 00:22:26 +0000721
    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
                where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            # register every known pidfile name, since we don't yet know
            # which process type each execution was running
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                        execution_entry.execution_path(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000733
734
showarded2afea2009-07-07 20:54:07 +0000735 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
736 run_monitor = PidfileRunMonitor()
737 run_monitor.attach_to_existing_process(execution_path,
738 pidfile_name=pidfile_name)
739 if run_monitor.has_process():
740 orphans.discard(run_monitor.get_process())
741 return run_monitor, '(process %s)' % run_monitor.get_process()
742 return None, 'without process'
743
744
showard8cc058f2009-09-08 16:26:33 +0000745 def _get_unassigned_entries(self, status):
746 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
showard0db3d432009-10-12 20:29:15 +0000747 if entry.status == status and not self.get_agents_for_entry(entry):
748 # The status can change during iteration, e.g., if job.run()
749 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000750 yield entry
751
752
showardd3dc1992009-04-22 21:01:40 +0000753 def _recover_entries_with_status(self, status, orphans, pidfile_name,
754 recover_entries_fn):
showard8cc058f2009-09-08 16:26:33 +0000755 for queue_entry in self._get_unassigned_entries(status):
showardd3dc1992009-04-22 21:01:40 +0000756 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000757 run_monitor, process_string = self._get_recovery_run_monitor(
758 queue_entry.execution_path(), pidfile_name, orphans)
showard8cc058f2009-09-08 16:26:33 +0000759 if not run_monitor:
760 # _schedule_running_host_queue_entries should schedule and
761 # recover these entries
762 continue
showard597bfd32009-05-08 18:22:50 +0000763
showarded2afea2009-07-07 20:54:07 +0000764 logging.info('Recovering %s entry %s %s',status.lower(),
765 ', '.join(str(entry) for entry in queue_entries),
766 process_string)
showardd3dc1992009-04-22 21:01:40 +0000767 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000768
769
showard6878e8b2009-07-20 22:37:45 +0000770 def _check_for_remaining_orphan_processes(self, orphans):
771 if not orphans:
772 return
773 subject = 'Unrecovered orphan autoserv processes remain'
774 message = '\n'.join(str(process) for process in orphans)
775 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000776
777 die_on_orphans = global_config.global_config.get_config_value(
778 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
779
780 if die_on_orphans:
781 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000782
showard170873e2009-01-07 00:22:26 +0000783
showardd3dc1992009-04-22 21:01:40 +0000784 def _recover_running_entries(self, orphans):
785 def recover_entries(job, queue_entries, run_monitor):
showard8cc058f2009-09-08 16:26:33 +0000786 queue_task = QueueTask(job=job, queue_entries=queue_entries,
787 recover_run_monitor=run_monitor)
788 self.add_agent(Agent(task=queue_task,
789 num_processes=len(queue_entries)))
showardd3dc1992009-04-22 21:01:40 +0000790
791 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
showarded2afea2009-07-07 20:54:07 +0000792 orphans, _AUTOSERV_PID_FILE,
showardd3dc1992009-04-22 21:01:40 +0000793 recover_entries)
794
795
    def _recover_gathering_entries(self, orphans):
        # reattach GatherLogsTask agents to entries still in Gathering
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent(gather_task))

        self._recover_entries_with_status(
                models.HostQueueEntry.Status.GATHERING,
                orphans, _CRASHINFO_PID_FILE, recover_entries)
805
806
    def _recover_parsing_entries(self, orphans):
        # reattach zero-process FinalReparseTask agents to Parsing entries
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            self.add_agent(Agent(reparse_task, num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
816
817
showard8cc058f2009-09-08 16:26:33 +0000818 def _recover_pending_entries(self):
819 for entry in self._get_unassigned_entries(
820 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000821 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000822 entry.on_pending()
823
824
showardd3dc1992009-04-22 21:01:40 +0000825 def _recover_all_recoverable_entries(self):
826 orphans = _drone_manager.get_orphaned_autoserv_processes()
827 self._recover_running_entries(orphans)
828 self._recover_gathering_entries(orphans)
829 self._recover_parsing_entries(orphans)
showarded2afea2009-07-07 20:54:07 +0000830 self._recover_special_tasks(orphans)
showard6878e8b2009-07-20 22:37:45 +0000831 self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000832
showard97aed502008-11-04 02:01:24 +0000833
showarded2afea2009-07-07 20:54:07 +0000834 def _recover_special_tasks(self, orphans):
showard2fe3f1d2009-07-06 20:19:11 +0000835 """\
836 Recovers all special tasks that have started running but have not
837 completed.
838 """
showard2fe3f1d2009-07-06 20:19:11 +0000839 tasks = models.SpecialTask.objects.filter(is_active=True,
840 is_complete=False)
showard65db3932009-10-28 19:54:35 +0000841 for task in tasks:
showard9b6ec502009-08-20 23:25:17 +0000842 if self.host_has_agent(task.host):
843 raise SchedulerError(
844 "%s already has a host agent %s." % (
showard8cc058f2009-09-08 16:26:33 +0000845 task, self._host_agents.get(task.host.id)))
showard2fe3f1d2009-07-06 20:19:11 +0000846
showarded2afea2009-07-07 20:54:07 +0000847 run_monitor, process_string = self._get_recovery_run_monitor(
848 task.execution_path(), _AUTOSERV_PID_FILE, orphans)
849
850 logging.info('Recovering %s %s', task, process_string)
showard65db3932009-10-28 19:54:35 +0000851 self._run_or_recover_special_task(task, run_monitor=run_monitor)
showard6af73ad2009-07-28 20:00:58 +0000852
853
showardb8900452009-10-12 20:31:01 +0000854 def _check_for_unrecovered_verifying_entries(self):
showard170873e2009-01-07 00:22:26 +0000855 queue_entries = HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000856 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
857 unrecovered_hqes = []
858 for queue_entry in queue_entries:
859 special_tasks = models.SpecialTask.objects.filter(
860 task__in=(models.SpecialTask.Task.CLEANUP,
861 models.SpecialTask.Task.VERIFY),
862 queue_entry__id=queue_entry.id,
863 is_complete=False)
864 if special_tasks.count() == 0:
865 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000866
showardb8900452009-10-12 20:31:01 +0000867 if unrecovered_hqes:
868 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000869 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000870 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000871 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000872
873
showard65db3932009-10-28 19:54:35 +0000874 def _get_prioritized_special_tasks(self):
875 """
876 Returns all queued SpecialTasks prioritized for repair first, then
877 cleanup, then verify.
878 """
879 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
880 is_complete=False,
881 host__locked=False)
882 # exclude hosts with active queue entries unless the SpecialTask is for
883 # that queue entry
884 queued_tasks = models.Host.objects.add_join(
885 queued_tasks, 'host_queue_entries', 'host_id',
886 join_condition='host_queue_entries.active',
887 force_left_join=True)
888 queued_tasks = queued_tasks.extra(
889 where=['(host_queue_entries.id IS NULL OR '
890 'host_queue_entries.id = special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000891
showard65db3932009-10-28 19:54:35 +0000892 # reorder tasks by priority
893 task_priority_order = [models.SpecialTask.Task.REPAIR,
894 models.SpecialTask.Task.CLEANUP,
895 models.SpecialTask.Task.VERIFY]
896 def task_priority_key(task):
897 return task_priority_order.index(task.task)
898 return sorted(queued_tasks, key=task_priority_key)
899
900
    def _run_or_recover_special_task(self, special_task, run_monitor=None):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @param run_monitor: if given, a running SpecialTask will be recovered
                with this monitor.
        """
        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                agent_task = agent_task_class(task=special_task,
                                              recover_run_monitor=run_monitor)
                self.add_agent(Agent(agent_task))
                return

        # unknown task type: report it rather than crash the scheduler
        email_manager.manager.enqueue_notify_email(
                'No AgentTask class for task', str(special_task))
919
920
    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                # host is busy; the task will be retried on a later cycle
                continue
            self._run_or_recover_special_task(task)
showard1ff7b2e2009-05-15 23:17:18 +0000929
930
showard170873e2009-01-07 00:22:26 +0000931 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000932 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000933 # should never happen
showarded2afea2009-07-07 20:54:07 +0000934 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000935 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000936 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000937 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000938 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000939
940
jadmanski0afbb632008-06-06 21:10:57 +0000941 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000942 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000943 full_where='locked = 0 AND invalid = 0 AND ' + where
944 for host in Host.fetch(where=full_where):
945 if self.host_has_agent(host):
946 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000947 continue
showard8cc058f2009-09-08 16:26:33 +0000948 if self._host_has_scheduled_special_task(host):
949 # host will have a special task scheduled on the next cycle
950 continue
showard170873e2009-01-07 00:22:26 +0000951 if print_message:
showardb18134f2009-03-20 20:52:18 +0000952 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000953 models.SpecialTask.objects.create(
954 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000955 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000956
957
jadmanski0afbb632008-06-06 21:10:57 +0000958 def _recover_hosts(self):
959 # recover "Repair Failed" hosts
960 message = 'Reverifying dead host %s'
961 self._reverify_hosts_where("status = 'Repair Failed'",
962 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000963
964
showard04c82c52008-05-29 19:38:12 +0000965
showardb95b1bd2008-08-15 18:11:04 +0000966 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000967 # prioritize by job priority, then non-metahost over metahost, then FIFO
968 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000969 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000970 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000971 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000972
973
showard89f84db2009-03-12 20:39:13 +0000974 def _refresh_pending_queue_entries(self):
975 """
976 Lookup the pending HostQueueEntries and call our HostScheduler
977 refresh() method given that list. Return the list.
978
979 @returns A list of pending HostQueueEntries sorted in priority order.
980 """
showard63a34772008-08-18 19:32:50 +0000981 queue_entries = self._get_pending_queue_entries()
982 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000983 return []
showardb95b1bd2008-08-15 18:11:04 +0000984
showard63a34772008-08-18 19:32:50 +0000985 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000986
showard89f84db2009-03-12 20:39:13 +0000987 return queue_entries
988
989
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.

        @param queue_entry: an HQE with an atomic_group_id but no host yet
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)
1020
1021
    def _schedule_new_jobs(self):
        """Assign eligible hosts to pending queue entries and start them."""
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)
            if is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.find_eligible_host(
                        queue_entry)
                # skip hosts that already have an agent from this cycle
                if assigned_host and not self.host_has_agent(assigned_host):
                    self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001038
1039
showard8cc058f2009-09-08 16:26:33 +00001040 def _schedule_running_host_queue_entries(self):
showard8375ce02009-10-12 20:35:13 +00001041 status_enum = models.HostQueueEntry.Status
1042 running_statuses = (status_enum.STARTING, status_enum.RUNNING,
1043 status_enum.GATHERING, status_enum.PARSING)
1044 sql_statuses = ', '.join(('"%s"' % s for s in running_statuses))
1045 entries = HostQueueEntry.fetch(where="status IN (%s)" % sql_statuses)
showard8cc058f2009-09-08 16:26:33 +00001046 for entry in entries:
1047 if self.get_agents_for_entry(entry):
1048 continue
1049
1050 task_entries = entry.job.get_group_entries(entry)
1051 for task_entry in task_entries:
1052 if (task_entry.status != models.HostQueueEntry.Status.PARSING
1053 and self.host_has_agent(task_entry.host)):
showard8375ce02009-10-12 20:35:13 +00001054 agent = tuple(self._host_agents.get(task_entry.host.id))[0]
showard8cc058f2009-09-08 16:26:33 +00001055 raise SchedulerError('Attempted to schedule on host that '
1056 'already has agent: %s (previous '
1057 'agent task: %s)'
1058 % (task_entry, agent.task))
1059
1060 if entry.status in (models.HostQueueEntry.Status.STARTING,
1061 models.HostQueueEntry.Status.RUNNING):
1062 params = entry.job.get_autoserv_params(task_entries)
1063 agent_task = QueueTask(job=entry.job,
1064 queue_entries=task_entries, cmd=params)
1065 elif entry.status == models.HostQueueEntry.Status.GATHERING:
1066 agent_task = GatherLogsTask(
1067 job=entry.job, queue_entries=task_entries)
1068 elif entry.status == models.HostQueueEntry.Status.PARSING:
1069 agent_task = FinalReparseTask(queue_entries=task_entries)
1070 else:
1071 raise SchedulerError('_schedule_running_host_queue_entries got '
1072 'entry with invalid status %s: %s'
1073 % (entry.status, entry))
1074
1075 self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
1076
1077
    def _schedule_delay_tasks(self):
        # attach zero-process agents for delayed-callback tasks on
        # Waiting entries
        for entry in HostQueueEntry.fetch(where='status = "%s"' %
                                          models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent(Agent(task, num_processes=0))
1084
1085
showardb95b1bd2008-08-15 18:11:04 +00001086 def _run_queue_entry(self, queue_entry, host):
showard8cc058f2009-09-08 16:26:33 +00001087 queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001088
1089
jadmanski0afbb632008-06-06 21:10:57 +00001090 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001091 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001092 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001093 for agent in self.get_agents_for_entry(entry):
1094 agent.abort()
1095 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001096
1097
showard324bf812009-01-20 23:23:38 +00001098 def _can_start_agent(self, agent, num_started_this_cycle,
1099 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001100 # always allow zero-process agents to run
1101 if agent.num_processes == 0:
1102 return True
1103 # don't allow any nonzero-process agents to run after we've reached a
1104 # limit (this avoids starvation of many-process agents)
1105 if have_reached_limit:
1106 return False
1107 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001108 max_runnable_processes = _drone_manager.max_runnable_processes(
1109 agent.task.username)
1110 if agent.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001111 return False
1112 # if a single agent exceeds the per-cycle throttling, still allow it to
1113 # run when it's the first agent in the cycle
1114 if num_started_this_cycle == 0:
1115 return True
1116 # per-cycle throttling
1117 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001118 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001119 return False
1120 return True
1121
1122
jadmanski0afbb632008-06-06 21:10:57 +00001123 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001124 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001125 have_reached_limit = False
1126 # iterate over copy, so we can remove agents during iteration
1127 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001128 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001129 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001130 have_reached_limit):
1131 have_reached_limit = True
1132 continue
showard4c5374f2008-09-04 17:02:56 +00001133 num_started_this_cycle += agent.num_processes
1134 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001135 if agent.is_done():
1136 logging.info("agent finished")
1137 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001138 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001139 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001140
1141
showard29f7cd22009-04-29 21:16:24 +00001142 def _process_recurring_runs(self):
1143 recurring_runs = models.RecurringRun.objects.filter(
1144 start_date__lte=datetime.datetime.now())
1145 for rrun in recurring_runs:
1146 # Create job from template
1147 job = rrun.job
1148 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001149 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001150
1151 host_objects = info['hosts']
1152 one_time_hosts = info['one_time_hosts']
1153 metahost_objects = info['meta_hosts']
1154 dependencies = info['dependencies']
1155 atomic_group = info['atomic_group']
1156
1157 for host in one_time_hosts or []:
1158 this_host = models.Host.create_one_time_host(host.hostname)
1159 host_objects.append(this_host)
1160
1161 try:
1162 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001163 options=options,
showard29f7cd22009-04-29 21:16:24 +00001164 host_objects=host_objects,
1165 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001166 atomic_group=atomic_group)
1167
1168 except Exception, ex:
1169 logging.exception(ex)
1170 #TODO send email
1171
1172 if rrun.loop_count == 1:
1173 rrun.delete()
1174 else:
1175 if rrun.loop_count != 0: # if not infinite loop
1176 # calculate new start_date
1177 difference = datetime.timedelta(seconds=rrun.loop_period)
1178 rrun.start_date = rrun.start_date + difference
1179 rrun.loop_count -= 1
1180 rrun.save()
1181
1182
showard170873e2009-01-07 00:22:26 +00001183class PidfileRunMonitor(object):
1184 """
1185 Client must call either run() to start a new process or
1186 attach_to_existing_process().
1187 """
mbligh36768f02008-02-22 18:28:33 +00001188
showard170873e2009-01-07 00:22:26 +00001189 class _PidfileException(Exception):
1190 """
1191 Raised when there's some unexpected behavior with the pid file, but only
1192 used internally (never allowed to escape this class).
1193 """
mbligh36768f02008-02-22 18:28:33 +00001194
1195
showard170873e2009-01-07 00:22:26 +00001196 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001197 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001198 self._start_time = None
1199 self.pidfile_id = None
1200 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001201
1202
showard170873e2009-01-07 00:22:26 +00001203 def _add_nice_command(self, command, nice_level):
1204 if not nice_level:
1205 return command
1206 return ['nice', '-n', str(nice_level)] + command
1207
1208
    def _set_start_time(self):
        # record when monitoring began; used to time out a missing pidfile
        self._start_time = time.time()
1211
1212
    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None, username=None):
        """
        Launch command through the drone manager and start monitoring the
        pidfile it will write.
        """
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile,
            username=username)
showard170873e2009-01-07 00:22:26 +00001223
1224
showarded2afea2009-07-07 20:54:07 +00001225 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001226 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001227 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001228 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001229 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001230 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001231
1232
jadmanski0afbb632008-06-06 21:10:57 +00001233 def kill(self):
showard170873e2009-01-07 00:22:26 +00001234 if self.has_process():
1235 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001236
mbligh36768f02008-02-22 18:28:33 +00001237
showard170873e2009-01-07 00:22:26 +00001238 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001239 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001240 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001241
1242
showard170873e2009-01-07 00:22:26 +00001243 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001244 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001245 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001246 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001247
1248
showard170873e2009-01-07 00:22:26 +00001249 def _read_pidfile(self, use_second_read=False):
1250 assert self.pidfile_id is not None, (
1251 'You must call run() or attach_to_existing_process()')
1252 contents = _drone_manager.get_pidfile_contents(
1253 self.pidfile_id, use_second_read=use_second_read)
1254 if contents.is_invalid():
1255 self._state = drone_manager.PidfileContents()
1256 raise self._PidfileException(contents)
1257 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001258
1259
showard21baa452008-10-21 00:08:39 +00001260 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001261 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1262 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001263 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001264 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001265
1266
    def _get_pidfile_info_helper(self):
        # once lost, state is frozen - nothing further to read
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001289
showard21baa452008-10-21 00:08:39 +00001290
    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed

        Pidfile errors are converted into a lost process (never raised).
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001302
1303
showard170873e2009-01-07 00:22:26 +00001304 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001305 """\
1306 Called when no pidfile is found or no pid is in the pidfile.
1307 """
showard170873e2009-01-07 00:22:26 +00001308 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001309 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001310 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001311 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001312 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001313
1314
showard35162b02009-03-03 02:17:30 +00001315 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001316 """\
1317 Called when autoserv has exited without writing an exit status,
1318 or we've timed out waiting for autoserv to write a pid to the
1319 pidfile. In either case, we just return failure and the caller
1320 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001321
showard170873e2009-01-07 00:22:26 +00001322 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001323 """
1324 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001325 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001326 self._state.exit_status = 1
1327 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001328
1329
jadmanski0afbb632008-06-06 21:10:57 +00001330 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001331 self._get_pidfile_info()
1332 return self._state.exit_status
1333
1334
1335 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001336 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001337 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001338 if self._state.num_tests_failed is None:
1339 return -1
showard21baa452008-10-21 00:08:39 +00001340 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001341
1342
showardcdaeae82009-08-31 18:32:48 +00001343 def try_copy_results_on_drone(self, **kwargs):
1344 if self.has_process():
1345 # copy results logs into the normal place for job results
1346 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1347
1348
1349 def try_copy_to_results_repository(self, source, **kwargs):
1350 if self.has_process():
1351 _drone_manager.copy_to_results_repository(self.get_process(),
1352 source, **kwargs)
1353
1354
class Agent(object):
    """
    Wraps a single task for execution by the Dispatcher.

    Task objects handed to an Agent must provide these methods:
        poll() - invoked periodically so the task can check its status and
                update its internal state.
        is_done() - returns True once the task has finished.
        abort() - invoked when an abort has been requested; the task must
                set its aborted attribute to True if it actually aborted.

    Task objects must also expose these attributes:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - sequence of HostQueueEntry ids this task handles.
        host_ids - sequence of Host ids this task represents.

    The Agent writes one attribute onto the task:
        agent - back-reference to the Agent the task has been added to.
    """


    def __init__(self, task, num_processes=1):
        """
        @param task: A task as described in the class docstring.
        @param num_processes: The number of subprocesses the Agent represents.
                Used by the Dispatcher for managing the load on the system.
                Defaults to 1.
        """
        self.task = task
        task.agent = self

        # Dispatcher.add_agent() fills this in.
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        return self.finished


    def abort(self):
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            # tasks can choose to ignore aborts
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001417
showardd3dc1992009-04-22 21:01:40 +00001418
class DelayedCallTask(object):
    """
    An AgentTask-like object that simply waits for the given amount of time
    to elapse, then invokes the supplied callback exactly once and finishes.
    Anything the callback returns is assumed to be a new Agent instance and
    will be added to the dispatcher.

    @attribute end_time: Absolute posix time after which polling this task
            will invoke the callback and mark the task finished.

    Also carries all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: How long from now, in seconds, before this
                task calls the supplied callback and completes.
        @param callback: A callable invoked once after at least
                delay_seconds have elapsed.  It must return None or a new
                Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        # Filled in by Agent.add_task().
        self.agent = None


    def poll(self):
        if self.is_done():
            return
        if self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        return self.aborted or self.success


    def abort(self):
        self.aborted = True
showard77182562009-06-10 00:16:05 +00001471
1472
class AgentTask(object):
    """
    Base class for the tasks the scheduler runs through Agents.

    Subclasses implement the prolog()/epilog() hooks and usually launch an
    autoserv subprocess via run(); progress is tracked through the
    PidfileRunMonitor stored in self.monitor.
    """

    def __init__(self, cmd=None, working_directory=None,
                 recover_run_monitor=None, username=None):
        """
        username: login of user responsible for this task. may be None.
        """
        self.done = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.username = username
        self.agent = None
        self.monitor = recover_run_monitor
        # a recovered monitor means the process is already running
        self.started = bool(recover_run_monitor)
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # Record which hosts/queue entries this task touches, for the
        # dispatcher's bookkeeping.
        if not queue_entries or queue_entries == [None]:
            assert host
            self.host_ids = [host.id]
            return
        self.host_ids = [entry.host.id for entry in queue_entries]
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def poll(self):
        # Lazily start on the first poll, then check on progress.
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        if not self.monitor:
            # no process was ever launched -- treat as a failure
            self.finished(False)
            return

        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return  # still running
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        # idempotent -- only the first call takes effect
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        assert not self.monitor


    def cleanup(self):
        if not (self.monitor and self.log_file):
            return
        self.monitor.try_copy_to_results_repository(self.log_file)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        # All entries must agree on a single execution path; return it.
        expected = execution_entries[0].execution_path()
        for entry in execution_entries[1:]:
            assert entry.execution_path() == expected, (
                    '%s (%s) != %s (%s)' % (entry.execution_path(), entry,
                                            expected, execution_entries[0]))
        return expected


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        results_path = self._get_consistent_execution_path(
                execution_entries) + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        # Hand the entries off to the parser stage.
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        assert not self.monitor
        if not self.cmd:
            return
        self.monitor = PidfileRunMonitor()
        self.monitor.run(self.cmd, self._working_directory,
                         nice_level=AUTOSERV_NICE_LEVEL,
                         log_file=self.log_file,
                         pidfile_name=pidfile_name,
                         paired_with_pidfile=paired_with_pidfile,
                         username=self.username)
mbligh36768f02008-02-22 18:28:33 +00001611
1612
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render one keyval line (no trailing newline)."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Path of this task's keyval file.  Subclasses must override this."""
        # Fix: `raise NotImplemented` raised a TypeError, because
        # NotImplemented is a singleton, not an exception class.
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval, paired with the job's process.

        Does nothing if the process never materialized.
        """
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """@returns ('job_queued', <job creation time as unix timestamp>)."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job finish time in the keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict's contents to the execution at keyval_path
        (used before any process exists to pair the write with)."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._working_directory,
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write the host's platform and labels into the per-host keyval
        file under the job's working directory."""
        keyval_path = os.path.join(self._working_directory, 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1662
1663
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args, **kwargs):
        """
        @param task: the models.SpecialTask this AgentTask executes.
        @param extra_command_args: extra arguments for the autoserv
                command line (e.g. ['-v'] for verify).
        """
        # Fix: the original wrapped the assert's condition and message in
        # parentheses, producing a two-element tuple -- which is always
        # true, so the TASK_TYPE guard never fired.
        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = HostQueueEntry(id=task.queue_entry.id)

        self.task = task
        kwargs['working_directory'] = task.execution_path()
        if task.requested_by:
            kwargs['username'] = task.requested_by.login
        self._extra_command_args = extra_command_args
        super(SpecialAgentTask, self).__init__(**kwargs)


    def _keyval_path(self):
        return os.path.join(self._working_directory, self._KEYVAL_FILE)


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        # build the autoserv command line now, when the task is activated
        self.cmd = _autoserv_command_line(self.host.hostname,
                                          self._extra_command_args,
                                          queue_entry=self.queue_entry)
        self._working_directory = self.task.execution_path()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Mark self.queue_entry failed and finalize its results.

        No-op for metahost entries (they get reassigned) and for entries
        that are no longer queued (already aborted).
        """
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        self._copy_results([self.queue_entry])
        self.queue_entry.handle_host_failure()
        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])

        pidfile_id = _drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=_AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001747
1748
class RepairTask(SpecialAgentTask):
    """Runs autoserv repair (-R) against a host."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task, recover_run_monitor=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # Look up the host's protection level and normalize it to the
        # attribute-style name autoserv expects on its command line.
        raw_protection = host_protections.Protection.get_string(
                task.host.protection)
        protection_name = host_protections.Protection.get_attr_name(
                raw_protection)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection_name],
                recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status(models.Host.Status.READY)
            return

        self.host.set_status(models.Host.Status.REPAIR_FAILED)
        if self.queue_entry:
            self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001786
1787
class PreJobTask(SpecialAgentTask):
    """Base for special tasks that run before a job on a host (e.g. verify).

    On failure, epilog() saves the autoserv debug log alongside the job's
    results, requeues or fails the queue entry, and schedules a repair of
    the host.
    """

    def _copy_to_results_repository(self):
        # Only entries with a concrete host get the pre-job log copied;
        # metahost entries will be reassigned elsewhere, so they have no
        # final results location yet.
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
                self.queue_entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
                source, destination_path=destination)


    def epilog(self):
        super(PreJobTask, self).epilog()

        # nothing more to do if the pre-job task succeeded
        if self.success:
            return

        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # effectively ignore failure for these hosts
            self.success = True
            return

        if self.queue_entry:
            self.queue_entry.requeue()

            # NOTE(review): if a REPAIR SpecialTask already exists for this
            # queue entry, the entry is failed instead of scheduling
            # another repair -- presumably to avoid a repair loop; confirm
            # against the scheduler's recovery logic.
            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry.objects.get(
                    id=self.queue_entry.id)
        else:
            queue_entry = None

        # Schedule a repair of the host; the Django queue entry (if any) is
        # attached so the repair can fail the entry if repair fails.
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=queue_entry,
                requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001837
showard8fe93b52008-11-18 17:53:22 +00001838
class VerifyTask(PreJobTask):
    """Runs autoserv verify (-v) against a host before its job starts."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task, recover_run_monitor=None):
        super(VerifyTask, self).__init__(
                task, ['-v'], recover_run_monitor=recover_run_monitor)

        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # One verify will do -- drop any other queued (not yet started)
        # verify requests for this host; there's no need to keep records
        # of the other requests.
        redundant_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        redundant_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001875
1876
class QueueTask(AgentTask, TaskWithJobKeyvals):
    """Runs the main autoserv process for a job across its queue entries.

    On completion (or abort) the entries are moved to GATHERING so the
    post-job tasks can collect and parse results.
    """

    def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = queue_entries[0].get_group_name()
        super(QueueTask, self).__init__(
                cmd=cmd, working_directory=self._execution_path(),
                recover_run_monitor=recover_run_monitor,
                username=job.owner)
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        return os.path.join(self._execution_path(), self._KEYVAL_FILE)


    def _execution_path(self):
        # all entries share the same execution path; use the first
        return self.queue_entries[0].execution_path()


    def prolog(self):
        # Sanity-check every entry/host state before launching; a bad
        # state here indicates a scheduler bug.
        for entry in self.queue_entries:
            if entry.status not in (models.HostQueueEntry.Status.STARTING,
                                    models.HostQueueEntry.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))
            if entry.host.status not in (models.Host.Status.PENDING,
                                         models.Host.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start on queue '
                                     'entry with invalid host status %s: %s'
                                     % (entry.host.status, entry))

        # record queue time (and host group, if any) in the job keyvals
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        # leave a marker file in the results explaining the lost process
        error_file_path = os.path.join(self._execution_path(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        # no monitor means the process never started; nothing to finalize
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()

        # hand the entries off to the post-job (gather/parse) stage
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log on the drone
        _drone_manager.write_lines_to_file(
                os.path.join(self._execution_path(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        # without a process there are no results to annotate
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # (all aborted entries of one job are expected to share one aborter)
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            # no recorded aborter; attribute the abort to the system
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001993
1994
showardd3dc1992009-04-22 21:01:40 +00001995class PostJobTask(AgentTask):
1996 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00001997 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00001998 self._queue_entries = queue_entries
1999 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00002000
showarded2afea2009-07-07 20:54:07 +00002001 self._execution_path = self._get_consistent_execution_path(
2002 queue_entries)
2003 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002004 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00002005 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002006 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
2007
2008 if _testing_mode:
2009 command = 'true'
2010 else:
2011 command = self._generate_command(self._results_dir)
2012
showarded2afea2009-07-07 20:54:07 +00002013 super(PostJobTask, self).__init__(
2014 cmd=command, working_directory=self._execution_path,
showard9bb960b2009-11-19 01:02:11 +00002015 recover_run_monitor=recover_run_monitor,
2016 username=queue_entries[0].job.owner)
showardd3dc1992009-04-22 21:01:40 +00002017
showarded2afea2009-07-07 20:54:07 +00002018 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00002019 self._final_status = self._determine_final_status()
2020
2021
2022 def _generate_command(self, results_dir):
2023 raise NotImplementedError('Subclasses must override this')
2024
2025
2026 def _job_was_aborted(self):
2027 was_aborted = None
2028 for queue_entry in self._queue_entries:
2029 queue_entry.update_from_database()
2030 if was_aborted is None: # first queue entry
2031 was_aborted = bool(queue_entry.aborted)
2032 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2033 email_manager.manager.enqueue_notify_email(
2034 'Inconsistent abort state',
2035 'Queue entries have inconsistent abort state: ' +
2036 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2037 # don't crash here, just assume true
2038 return True
2039 return was_aborted
2040
2041
2042 def _determine_final_status(self):
2043 if self._job_was_aborted():
2044 return models.HostQueueEntry.Status.ABORTED
2045
2046 # we'll use a PidfileRunMonitor to read the autoserv exit status
2047 if self._autoserv_monitor.exit_code() == 0:
2048 return models.HostQueueEntry.Status.COMPLETED
2049 return models.HostQueueEntry.Status.FAILED
2050
2051
2052 def run(self):
showard8cc058f2009-09-08 16:26:33 +00002053 # Make sure we actually have results to work with.
2054 # This should never happen in normal operation.
showard5add1c82009-05-26 19:27:46 +00002055 if not self._autoserv_monitor.has_process():
2056 email_manager.manager.enqueue_notify_email(
showard8cc058f2009-09-08 16:26:33 +00002057 'No results in post-job task',
2058 'No results in post-job task at %s' %
2059 self._autoserv_monitor.pidfile_id)
showard5add1c82009-05-26 19:27:46 +00002060 self.finished(False)
2061 return
2062
2063 super(PostJobTask, self).run(
2064 pidfile_name=self._pidfile_name,
2065 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002066
2067
2068 def _set_all_statuses(self, status):
2069 for queue_entry in self._queue_entries:
2070 queue_entry.set_status(status)
2071
2072
2073 def abort(self):
2074 # override AgentTask.abort() to avoid killing the process and ending
2075 # the task. post-job tasks continue when the job is aborted.
2076 pass
2077
2078
showard9bb960b2009-11-19 01:02:11 +00002079class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002080 """
2081 Task responsible for
2082 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2083 * copying logs to the results repository
2084 * spawning CleanupTasks for hosts, if necessary
2085 * spawning a FinalReparseTask for the job
2086 """
showarded2afea2009-07-07 20:54:07 +00002087 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002088 self._job = job
2089 super(GatherLogsTask, self).__init__(
2090 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002091 logfile_name='.collect_crashinfo.log',
2092 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002093 self._set_ids(queue_entries=queue_entries)
2094
2095
2096 def _generate_command(self, results_dir):
2097 host_list = ','.join(queue_entry.host.hostname
2098 for queue_entry in self._queue_entries)
2099 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2100 '-r', results_dir]
2101
2102
2103 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002104 for queue_entry in self._queue_entries:
2105 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2106 raise SchedulerError('Gather task attempting to start on '
2107 'non-gathering entry: %s' % queue_entry)
2108 if queue_entry.host.status != models.Host.Status.RUNNING:
2109 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002110 'entry with non-running host status %s: %s'
2111 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002112
showardd3dc1992009-04-22 21:01:40 +00002113 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002114
2115
showardd3dc1992009-04-22 21:01:40 +00002116 def epilog(self):
2117 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002118
showard6d1c1432009-08-20 23:30:39 +00002119 self._copy_and_parse_results(self._queue_entries,
2120 use_monitor=self._autoserv_monitor)
showard9bb960b2009-11-19 01:02:11 +00002121 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002122
showard9bb960b2009-11-19 01:02:11 +00002123
2124 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002125 if self._autoserv_monitor.has_process():
2126 final_success = (self._final_status ==
2127 models.HostQueueEntry.Status.COMPLETED)
2128 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2129 else:
2130 final_success = False
2131 num_tests_failed = 0
2132
showard9bb960b2009-11-19 01:02:11 +00002133 reboot_after = self._job.reboot_after
2134 do_reboot = (
2135 # always reboot after aborted jobs
2136 self._final_status == models.HostQueueEntry.Status.ABORTED
2137 or reboot_after == models.RebootAfter.ALWAYS
2138 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
2139 and final_success and num_tests_failed == 0))
2140
2141 for queue_entry in self._queue_entries:
2142 if do_reboot:
2143 # don't pass the queue entry to the CleanupTask. if the cleanup
2144 # fails, the job doesn't care -- it's over.
2145 models.SpecialTask.objects.create(
2146 host=models.Host.objects.get(id=queue_entry.host.id),
2147 task=models.SpecialTask.Task.CLEANUP,
2148 requested_by=self._job.owner_model())
2149 else:
2150 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002151
2152
showard0bbfc212009-04-29 21:06:13 +00002153 def run(self):
showard597bfd32009-05-08 18:22:50 +00002154 autoserv_exit_code = self._autoserv_monitor.exit_code()
2155 # only run if Autoserv exited due to some signal. if we have no exit
2156 # code, assume something bad (and signal-like) happened.
2157 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002158 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002159 else:
2160 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002161
2162
showard8fe93b52008-11-18 17:53:22 +00002163class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002164 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2165
2166
showard8cc058f2009-09-08 16:26:33 +00002167 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00002168 super(CleanupTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00002169 task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002170
showard8cc058f2009-09-08 16:26:33 +00002171 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002172
mblighd5c95802008-03-05 00:33:46 +00002173
jadmanski0afbb632008-06-06 21:10:57 +00002174 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002175 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002176 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002177 self.host.set_status(models.Host.Status.CLEANING)
2178 if self.queue_entry:
2179 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2180
2181
showard775300b2009-09-09 15:30:50 +00002182 def _finish_epilog(self):
showard7b2d7cb2009-10-28 19:53:03 +00002183 if not self.queue_entry or not self.success:
showard775300b2009-09-09 15:30:50 +00002184 return
2185
showard7b2d7cb2009-10-28 19:53:03 +00002186 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2187 should_run_verify = (
2188 self.queue_entry.job.run_verify
2189 and self.host.protection != do_not_verify_protection)
2190 if should_run_verify:
showard9bb960b2009-11-19 01:02:11 +00002191 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
showard7b2d7cb2009-10-28 19:53:03 +00002192 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002193 host=models.Host.objects.get(id=self.host.id),
showard7b2d7cb2009-10-28 19:53:03 +00002194 queue_entry=entry,
2195 task=models.SpecialTask.Task.VERIFY)
2196 else:
showard775300b2009-09-09 15:30:50 +00002197 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002198
mblighd5c95802008-03-05 00:33:46 +00002199
showard21baa452008-10-21 00:08:39 +00002200 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002201 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002202
showard21baa452008-10-21 00:08:39 +00002203 if self.success:
2204 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002205 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002206
showard775300b2009-09-09 15:30:50 +00002207 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002208
showard21baa452008-10-21 00:08:39 +00002209
showardd3dc1992009-04-22 21:01:40 +00002210class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002211 _num_running_parses = 0
2212
showarded2afea2009-07-07 20:54:07 +00002213 def __init__(self, queue_entries, recover_run_monitor=None):
2214 super(FinalReparseTask, self).__init__(
2215 queue_entries, pidfile_name=_PARSER_PID_FILE,
2216 logfile_name='.parse.log',
2217 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002218 # don't use _set_ids, since we don't want to set the host_ids
2219 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002220 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002221
showard97aed502008-11-04 02:01:24 +00002222
2223 @classmethod
2224 def _increment_running_parses(cls):
2225 cls._num_running_parses += 1
2226
2227
2228 @classmethod
2229 def _decrement_running_parses(cls):
2230 cls._num_running_parses -= 1
2231
2232
2233 @classmethod
2234 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002235 return (cls._num_running_parses <
2236 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002237
2238
2239 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002240 for queue_entry in self._queue_entries:
2241 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2242 raise SchedulerError('Parse task attempting to start on '
2243 'non-parsing entry: %s' % queue_entry)
2244
showard97aed502008-11-04 02:01:24 +00002245 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002246
2247
2248 def epilog(self):
2249 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002250 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002251
2252
showardd3dc1992009-04-22 21:01:40 +00002253 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002254 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002255 results_dir]
showard97aed502008-11-04 02:01:24 +00002256
2257
showard08a36412009-05-05 01:01:13 +00002258 def tick(self):
2259 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002260 # and we can, at which point we revert to default behavior
2261 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002262 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002263 else:
2264 self._try_starting_parse()
2265
2266
2267 def run(self):
2268 # override run() to not actually run unless we can
2269 self._try_starting_parse()
2270
2271
2272 def _try_starting_parse(self):
2273 if not self._can_run_new_parse():
2274 return
showard170873e2009-01-07 00:22:26 +00002275
showard97aed502008-11-04 02:01:24 +00002276 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002277 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002278
showard97aed502008-11-04 02:01:24 +00002279 self._increment_running_parses()
2280 self._parse_started = True
2281
2282
2283 def finished(self, success):
2284 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002285 if self._parse_started:
2286 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002287
2288
showarda3c58572009-03-12 20:36:59 +00002289class DBError(Exception):
2290 """Raised by the DBObject constructor when its select fails."""
2291
2292
mbligh36768f02008-02-22 18:28:33 +00002293class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002294 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002295
2296 # Subclasses MUST override these:
2297 _table_name = ''
2298 _fields = ()
2299
showarda3c58572009-03-12 20:36:59 +00002300 # A mapping from (type, id) to the instance of the object for that
2301 # particular id. This prevents us from creating new Job() and Host()
2302 # instances for every HostQueueEntry object that we instantiate as
2303 # multiple HQEs often share the same Job.
2304 _instances_by_type_and_id = weakref.WeakValueDictionary()
2305 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002306
showarda3c58572009-03-12 20:36:59 +00002307
2308 def __new__(cls, id=None, **kwargs):
2309 """
2310 Look to see if we already have an instance for this particular type
2311 and id. If so, use it instead of creating a duplicate instance.
2312 """
2313 if id is not None:
2314 instance = cls._instances_by_type_and_id.get((cls, id))
2315 if instance:
2316 return instance
2317 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2318
2319
2320 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002321 assert bool(id) or bool(row)
2322 if id is not None and row is not None:
2323 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002324 assert self._table_name, '_table_name must be defined in your class'
2325 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002326 if not new_record:
2327 if self._initialized and not always_query:
2328 return # We've already been initialized.
2329 if id is None:
2330 id = row[0]
2331 # Tell future constructors to use us instead of re-querying while
2332 # this instance is still around.
2333 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002334
showard6ae5ea92009-02-25 00:11:51 +00002335 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002336
jadmanski0afbb632008-06-06 21:10:57 +00002337 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002338
jadmanski0afbb632008-06-06 21:10:57 +00002339 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002340 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002341
showarda3c58572009-03-12 20:36:59 +00002342 if self._initialized:
2343 differences = self._compare_fields_in_row(row)
2344 if differences:
showard7629f142009-03-27 21:02:02 +00002345 logging.warn(
2346 'initialized %s %s instance requery is updating: %s',
2347 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002348 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002349 self._initialized = True
2350
2351
2352 @classmethod
2353 def _clear_instance_cache(cls):
2354 """Used for testing, clear the internal instance cache."""
2355 cls._instances_by_type_and_id.clear()
2356
2357
showardccbd6c52009-03-21 00:10:21 +00002358 def _fetch_row_from_db(self, row_id):
2359 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2360 rows = _db.execute(sql, (row_id,))
2361 if not rows:
showard76e29d12009-04-15 21:53:10 +00002362 raise DBError("row not found (table=%s, row id=%s)"
2363 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002364 return rows[0]
2365
2366
showarda3c58572009-03-12 20:36:59 +00002367 def _assert_row_length(self, row):
2368 assert len(row) == len(self._fields), (
2369 "table = %s, row = %s/%d, fields = %s/%d" % (
2370 self.__table, row, len(row), self._fields, len(self._fields)))
2371
2372
2373 def _compare_fields_in_row(self, row):
2374 """
showarddae680a2009-10-12 20:26:43 +00002375 Given a row as returned by a SELECT query, compare it to our existing in
2376 memory fields. Fractional seconds are stripped from datetime values
2377 before comparison.
showarda3c58572009-03-12 20:36:59 +00002378
2379 @param row - A sequence of values corresponding to fields named in
2380 The class attribute _fields.
2381
2382 @returns A dictionary listing the differences keyed by field name
2383 containing tuples of (current_value, row_value).
2384 """
2385 self._assert_row_length(row)
2386 differences = {}
showarddae680a2009-10-12 20:26:43 +00002387 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002388 for field, row_value in itertools.izip(self._fields, row):
2389 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002390 if (isinstance(current_value, datetime.datetime)
2391 and isinstance(row_value, datetime.datetime)):
2392 current_value = current_value.strftime(datetime_cmp_fmt)
2393 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002394 if current_value != row_value:
2395 differences[field] = (current_value, row_value)
2396 return differences
showard2bab8f42008-11-12 18:15:22 +00002397
2398
2399 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002400 """
2401 Update our field attributes using a single row returned by SELECT.
2402
2403 @param row - A sequence of values corresponding to fields named in
2404 the class fields list.
2405 """
2406 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002407
showard2bab8f42008-11-12 18:15:22 +00002408 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002409 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002410 setattr(self, field, value)
2411 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002412
showard2bab8f42008-11-12 18:15:22 +00002413 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002414
mblighe2586682008-02-29 22:45:46 +00002415
showardccbd6c52009-03-21 00:10:21 +00002416 def update_from_database(self):
2417 assert self.id is not None
2418 row = self._fetch_row_from_db(self.id)
2419 self._update_fields_from_row(row)
2420
2421
jadmanski0afbb632008-06-06 21:10:57 +00002422 def count(self, where, table = None):
2423 if not table:
2424 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002425
jadmanski0afbb632008-06-06 21:10:57 +00002426 rows = _db.execute("""
2427 SELECT count(*) FROM %s
2428 WHERE %s
2429 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002430
jadmanski0afbb632008-06-06 21:10:57 +00002431 assert len(rows) == 1
2432
2433 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002434
2435
showardd3dc1992009-04-22 21:01:40 +00002436 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002437 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002438
showard2bab8f42008-11-12 18:15:22 +00002439 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002440 return
mbligh36768f02008-02-22 18:28:33 +00002441
mblighf8c624d2008-07-03 16:58:45 +00002442 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002443 _db.execute(query, (value, self.id))
2444
showard2bab8f42008-11-12 18:15:22 +00002445 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002446
2447
jadmanski0afbb632008-06-06 21:10:57 +00002448 def save(self):
2449 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002450 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002451 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002452 values = []
2453 for key in keys:
2454 value = getattr(self, key)
2455 if value is None:
2456 values.append('NULL')
2457 else:
2458 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002459 values_str = ','.join(values)
2460 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2461 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002462 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002463 # Update our id to the one the database just assigned to us.
2464 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002465
2466
jadmanski0afbb632008-06-06 21:10:57 +00002467 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002468 self._instances_by_type_and_id.pop((type(self), id), None)
2469 self._initialized = False
2470 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002471 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2472 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002473
2474
showard63a34772008-08-18 19:32:50 +00002475 @staticmethod
2476 def _prefix_with(string, prefix):
2477 if string:
2478 string = prefix + string
2479 return string
2480
2481
jadmanski0afbb632008-06-06 21:10:57 +00002482 @classmethod
showard989f25d2008-10-01 11:38:11 +00002483 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002484 """
2485 Construct instances of our class based on the given database query.
2486
2487 @yields One class instance for each row fetched.
2488 """
showard63a34772008-08-18 19:32:50 +00002489 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2490 where = cls._prefix_with(where, 'WHERE ')
2491 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002492 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002493 'joins' : joins,
2494 'where' : where,
2495 'order_by' : order_by})
2496 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002497 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002498
mbligh36768f02008-02-22 18:28:33 +00002499
class IneligibleHostQueue(DBObject):
    """
    ORM wrapper for ineligible_host_queues rows: job/host block records
    created and removed by HostQueueEntry.block_host()/unblock_host().
    """
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002503
2504
showard89f84db2009-03-12 20:39:13 +00002505class AtomicGroup(DBObject):
2506 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002507 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2508 'invalid')
showard89f84db2009-03-12 20:39:13 +00002509
2510
showard989f25d2008-10-01 11:38:11 +00002511class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002512 _table_name = 'labels'
2513 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002514 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002515
2516
showard6157c632009-07-06 20:19:31 +00002517 def __repr__(self):
2518 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2519 self.name, self.id, self.atomic_group_id)
2520
2521
mbligh36768f02008-02-22 18:28:33 +00002522class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002523 _table_name = 'hosts'
2524 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2525 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2526
2527
jadmanski0afbb632008-06-06 21:10:57 +00002528 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002529 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002530 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002531
2532
showard170873e2009-01-07 00:22:26 +00002533 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002534 """
showard170873e2009-01-07 00:22:26 +00002535 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002536 """
2537 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002538 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002539 FROM labels
2540 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002541 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002542 ORDER BY labels.name
2543 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002544 platform = None
2545 all_labels = []
2546 for label_name, is_platform in rows:
2547 if is_platform:
2548 platform = label_name
2549 all_labels.append(label_name)
2550 return platform, all_labels
2551
2552
showard54c1ea92009-05-20 00:32:58 +00002553 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2554
2555
2556 @classmethod
2557 def cmp_for_sort(cls, a, b):
2558 """
2559 A comparison function for sorting Host objects by hostname.
2560
2561 This strips any trailing numeric digits, ignores leading 0s and
2562 compares hostnames by the leading name and the trailing digits as a
2563 number. If both hostnames do not match this pattern, they are simply
2564 compared as lower case strings.
2565
2566 Example of how hostnames will be sorted:
2567
2568 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2569
2570 This hopefully satisfy most people's hostname sorting needs regardless
2571 of their exact naming schemes. Nobody sane should have both a host10
2572 and host010 (but the algorithm works regardless).
2573 """
2574 lower_a = a.hostname.lower()
2575 lower_b = b.hostname.lower()
2576 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2577 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2578 if match_a and match_b:
2579 name_a, number_a_str = match_a.groups()
2580 name_b, number_b_str = match_b.groups()
2581 number_a = int(number_a_str.lstrip('0'))
2582 number_b = int(number_b_str.lstrip('0'))
2583 result = cmp((name_a, number_a), (name_b, number_b))
2584 if result == 0 and lower_a != lower_b:
2585 # If they compared equal above but the lower case names are
2586 # indeed different, don't report equality. abc012 != abc12.
2587 return cmp(lower_a, lower_b)
2588 return result
2589 else:
2590 return cmp(lower_a, lower_b)
2591
2592
mbligh36768f02008-02-22 18:28:33 +00002593class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002594 _table_name = 'host_queue_entries'
2595 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002596 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002597 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002598
2599
showarda3c58572009-03-12 20:36:59 +00002600 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002601 assert id or row
showarda3c58572009-03-12 20:36:59 +00002602 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002603 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002604
jadmanski0afbb632008-06-06 21:10:57 +00002605 if self.host_id:
2606 self.host = Host(self.host_id)
2607 else:
2608 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002609
showard77182562009-06-10 00:16:05 +00002610 if self.atomic_group_id:
2611 self.atomic_group = AtomicGroup(self.atomic_group_id,
2612 always_query=False)
2613 else:
2614 self.atomic_group = None
2615
showard170873e2009-01-07 00:22:26 +00002616 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002617 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002618
2619
showard89f84db2009-03-12 20:39:13 +00002620 @classmethod
2621 def clone(cls, template):
2622 """
2623 Creates a new row using the values from a template instance.
2624
2625 The new instance will not exist in the database or have a valid
2626 id attribute until its save() method is called.
2627 """
2628 assert isinstance(template, cls)
2629 new_row = [getattr(template, field) for field in cls._fields]
2630 clone = cls(row=new_row, new_record=True)
2631 clone.id = None
2632 return clone
2633
2634
showardc85c21b2008-11-24 22:17:37 +00002635 def _view_job_url(self):
2636 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2637
2638
showardf1ae3542009-05-11 19:26:02 +00002639 def get_labels(self):
2640 """
2641 Get all labels associated with this host queue entry (either via the
2642 meta_host or as a job dependency label). The labels yielded are not
2643 guaranteed to be unique.
2644
2645 @yields Label instances associated with this host_queue_entry.
2646 """
2647 if self.meta_host:
2648 yield Label(id=self.meta_host, always_query=False)
2649 labels = Label.fetch(
2650 joins="JOIN jobs_dependency_labels AS deps "
2651 "ON (labels.id = deps.label_id)",
2652 where="deps.job_id = %d" % self.job.id)
2653 for label in labels:
2654 yield label
2655
2656
jadmanski0afbb632008-06-06 21:10:57 +00002657 def set_host(self, host):
2658 if host:
2659 self.queue_log_record('Assigning host ' + host.hostname)
2660 self.update_field('host_id', host.id)
2661 self.update_field('active', True)
2662 self.block_host(host.id)
2663 else:
2664 self.queue_log_record('Releasing host')
2665 self.unblock_host(self.host.id)
2666 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002667
jadmanski0afbb632008-06-06 21:10:57 +00002668 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002669
2670
jadmanski0afbb632008-06-06 21:10:57 +00002671 def get_host(self):
2672 return self.host
mbligh36768f02008-02-22 18:28:33 +00002673
2674
jadmanski0afbb632008-06-06 21:10:57 +00002675 def queue_log_record(self, log_line):
2676 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002677 _drone_manager.write_lines_to_file(self.queue_log_path,
2678 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002679
2680
jadmanski0afbb632008-06-06 21:10:57 +00002681 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002682 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002683 row = [0, self.job.id, host_id]
2684 block = IneligibleHostQueue(row=row, new_record=True)
2685 block.save()
mblighe2586682008-02-29 22:45:46 +00002686
2687
jadmanski0afbb632008-06-06 21:10:57 +00002688 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002689 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002690 blocks = IneligibleHostQueue.fetch(
2691 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2692 for block in blocks:
2693 block.delete()
mblighe2586682008-02-29 22:45:46 +00002694
2695
showard2bab8f42008-11-12 18:15:22 +00002696 def set_execution_subdir(self, subdir=None):
2697 if subdir is None:
2698 assert self.get_host()
2699 subdir = self.get_host().hostname
2700 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002701
2702
showard6355f6b2008-12-05 18:52:13 +00002703 def _get_hostname(self):
2704 if self.host:
2705 return self.host.hostname
2706 return 'no host'
2707
2708
showard170873e2009-01-07 00:22:26 +00002709 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002710 flags = []
2711 if self.active:
2712 flags.append('active')
2713 if self.complete:
2714 flags.append('complete')
2715 if self.deleted:
2716 flags.append('deleted')
2717 if self.aborted:
2718 flags.append('aborted')
2719 flags_str = ','.join(flags)
2720 if flags_str:
2721 flags_str = ' [%s]' % flags_str
2722 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2723 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002724
2725
jadmanski0afbb632008-06-06 21:10:57 +00002726 def set_status(self, status):
showard56824072009-10-12 20:30:21 +00002727 logging.info("%s -> %s", self, status)
mblighf8c624d2008-07-03 16:58:45 +00002728
showard56824072009-10-12 20:30:21 +00002729 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002730
showard8cc058f2009-09-08 16:26:33 +00002731 if status in (models.HostQueueEntry.Status.QUEUED,
2732 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002733 self.update_field('complete', False)
2734 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002735
showard8cc058f2009-09-08 16:26:33 +00002736 if status in (models.HostQueueEntry.Status.PENDING,
2737 models.HostQueueEntry.Status.RUNNING,
2738 models.HostQueueEntry.Status.VERIFYING,
2739 models.HostQueueEntry.Status.STARTING,
2740 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002741 self.update_field('complete', False)
2742 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002743
showard8cc058f2009-09-08 16:26:33 +00002744 if status in (models.HostQueueEntry.Status.FAILED,
2745 models.HostQueueEntry.Status.COMPLETED,
2746 models.HostQueueEntry.Status.STOPPED,
2747 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002748 self.update_field('complete', True)
2749 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002750 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002751
2752 should_email_status = (status.lower() in _notify_email_statuses or
2753 'all' in _notify_email_statuses)
2754 if should_email_status:
2755 self._email_on_status(status)
2756
2757 self._email_on_job_complete()
2758
2759
showardf85a0b72009-10-07 20:48:45 +00002760 def _on_complete(self):
2761 if not self.execution_subdir:
2762 return
2763 # unregister any possible pidfiles associated with this queue entry
2764 for pidfile_name in _ALL_PIDFILE_NAMES:
2765 pidfile_id = _drone_manager.get_pidfile_id_from(
2766 self.execution_path(), pidfile_name=pidfile_name)
2767 _drone_manager.unregister_pidfile(pidfile_id)
2768
2769
showardc85c21b2008-11-24 22:17:37 +00002770 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002771 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002772
2773 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2774 self.job.id, self.job.name, hostname, status)
2775 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2776 self.job.id, self.job.name, hostname, status,
2777 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002778 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002779
2780
2781 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002782 if not self.job.is_finished():
2783 return
showard542e8402008-09-19 20:16:18 +00002784
showardc85c21b2008-11-24 22:17:37 +00002785 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002786 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002787 for queue_entry in hosts_queue:
2788 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002789 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002790 queue_entry.status))
2791
2792 summary_text = "\n".join(summary_text)
2793 status_counts = models.Job.objects.get_status_counts(
2794 [self.job.id])[self.job.id]
2795 status = ', '.join('%d %s' % (count, status) for status, count
2796 in status_counts.iteritems())
2797
2798 subject = 'Autotest: Job ID: %s "%s" %s' % (
2799 self.job.id, self.job.name, status)
2800 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2801 self.job.id, self.job.name, status, self._view_job_url(),
2802 summary_text)
showard170873e2009-01-07 00:22:26 +00002803 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002804
2805
    def schedule_pre_job_tasks(self, assigned_host=None):
        """
        Prepare this entry to run: record the host chosen by the scheduler
        (for meta-host or atomic group entries) and hand off to the job to
        schedule any pre-job special tasks.

        @param assigned_host: Host chosen by the scheduler; required when this
                entry is a meta-host or atomic group entry, ignored otherwise.
        """
        if self.meta_host is not None or self.atomic_group:
            assert assigned_host
            # ensure results dir exists for the queue log
            if self.host_id is None:
                self.set_host(assigned_host)
            else:
                # Host already recorded (e.g. recovery); it must match.
                assert assigned_host.id == self.host_id

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002820
2821
showard8cc058f2009-09-08 16:26:33 +00002822 def _do_schedule_pre_job_tasks(self):
showard77182562009-06-10 00:16:05 +00002823 # Every host goes thru the Verifying stage (which may or may not
2824 # actually do anything as determined by get_pre_job_tasks).
2825 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00002826 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002827
showard6ae5ea92009-02-25 00:11:51 +00002828
jadmanski0afbb632008-06-06 21:10:57 +00002829 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002830 assert self.host
showard8cc058f2009-09-08 16:26:33 +00002831 self.set_status(models.HostQueueEntry.Status.QUEUED)
showard12f3e322009-05-13 21:27:42 +00002832 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002833 # verify/cleanup failure sets the execution subdir, so reset it here
2834 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002835 if self.meta_host:
2836 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002837
2838
jadmanski0afbb632008-06-06 21:10:57 +00002839 def handle_host_failure(self):
2840 """\
2841 Called when this queue entry's host has failed verification and
2842 repair.
2843 """
2844 assert not self.meta_host
showard8cc058f2009-09-08 16:26:33 +00002845 self.set_status(models.HostQueueEntry.Status.FAILED)
showard2bab8f42008-11-12 18:15:22 +00002846 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002847
2848
jadmanskif7fa2cc2008-10-01 14:13:23 +00002849 @property
2850 def aborted_by(self):
2851 self._load_abort_info()
2852 return self._aborted_by
2853
2854
2855 @property
2856 def aborted_on(self):
2857 self._load_abort_info()
2858 return self._aborted_on
2859
2860
    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Results are cached on the instance; the hasattr check makes
        # repeated calls free.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            # Entry was never aborted (or abort info is missing).
            self._aborted_by = self._aborted_on = None
2876
2877
    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002897
2898
    def abort(self, dispatcher):
        """
        Abort this queue entry, handling its host as appropriate for the
        entry's current status, then mark the entry Aborted.

        @param dispatcher: The Dispatcher, used to assert no agents are still
                running for this entry before releasing its host.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # Schedule a cleanup on the host on behalf of the job's owner.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        # Cancel any pending atomic-group ready-delay for the job.
        self.job.abort_delay_ready_task()
showardd2014822009-10-12 20:26:58 +00002918 self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00002919
showard8cc058f2009-09-08 16:26:33 +00002920
2921 def get_group_name(self):
2922 atomic_group = self.atomic_group
2923 if not atomic_group:
2924 return ''
2925
2926 # Look at any meta_host and dependency labels and pick the first
2927 # one that also specifies this atomic group. Use that label name
2928 # as the group name if possible (it is more specific).
2929 for label in self.get_labels():
2930 if label.atomic_group_id:
2931 assert label.atomic_group_id == atomic_group.id
2932 return label.name
2933 return atomic_group.name
2934
2935
showard170873e2009-01-07 00:22:26 +00002936 def execution_tag(self):
2937 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002938 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002939
2940
showarded2afea2009-07-07 20:54:07 +00002941 def execution_path(self):
2942 return self.execution_tag()
2943
2944
class Job(DBObject):
    """In-scheduler representation of a row in the jobs table."""
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB. The delays are likely to
    # be configured short. If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it. Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time. Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002966
    def __init__(self, id=None, row=None, **kwargs):
        """
        @param id: Database id of the job to load; either this or row must
                be supplied.
        @param row: Pre-fetched DB row to construct the object from.
        """
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
2971
2972
2973 def owner_model(self):
2974 # work around the fact that the Job owner field is a string, not a
2975 # foreign key
2976 if not self._owner_model:
2977 self._owner_model = models.User.objects.get(login=self.owner)
2978 return self._owner_model
mbligh36768f02008-02-22 18:28:33 +00002979
mblighe2586682008-02-29 22:45:46 +00002980
jadmanski0afbb632008-06-06 21:10:57 +00002981 def is_server_job(self):
2982 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002983
2984
showard170873e2009-01-07 00:22:26 +00002985 def tag(self):
2986 return "%s-%s" % (self.id, self.owner)
2987
2988
    def get_host_queue_entries(self):
        """@returns A non-empty list of this job's HostQueueEntry objects."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        # Every job is created with at least one queue entry.
        assert len(entries)>0

        return entries
mbligh36768f02008-02-22 18:28:33 +00002999
3000
jadmanski0afbb632008-06-06 21:10:57 +00003001 def set_status(self, status, update_queues=False):
3002 self.update_field('status',status)
3003
3004 if update_queues:
3005 for queue_entry in self.get_host_queue_entries():
3006 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00003007
3008
showard77182562009-06-10 00:16:05 +00003009 def _atomic_and_has_started(self):
3010 """
3011 @returns True if any of the HostQueueEntries associated with this job
3012 have entered the Status.STARTING state or beyond.
3013 """
3014 atomic_entries = models.HostQueueEntry.objects.filter(
3015 job=self.id, atomic_group__isnull=False)
3016 if atomic_entries.count() <= 0:
3017 return False
3018
showardaf8b4ca2009-06-16 18:47:26 +00003019 # These states may *only* be reached if Job.run() has been called.
3020 started_statuses = (models.HostQueueEntry.Status.STARTING,
3021 models.HostQueueEntry.Status.RUNNING,
3022 models.HostQueueEntry.Status.COMPLETED)
3023
3024 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003025 return started_entries.count() > 0
3026
3027
showard708b3522009-08-20 23:26:15 +00003028 def _hosts_assigned_count(self):
3029 """The number of HostQueueEntries assigned a Host for this job."""
3030 entries = models.HostQueueEntry.objects.filter(job=self.id,
3031 host__isnull=False)
3032 return entries.count()
3033
3034
showard77182562009-06-10 00:16:05 +00003035 def _pending_count(self):
3036 """The number of HostQueueEntries for this job in the Pending state."""
3037 pending_entries = models.HostQueueEntry.objects.filter(
3038 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3039 return pending_entries.count()
3040
3041
showardd2014822009-10-12 20:26:58 +00003042 def _pending_threshold(self, atomic_group):
3043 """
3044 @param atomic_group: The AtomicGroup associated with this job that we
3045 are using to bound the threshold.
3046 @returns The minimum number of HostQueueEntries assigned a Host before
3047 this job can run.
3048 """
3049 return min(self._hosts_assigned_count(),
3050 atomic_group.max_number_of_machines)
3051
3052
jadmanski0afbb632008-06-06 21:10:57 +00003053 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003054 # NOTE: Atomic group jobs stop reporting ready after they have been
3055 # started to avoid launching multiple copies of one atomic job.
3056 # Only possible if synch_count is less than than half the number of
3057 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003058 pending_count = self._pending_count()
3059 atomic_and_has_started = self._atomic_and_has_started()
3060 ready = (pending_count >= self.synch_count
showardd2014822009-10-12 20:26:58 +00003061 and not atomic_and_has_started)
showardb000a8d2009-07-28 20:02:07 +00003062
3063 if not ready:
3064 logging.info(
3065 'Job %s not ready: %s pending, %s required '
3066 '(Atomic and started: %s)',
3067 self, pending_count, self.synch_count,
3068 atomic_and_has_started)
3069
3070 return ready
mbligh36768f02008-02-22 18:28:33 +00003071
3072
jadmanski0afbb632008-06-06 21:10:57 +00003073 def num_machines(self, clause = None):
3074 sql = "job_id=%s" % self.id
3075 if clause:
3076 sql += " AND (%s)" % clause
3077 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003078
3079
jadmanski0afbb632008-06-06 21:10:57 +00003080 def num_queued(self):
3081 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003082
3083
jadmanski0afbb632008-06-06 21:10:57 +00003084 def num_active(self):
3085 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003086
3087
jadmanski0afbb632008-06-06 21:10:57 +00003088 def num_complete(self):
3089 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003090
3091
jadmanski0afbb632008-06-06 21:10:57 +00003092 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003093 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003094
mbligh36768f02008-02-22 18:28:33 +00003095
showard6bb7c292009-01-30 01:44:51 +00003096 def _not_yet_run_entries(self, include_verifying=True):
3097 statuses = [models.HostQueueEntry.Status.QUEUED,
3098 models.HostQueueEntry.Status.PENDING]
3099 if include_verifying:
3100 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3101 return models.HostQueueEntry.objects.filter(job=self.id,
3102 status__in=statuses)
3103
3104
    def _stop_all_entries(self):
        """Mark all Queued/Pending entries Stopped, freeing Pending hosts."""
        # Verifying entries are excluded: their special tasks are already
        # running and will handle the entry themselves.
        entries_to_stop = self._not_yet_run_entries(
                include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                # Pending entries hold a host; release it back to Ready.
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()
3118
showard2bab8f42008-11-12 18:15:22 +00003119 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003120 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003121 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003122 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003123
3124
jadmanski0afbb632008-06-06 21:10:57 +00003125 def write_to_machines_file(self, queue_entry):
3126 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00003127 file_path = os.path.join(self.tag(), '.machines')
3128 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003129
3130
showardf1ae3542009-05-11 19:26:02 +00003131 def _next_group_name(self, group_name=''):
3132 """@returns a directory name to use for the next host group results."""
3133 if group_name:
3134 # Sanitize for use as a pathname.
3135 group_name = group_name.replace(os.path.sep, '_')
3136 if group_name.startswith('.'):
3137 group_name = '_' + group_name[1:]
3138 # Add a separator between the group name and 'group%d'.
3139 group_name += '.'
3140 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003141 query = models.HostQueueEntry.objects.filter(
3142 job=self.id).values('execution_subdir').distinct()
3143 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003144 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3145 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003146 if ids:
3147 next_id = max(ids) + 1
3148 else:
3149 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003150 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003151
3152
showarddb502762009-09-09 15:31:20 +00003153 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003154 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003155 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003156 return control_path
mbligh36768f02008-02-22 18:28:33 +00003157
showardb2e2c322008-10-14 17:33:55 +00003158
showard2bab8f42008-11-12 18:15:22 +00003159 def get_group_entries(self, queue_entry_from_group):
showard8375ce02009-10-12 20:35:13 +00003160 """
3161 @param queue_entry_from_group: A HostQueueEntry instance to find other
3162 group entries on this job for.
3163
3164 @returns A list of HostQueueEntry objects all executing this job as
3165 part of the same group as the one supplied (having the same
3166 execution_subdir).
3167 """
showard2bab8f42008-11-12 18:15:22 +00003168 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003169 return list(HostQueueEntry.fetch(
3170 where='job_id=%s AND execution_subdir=%s',
3171 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003172
3173
    def get_autoserv_params(self, queue_entries):
        """
        Build the autoserv command line for running this job on the given
        queue entries.

        @param queue_entries: Non-empty list of HostQueueEntries to run; they
                share one execution path, taken from the first entry.
        @returns A list of command-line arguments for autoserv.
        """
        assert queue_entries
        execution_path = queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        execution_tag = queue_entries[0].execution_tag()
        params = _autoserv_command_line(
                hostnames,
                ['-P', execution_tag, '-n',
                 _drone_manager.absolute_path(control_path)],
                job=self, verbose=False)

        # Client-side jobs need the -c flag to run the control file remotely.
        if not self.is_server_job():
            params.append('-c')

        return params
mblighe2586682008-02-29 22:45:46 +00003192
mbligh36768f02008-02-22 18:28:33 +00003193
showardc9ae1782009-01-30 01:42:37 +00003194 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003195 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003196 return True
showard0fc38302008-10-23 00:44:07 +00003197 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003198 return queue_entry.get_host().dirty
3199 return False
showard21baa452008-10-21 00:08:39 +00003200
showardc9ae1782009-01-30 01:42:37 +00003201
showard8cc058f2009-09-08 16:26:33 +00003202 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003203 do_not_verify = (queue_entry.host.protection ==
3204 host_protections.Protection.DO_NOT_VERIFY)
3205 if do_not_verify:
3206 return False
3207 return self.run_verify
3208
3209
    def schedule_pre_job_tasks(self, queue_entry):
        """
        Schedule any special task (Cleanup or Verify) that must run on the
        given host_queue_entry before it may be used to run this Job.

        If no pre-job task is needed, calls queue_entry.on_pending() directly,
        which continues the flow of the job.  Returns nothing.
        """
        if self._should_run_cleanup(queue_entry):
            task = models.SpecialTask.Task.CLEANUP
        elif self._should_run_verify(queue_entry):
            task = models.SpecialTask.Task.VERIFY
        else:
            queue_entry.on_pending()
            return

        # Re-fetch as the Django model instance required by SpecialTask.
        queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)
showard21baa452008-10-21 00:08:39 +00003232
3233
    def _assign_new_group(self, queue_entries, group_name=''):
        """
        Assign a shared execution_subdir to the given group of entries.

        @param queue_entries: Entries chosen to run together; a single entry
                uses its hostname as the subdir, a multi-entry group gets the
                next 'group%d' name.
        @param group_name: Optional name prefix for the group subdir.
        """
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id, [entry.host.hostname for entry in queue_entries],
                         group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003245
3246
    def _choose_group_to_run(self, include_queue_entry):
        """
        @param include_queue_entry: The HostQueueEntry that must be part of
                the chosen group.
        @returns A list of HostQueueEntry instances to be used to run this
                Job (assigned a shared execution subdir), or an empty list if
                too few Pending entries were available.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            # Atomic groups try to grab as many machines as allowed.
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check. We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003286
3287
showard77182562009-06-10 00:16:05 +00003288 def run_if_ready(self, queue_entry):
3289 """
showard8375ce02009-10-12 20:35:13 +00003290 Run this job by kicking its HQEs into status='Starting' if enough
3291 hosts are ready for it to run.
3292
3293 Cleans up by kicking HQEs into status='Stopped' if this Job is not
3294 ready to run.
showard77182562009-06-10 00:16:05 +00003295 """
showardb2e2c322008-10-14 17:33:55 +00003296 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003297 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003298 elif queue_entry.atomic_group:
3299 self.run_with_ready_delay(queue_entry)
3300 else:
3301 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003302
3303
    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        If the delay is disabled, already expired, or enough hosts are
        Pending, the job is run immediately; otherwise the entry is set to
        status Waiting.  Returns nothing.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run when appropriate.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._pending_threshold(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003328
showard8cc058f2009-09-08 16:26:33 +00003329
    def schedule_delayed_callback_task(self, queue_entry):
        """
        Put the entry in Pending and create the DelayedCallTask that will
        run this atomic group job once the host-wait delay elapses.

        @param queue_entry: The HostQueueEntry waiting for extra hosts.
        @returns The new DelayedCallTask, or None if a delay is already set.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant. It could have aborted
            # while we were waiting or hosts could have disappeared, etc.
            threshold = self._pending_threshold(queue_entry.atomic_group)
            if self._pending_count() < threshold:
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras. Not running.', self)
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003354
3355
3356 def run(self, queue_entry):
3357 """
3358 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003359 """
3360 if queue_entry.atomic_group and self._atomic_and_has_started():
3361 logging.error('Job.run() called on running atomic Job %d '
3362 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003363 return
3364 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003365 if queue_entries:
3366 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003367
3368
showard8cc058f2009-09-08 16:26:33 +00003369 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003370 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003371 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003372 self.abort_delay_ready_task()
3373
3374
3375 def abort_delay_ready_task(self):
3376 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003377 if self._delay_ready_task:
3378 # Cancel any pending callback that would try to run again
3379 # as we are already running.
3380 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003381
showardd2014822009-10-12 20:26:58 +00003382
showardb000a8d2009-07-28 20:02:07 +00003383 def __str__(self):
3384 return '%s-%s' % (self.id, self.owner)
3385
3386
mbligh36768f02008-02-22 18:28:33 +00003387if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003388 main()