blob: ca71582ae372977e7cfc21e18063bca846c21ae0 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the autotest install location to be overridden from the environment.
# ('in' is preferred over the deprecated dict.has_key().)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Pidfile names written into execution directories by the various
# autoserv/parser processes the scheduler launches.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state; populated by main()/init() at startup.
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    timeout_minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * timeout_minutes
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    """Scheduler entry point.

    Runs the real main loop, logging (and re-raising) any escaping
    exception, and always removes this process's pid file on the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # deliberate exits pass through untouched
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000085
86
87def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000088 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000089
showard136e6dc2009-06-10 19:38:49 +000090 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000091 parser = optparse.OptionParser(usage)
92 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
93 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +000094 parser.add_option('--test', help='Indicate that scheduler is under ' +
95 'test and should use dummy autoserv and no parsing',
96 action='store_true')
97 (options, args) = parser.parse_args()
98 if len(args) != 1:
99 parser.print_usage()
100 return
mbligh36768f02008-02-22 18:28:33 +0000101
showard5613c662009-06-08 23:30:33 +0000102 scheduler_enabled = global_config.global_config.get_config_value(
103 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
104
105 if not scheduler_enabled:
106 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
107 "global_config's SCHEDULER section to enabled it. Exiting.")
mbligh6fbdb802009-08-03 16:42:55 +0000108 logging.error(msg)
showard5613c662009-06-08 23:30:33 +0000109 sys.exit(1)
110
jadmanski0afbb632008-06-06 21:10:57 +0000111 global RESULTS_DIR
112 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000113
mbligh83c1e9e2009-05-01 23:10:41 +0000114 site_init = utils.import_site_function(__file__,
115 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
116 _site_init_monitor_db_dummy)
117 site_init()
118
showardcca334f2009-03-12 20:38:34 +0000119 # Change the cwd while running to avoid issues incase we were launched from
120 # somewhere odd (such as a random NFS home directory of the person running
121 # sudo to launch us as the appropriate user).
122 os.chdir(RESULTS_DIR)
123
jadmanski0afbb632008-06-06 21:10:57 +0000124 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000125 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
126 "notify_email_statuses",
127 default='')
showardc85c21b2008-11-24 22:17:37 +0000128 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000129 _notify_email_statuses = [status for status in
130 re.split(r'[\s,;:]', notify_statuses_list.lower())
131 if status]
showardc85c21b2008-11-24 22:17:37 +0000132
jadmanski0afbb632008-06-06 21:10:57 +0000133 if options.test:
134 global _autoserv_path
135 _autoserv_path = 'autoserv_dummy'
136 global _testing_mode
137 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000138
mbligh37eceaa2008-12-15 22:56:37 +0000139 # AUTOTEST_WEB.base_url is still a supported config option as some people
140 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000141 global _base_url
showard170873e2009-01-07 00:22:26 +0000142 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
143 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000144 if config_base_url:
145 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000146 else:
mbligh37eceaa2008-12-15 22:56:37 +0000147 # For the common case of everything running on a single server you
148 # can just set the hostname in a single place in the config file.
149 server_name = c.get_config_value('SERVER', 'hostname')
150 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000151 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000152 sys.exit(1)
153 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000154
showardc5afc462009-01-13 00:09:39 +0000155 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000156 server.start()
157
jadmanski0afbb632008-06-06 21:10:57 +0000158 try:
showard136e6dc2009-06-10 19:38:49 +0000159 init()
showardc5afc462009-01-13 00:09:39 +0000160 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000161 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000162
jadmanski0afbb632008-06-06 21:10:57 +0000163 while not _shutdown:
164 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000165 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000166 except:
showard170873e2009-01-07 00:22:26 +0000167 email_manager.manager.log_stacktrace(
168 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000169
showard170873e2009-01-07 00:22:26 +0000170 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000171 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000172 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000173 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
def setup_logging():
    """Configure scheduler logging.

    The log directory and file name may be overridden via the
    AUTOTEST_SCHEDULER_LOG_DIR / AUTOTEST_SCHEDULER_LOG_NAME environment
    variables; otherwise the SchedulerLoggingConfig defaults apply.
    """
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR')
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME')
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(),
        log_dir=log_dir, logfile_name=log_name)
182
183
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main dispatch loop to shut down cleanly."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000188
189
def init():
    """One-time scheduler startup.

    Writes our pid file (aborting if another monitor_db is alive), connects
    the database, puts Django in autocommit, installs the SIGINT handler and
    initializes the drone manager from the global config.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Renamed from 'drones' to avoid shadowing the imported drones module.
    drone_names = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_names.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
248
249
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
252
253
class HostScheduler(object):
    """
    Matches pending HostQueueEntries against Ready hosts.

    refresh() snapshots scheduling state from the database once per cycle
    (available hosts, job/host ACLs, labels, dependencies); the
    find_eligible_* methods then consume that cached state, popping hosts
    out of _hosts_available as they are claimed so each host is handed out
    at most once per cycle.
    """
    def _get_ready_hosts(self):
        # A host is schedulable only if it has no active queue entry, is not
        # locked, and its status is Ready (or unset).
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # Render ids as a comma-separated string for an SQL IN (...) clause.
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        # query must contain a single %s placeholder for the id list.
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        # Turn (left_id, right_id) rows into {left_id: set(right_ids)};
        # flip=True inverts the mapping direction.
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        # Maps job id -> set of ACL group ids the job's owner belongs to.
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        # Maps job id -> set of host ids the job may not run on.
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        # Maps job id -> set of label ids the job depends on.
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        # Maps host id -> set of ACL group ids the host belongs to.
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        # Returns the label<->host relationship in both directions:
        # (label_id -> host_ids, host_id -> label_ids).
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # All labels keyed by id (includes invalid ones; callers filter).
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        # Snapshot all scheduling state for this cycle.  Only jobs with
        # pending entries are loaded to keep the queries bounded.
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # True when the job's owner shares at least one ACL group with host.
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # The host must carry every label the job depends on.
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        # An only_if_needed label on the host disqualifies it unless the job
        # explicitly asked for that label (dependency or metahost).
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  If more than one atomic group is
        found among the labels, an error is logged and an arbitrary one of
        them is returned.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        # NOTE: returns the host object's truthiness, not a strict bool,
        # when the host is known; falsy when unknown.
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        # Claim the specific host requested by the entry, or None if it is
        # unavailable/ineligible this cycle.
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        # Pick any eligible host carrying the metahost label; prunes unusable
        # hosts from the cached label set as a side effect.
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        # Dispatch to the direct-host or metahost path.  Atomic group entries
        # must go through find_eligible_atomic_group() instead.
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
617
618
showard170873e2009-01-07 00:22:26 +0000619class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000620 def __init__(self):
621 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000622 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000623 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000624 user_cleanup_time = scheduler_config.config.clean_interval
625 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
626 _db, user_cleanup_time)
627 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000628 self._host_agents = {}
629 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
showard915958d2009-04-22 21:00:58 +0000632 def initialize(self, recover_hosts=True):
633 self._periodic_cleanup.initialize()
634 self._24hr_upkeep.initialize()
635
jadmanski0afbb632008-06-06 21:10:57 +0000636 # always recover processes
637 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000638
jadmanski0afbb632008-06-06 21:10:57 +0000639 if recover_hosts:
640 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000641
642
    def tick(self):
        # One scheduler cycle.  The order matters: refresh drone state first,
        # run cleanups, handle aborts, then schedule work before letting
        # agents run and flushing drone actions and queued email.
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_new_jobs()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000655
showard97aed502008-11-04 02:01:24 +0000656
mblighf3294cc2009-04-08 21:17:38 +0000657 def _run_cleanup(self):
658 self._periodic_cleanup.run_cleanup_maybe()
659 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard170873e2009-01-07 00:22:26 +0000662 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
663 for object_id in object_ids:
664 agent_dict.setdefault(object_id, set()).add(agent)
665
666
667 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
668 for object_id in object_ids:
669 assert object_id in agent_dict
670 agent_dict[object_id].remove(agent)
671
672
jadmanski0afbb632008-06-06 21:10:57 +0000673 def add_agent(self, agent):
674 self._agents.append(agent)
675 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000676 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
677 self._register_agent_for_ids(self._queue_entry_agents,
678 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000679
showard170873e2009-01-07 00:22:26 +0000680
681 def get_agents_for_entry(self, queue_entry):
682 """
683 Find agents corresponding to the specified queue_entry.
684 """
showardd3dc1992009-04-22 21:01:40 +0000685 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000686
687
688 def host_has_agent(self, host):
689 """
690 Determine if there is currently an Agent present using this host.
691 """
692 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000693
694
jadmanski0afbb632008-06-06 21:10:57 +0000695 def remove_agent(self, agent):
696 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000697 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
698 agent)
699 self._unregister_agent_for_ids(self._queue_entry_agents,
700 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000701
702
showard8cc058f2009-09-08 16:26:33 +0000703 def _host_has_scheduled_special_task(self, host):
704 return bool(models.SpecialTask.objects.filter(host__id=host.id,
705 is_active=False,
706 is_complete=False))
707
708
jadmanski0afbb632008-06-06 21:10:57 +0000709 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000710 self._register_pidfiles()
711 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000712 self._recover_all_recoverable_entries()
showard8cc058f2009-09-08 16:26:33 +0000713 self._recover_pending_entries()
showard6878e8b2009-07-20 22:37:45 +0000714 self._check_for_remaining_active_entries()
showard170873e2009-01-07 00:22:26 +0000715 self._reverify_remaining_hosts()
716 # reinitialize drones after killing orphaned processes, since they can
717 # leave around files when they die
718 _drone_manager.execute_actions()
719 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000720
showard170873e2009-01-07 00:22:26 +0000721
722 def _register_pidfiles(self):
723 # during recovery we may need to read pidfiles for both running and
724 # parsing entries
725 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000726 where="status IN ('Running', 'Gathering', 'Parsing')")
showarded2afea2009-07-07 20:54:07 +0000727 special_tasks = models.SpecialTask.objects.filter(is_active=True)
728 for execution_entry in itertools.chain(queue_entries, special_tasks):
showardd3dc1992009-04-22 21:01:40 +0000729 for pidfile_name in _ALL_PIDFILE_NAMES:
730 pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +0000731 execution_entry.execution_path(), pidfile_name=pidfile_name)
showardd3dc1992009-04-22 21:01:40 +0000732 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000733
734
showarded2afea2009-07-07 20:54:07 +0000735 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
736 run_monitor = PidfileRunMonitor()
737 run_monitor.attach_to_existing_process(execution_path,
738 pidfile_name=pidfile_name)
739 if run_monitor.has_process():
740 orphans.discard(run_monitor.get_process())
741 return run_monitor, '(process %s)' % run_monitor.get_process()
742 return None, 'without process'
743
744
showard8cc058f2009-09-08 16:26:33 +0000745 def _get_unassigned_entries(self, status):
746 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
747 if not self.get_agents_for_entry(entry):
748 yield entry
749
750
showardd3dc1992009-04-22 21:01:40 +0000751 def _recover_entries_with_status(self, status, orphans, pidfile_name,
752 recover_entries_fn):
showard8cc058f2009-09-08 16:26:33 +0000753 for queue_entry in self._get_unassigned_entries(status):
showardd3dc1992009-04-22 21:01:40 +0000754 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000755 run_monitor, process_string = self._get_recovery_run_monitor(
756 queue_entry.execution_path(), pidfile_name, orphans)
showard8cc058f2009-09-08 16:26:33 +0000757 if not run_monitor:
758 # _schedule_running_host_queue_entries should schedule and
759 # recover these entries
760 continue
showard597bfd32009-05-08 18:22:50 +0000761
showarded2afea2009-07-07 20:54:07 +0000762 logging.info('Recovering %s entry %s %s',status.lower(),
763 ', '.join(str(entry) for entry in queue_entries),
764 process_string)
showardd3dc1992009-04-22 21:01:40 +0000765 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000766
767
showard6878e8b2009-07-20 22:37:45 +0000768 def _check_for_remaining_orphan_processes(self, orphans):
769 if not orphans:
770 return
771 subject = 'Unrecovered orphan autoserv processes remain'
772 message = '\n'.join(str(process) for process in orphans)
773 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000774
775 die_on_orphans = global_config.global_config.get_config_value(
776 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
777
778 if die_on_orphans:
779 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000780
showard170873e2009-01-07 00:22:26 +0000781
showardd3dc1992009-04-22 21:01:40 +0000782 def _recover_running_entries(self, orphans):
783 def recover_entries(job, queue_entries, run_monitor):
showard8cc058f2009-09-08 16:26:33 +0000784 queue_task = QueueTask(job=job, queue_entries=queue_entries,
785 recover_run_monitor=run_monitor)
786 self.add_agent(Agent(task=queue_task,
787 num_processes=len(queue_entries)))
showardd3dc1992009-04-22 21:01:40 +0000788
789 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
showarded2afea2009-07-07 20:54:07 +0000790 orphans, _AUTOSERV_PID_FILE,
showardd3dc1992009-04-22 21:01:40 +0000791 recover_entries)
792
793
794 def _recover_gathering_entries(self, orphans):
795 def recover_entries(job, queue_entries, run_monitor):
796 gather_task = GatherLogsTask(job, queue_entries,
showarded2afea2009-07-07 20:54:07 +0000797 recover_run_monitor=run_monitor)
showard8cc058f2009-09-08 16:26:33 +0000798 self.add_agent(Agent(gather_task))
showardd3dc1992009-04-22 21:01:40 +0000799
800 self._recover_entries_with_status(
801 models.HostQueueEntry.Status.GATHERING,
802 orphans, _CRASHINFO_PID_FILE, recover_entries)
803
804
805 def _recover_parsing_entries(self, orphans):
806 def recover_entries(job, queue_entries, run_monitor):
807 reparse_task = FinalReparseTask(queue_entries,
showarded2afea2009-07-07 20:54:07 +0000808 recover_run_monitor=run_monitor)
showard8cc058f2009-09-08 16:26:33 +0000809 self.add_agent(Agent(reparse_task, num_processes=0))
showardd3dc1992009-04-22 21:01:40 +0000810
811 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
812 orphans, _PARSER_PID_FILE,
813 recover_entries)
814
815
showard8cc058f2009-09-08 16:26:33 +0000816 def _recover_pending_entries(self):
817 for entry in self._get_unassigned_entries(
818 models.HostQueueEntry.Status.PENDING):
819 entry.on_pending()
820
821
showardd3dc1992009-04-22 21:01:40 +0000822 def _recover_all_recoverable_entries(self):
823 orphans = _drone_manager.get_orphaned_autoserv_processes()
824 self._recover_running_entries(orphans)
825 self._recover_gathering_entries(orphans)
826 self._recover_parsing_entries(orphans)
showarded2afea2009-07-07 20:54:07 +0000827 self._recover_special_tasks(orphans)
showard6878e8b2009-07-20 22:37:45 +0000828 self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000829
showard97aed502008-11-04 02:01:24 +0000830
showarded2afea2009-07-07 20:54:07 +0000831 def _recover_special_tasks(self, orphans):
showard2fe3f1d2009-07-06 20:19:11 +0000832 """\
833 Recovers all special tasks that have started running but have not
834 completed.
835 """
836
837 tasks = models.SpecialTask.objects.filter(is_active=True,
838 is_complete=False)
839 # Use ordering to force NULL queue_entry_id's to the end of the list
showarda5288b42009-07-28 20:06:08 +0000840 for task in tasks.order_by('-queue_entry__id'):
showard9b6ec502009-08-20 23:25:17 +0000841 if self.host_has_agent(task.host):
842 raise SchedulerError(
843 "%s already has a host agent %s." % (
showard8cc058f2009-09-08 16:26:33 +0000844 task, self._host_agents.get(task.host.id)))
showard2fe3f1d2009-07-06 20:19:11 +0000845
showarded2afea2009-07-07 20:54:07 +0000846 run_monitor, process_string = self._get_recovery_run_monitor(
847 task.execution_path(), _AUTOSERV_PID_FILE, orphans)
848
849 logging.info('Recovering %s %s', task, process_string)
showard8cc058f2009-09-08 16:26:33 +0000850 self._recover_special_task(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000851
852
showard8cc058f2009-09-08 16:26:33 +0000853 def _recover_special_task(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000854 """\
855 Recovers a single special task.
856 """
857 if task.task == models.SpecialTask.Task.VERIFY:
showard8cc058f2009-09-08 16:26:33 +0000858 agent_task = self._recover_verify(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000859 elif task.task == models.SpecialTask.Task.REPAIR:
showard8cc058f2009-09-08 16:26:33 +0000860 agent_task = self._recover_repair(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000861 elif task.task == models.SpecialTask.Task.CLEANUP:
showard8cc058f2009-09-08 16:26:33 +0000862 agent_task = self._recover_cleanup(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000863 else:
864 # Should never happen
865 logging.error(
866 "Special task id %d had invalid task %s", (task.id, task.task))
867
showard8cc058f2009-09-08 16:26:33 +0000868 self.add_agent(Agent(agent_task))
showard2fe3f1d2009-07-06 20:19:11 +0000869
870
showard8cc058f2009-09-08 16:26:33 +0000871 def _recover_verify(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000872 """\
873 Recovers a verify task.
874 No associated queue entry: Verify host
875 With associated queue entry: Verify host, and run associated queue
876 entry
877 """
showard8cc058f2009-09-08 16:26:33 +0000878 return VerifyTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000879
880
showard8cc058f2009-09-08 16:26:33 +0000881 def _recover_repair(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000882 """\
883 Recovers a repair task.
884 Always repair host
885 """
showard8cc058f2009-09-08 16:26:33 +0000886 return RepairTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000887
888
showard8cc058f2009-09-08 16:26:33 +0000889 def _recover_cleanup(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000890 """\
891 Recovers a cleanup task.
892 No associated queue entry: Clean host
893 With associated queue entry: Clean host, verify host if needed, and
894 run associated queue entry
895 """
showard8cc058f2009-09-08 16:26:33 +0000896 return CleanupTask(task=task, recover_run_monitor=run_monitor)
showard6af73ad2009-07-28 20:00:58 +0000897
898
showard6878e8b2009-07-20 22:37:45 +0000899 def _check_for_remaining_active_entries(self):
showard170873e2009-01-07 00:22:26 +0000900 queue_entries = HostQueueEntry.fetch(
showard8cc058f2009-09-08 16:26:33 +0000901 where='active AND NOT complete AND status NOT IN '
902 '("Starting", "Gathering", "Pending")')
showardd3dc1992009-04-22 21:01:40 +0000903
showarde8e37072009-08-20 23:31:30 +0000904 unrecovered_active_hqes = [entry for entry in queue_entries
showard8cc058f2009-09-08 16:26:33 +0000905 if not self.get_agents_for_entry(entry) and
906 not self._host_has_scheduled_special_task(
907 entry.host)]
showarde8e37072009-08-20 23:31:30 +0000908 if unrecovered_active_hqes:
909 message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes)
910 raise SchedulerError(
911 '%d unrecovered active host queue entries:\n%s' %
912 (len(unrecovered_active_hqes), message))
showard170873e2009-01-07 00:22:26 +0000913
914
showard8cc058f2009-09-08 16:26:33 +0000915 def _schedule_special_tasks(self):
916 tasks = models.SpecialTask.objects.filter(is_active=False,
917 is_complete=False,
918 host__locked=False)
919 # We want lower ids to come first, but the NULL queue_entry_ids need to
920 # come last
921 tasks = tasks.extra(select={'isnull' : 'queue_entry_id IS NULL'})
922 tasks = tasks.extra(order_by=['isnull', 'id'])
showard6d7b2ff2009-06-10 00:16:47 +0000923
showard2fe3f1d2009-07-06 20:19:11 +0000924 for task in tasks:
showard8cc058f2009-09-08 16:26:33 +0000925 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000926 continue
showard6d7b2ff2009-06-10 00:16:47 +0000927
showard8cc058f2009-09-08 16:26:33 +0000928 if task.task == models.SpecialTask.Task.CLEANUP:
929 agent_task = CleanupTask(task=task)
930 elif task.task == models.SpecialTask.Task.VERIFY:
931 agent_task = VerifyTask(task=task)
932 elif task.task == models.SpecialTask.Task.REPAIR:
933 agent_task = RepairTask(task=task)
934 else:
935 email_manager.manager.enqueue_notify_email(
936 'Special task with invalid task', task)
937 continue
938
939 self.add_agent(Agent(agent_task))
showard1ff7b2e2009-05-15 23:17:18 +0000940
941
showard170873e2009-01-07 00:22:26 +0000942 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000943 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000944 # should never happen
showarded2afea2009-07-07 20:54:07 +0000945 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000946 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000947 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000948 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000949 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000950
951
jadmanski0afbb632008-06-06 21:10:57 +0000952 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000953 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000954 full_where='locked = 0 AND invalid = 0 AND ' + where
955 for host in Host.fetch(where=full_where):
956 if self.host_has_agent(host):
957 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000958 continue
showard8cc058f2009-09-08 16:26:33 +0000959 if self._host_has_scheduled_special_task(host):
960 # host will have a special task scheduled on the next cycle
961 continue
showard170873e2009-01-07 00:22:26 +0000962 if print_message:
showardb18134f2009-03-20 20:52:18 +0000963 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000964 models.SpecialTask.objects.create(
965 task=models.SpecialTask.Task.CLEANUP,
966 host=models.Host(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000967
968
jadmanski0afbb632008-06-06 21:10:57 +0000969 def _recover_hosts(self):
970 # recover "Repair Failed" hosts
971 message = 'Reverifying dead host %s'
972 self._reverify_hosts_where("status = 'Repair Failed'",
973 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000974
975
showard04c82c52008-05-29 19:38:12 +0000976
showardb95b1bd2008-08-15 18:11:04 +0000977 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000978 # prioritize by job priority, then non-metahost over metahost, then FIFO
979 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000980 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000981 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000982 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000983
984
showard89f84db2009-03-12 20:39:13 +0000985 def _refresh_pending_queue_entries(self):
986 """
987 Lookup the pending HostQueueEntries and call our HostScheduler
988 refresh() method given that list. Return the list.
989
990 @returns A list of pending HostQueueEntries sorted in priority order.
991 """
showard63a34772008-08-18 19:32:50 +0000992 queue_entries = self._get_pending_queue_entries()
993 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000994 return []
showardb95b1bd2008-08-15 18:11:04 +0000995
showard63a34772008-08-18 19:32:50 +0000996 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000997
showard89f84db2009-03-12 20:39:13 +0000998 return queue_entries
999
1000
1001 def _schedule_atomic_group(self, queue_entry):
1002 """
1003 Schedule the given queue_entry on an atomic group of hosts.
1004
1005 Returns immediately if there are insufficient available hosts.
1006
1007 Creates new HostQueueEntries based off of queue_entry for the
1008 scheduled hosts and starts them all running.
1009 """
1010 # This is a virtual host queue entry representing an entire
1011 # atomic group, find a group and schedule their hosts.
1012 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1013 queue_entry)
1014 if not group_hosts:
1015 return
showardcbe6f942009-06-17 19:33:49 +00001016
1017 logging.info('Expanding atomic group entry %s with hosts %s',
1018 queue_entry,
1019 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001020 # The first assigned host uses the original HostQueueEntry
1021 group_queue_entries = [queue_entry]
1022 for assigned_host in group_hosts[1:]:
1023 # Create a new HQE for every additional assigned_host.
1024 new_hqe = HostQueueEntry.clone(queue_entry)
1025 new_hqe.save()
1026 group_queue_entries.append(new_hqe)
1027 assert len(group_queue_entries) == len(group_hosts)
1028 for queue_entry, host in itertools.izip(group_queue_entries,
1029 group_hosts):
1030 self._run_queue_entry(queue_entry, host)
1031
1032
1033 def _schedule_new_jobs(self):
1034 queue_entries = self._refresh_pending_queue_entries()
1035 if not queue_entries:
1036 return
1037
showard63a34772008-08-18 19:32:50 +00001038 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001039 is_unassigned_atomic_group = (
1040 queue_entry.atomic_group_id is not None
1041 and queue_entry.host_id is None)
1042 if is_unassigned_atomic_group:
1043 self._schedule_atomic_group(queue_entry)
1044 else:
showard89f84db2009-03-12 20:39:13 +00001045 assigned_host = self._host_scheduler.find_eligible_host(
1046 queue_entry)
1047 if assigned_host:
1048 self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001049
1050
showard8cc058f2009-09-08 16:26:33 +00001051 def _schedule_running_host_queue_entries(self):
1052 entries = HostQueueEntry.fetch(
1053 where="status IN "
1054 "('Starting', 'Running', 'Gathering', 'Parsing')")
1055 for entry in entries:
1056 if self.get_agents_for_entry(entry):
1057 continue
1058
1059 task_entries = entry.job.get_group_entries(entry)
1060 for task_entry in task_entries:
1061 if (task_entry.status != models.HostQueueEntry.Status.PARSING
1062 and self.host_has_agent(task_entry.host)):
1063 agent = self._host_agents.get(task_entry.host.id)[0]
1064 raise SchedulerError('Attempted to schedule on host that '
1065 'already has agent: %s (previous '
1066 'agent task: %s)'
1067 % (task_entry, agent.task))
1068
1069 if entry.status in (models.HostQueueEntry.Status.STARTING,
1070 models.HostQueueEntry.Status.RUNNING):
1071 params = entry.job.get_autoserv_params(task_entries)
1072 agent_task = QueueTask(job=entry.job,
1073 queue_entries=task_entries, cmd=params)
1074 elif entry.status == models.HostQueueEntry.Status.GATHERING:
1075 agent_task = GatherLogsTask(
1076 job=entry.job, queue_entries=task_entries)
1077 elif entry.status == models.HostQueueEntry.Status.PARSING:
1078 agent_task = FinalReparseTask(queue_entries=task_entries)
1079 else:
1080 raise SchedulerError('_schedule_running_host_queue_entries got '
1081 'entry with invalid status %s: %s'
1082 % (entry.status, entry))
1083
1084 self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
1085
1086
1087 def _schedule_delay_tasks(self):
showardd2014822009-10-12 20:26:58 +00001088 for entry in HostQueueEntry.fetch(where='status = "%s"' %
1089 models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001090 task = entry.job.schedule_delayed_callback_task(entry)
1091 if task:
1092 self.add_agent(Agent(task, num_processes=0))
1093
1094
showardb95b1bd2008-08-15 18:11:04 +00001095 def _run_queue_entry(self, queue_entry, host):
showard8cc058f2009-09-08 16:26:33 +00001096 queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001097
1098
jadmanski0afbb632008-06-06 21:10:57 +00001099 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001100 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001101 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001102 for agent in self.get_agents_for_entry(entry):
1103 agent.abort()
1104 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001105
1106
showard324bf812009-01-20 23:23:38 +00001107 def _can_start_agent(self, agent, num_started_this_cycle,
1108 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001109 # always allow zero-process agents to run
1110 if agent.num_processes == 0:
1111 return True
1112 # don't allow any nonzero-process agents to run after we've reached a
1113 # limit (this avoids starvation of many-process agents)
1114 if have_reached_limit:
1115 return False
1116 # total process throttling
showard324bf812009-01-20 23:23:38 +00001117 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001118 return False
1119 # if a single agent exceeds the per-cycle throttling, still allow it to
1120 # run when it's the first agent in the cycle
1121 if num_started_this_cycle == 0:
1122 return True
1123 # per-cycle throttling
1124 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001125 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001126 return False
1127 return True
1128
1129
jadmanski0afbb632008-06-06 21:10:57 +00001130 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001131 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001132 have_reached_limit = False
1133 # iterate over copy, so we can remove agents during iteration
1134 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001135 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001136 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001137 have_reached_limit):
1138 have_reached_limit = True
1139 continue
showard4c5374f2008-09-04 17:02:56 +00001140 num_started_this_cycle += agent.num_processes
1141 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001142 if agent.is_done():
1143 logging.info("agent finished")
1144 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001145 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001146 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001147
1148
showard29f7cd22009-04-29 21:16:24 +00001149 def _process_recurring_runs(self):
1150 recurring_runs = models.RecurringRun.objects.filter(
1151 start_date__lte=datetime.datetime.now())
1152 for rrun in recurring_runs:
1153 # Create job from template
1154 job = rrun.job
1155 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001156 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001157
1158 host_objects = info['hosts']
1159 one_time_hosts = info['one_time_hosts']
1160 metahost_objects = info['meta_hosts']
1161 dependencies = info['dependencies']
1162 atomic_group = info['atomic_group']
1163
1164 for host in one_time_hosts or []:
1165 this_host = models.Host.create_one_time_host(host.hostname)
1166 host_objects.append(this_host)
1167
1168 try:
1169 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001170 options=options,
showard29f7cd22009-04-29 21:16:24 +00001171 host_objects=host_objects,
1172 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001173 atomic_group=atomic_group)
1174
1175 except Exception, ex:
1176 logging.exception(ex)
1177 #TODO send email
1178
1179 if rrun.loop_count == 1:
1180 rrun.delete()
1181 else:
1182 if rrun.loop_count != 0: # if not infinite loop
1183 # calculate new start_date
1184 difference = datetime.timedelta(seconds=rrun.loop_period)
1185 rrun.start_date = rrun.start_date + difference
1186 rrun.loop_count -= 1
1187 rrun.save()
1188
1189
showard170873e2009-01-07 00:22:26 +00001190class PidfileRunMonitor(object):
1191 """
1192 Client must call either run() to start a new process or
1193 attach_to_existing_process().
1194 """
mbligh36768f02008-02-22 18:28:33 +00001195
showard170873e2009-01-07 00:22:26 +00001196 class _PidfileException(Exception):
1197 """
1198 Raised when there's some unexpected behavior with the pid file, but only
1199 used internally (never allowed to escape this class).
1200 """
mbligh36768f02008-02-22 18:28:33 +00001201
1202
showard170873e2009-01-07 00:22:26 +00001203 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001204 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001205 self._start_time = None
1206 self.pidfile_id = None
1207 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001208
1209
showard170873e2009-01-07 00:22:26 +00001210 def _add_nice_command(self, command, nice_level):
1211 if not nice_level:
1212 return command
1213 return ['nice', '-n', str(nice_level)] + command
1214
1215
    def _set_start_time(self):
        # Record when we started (or attached to) the process; used by
        # _handle_no_process() to decide when to give up waiting for a pidfile.
        self._start_time = time.time()
1218
1219
1220 def run(self, command, working_directory, nice_level=None, log_file=None,
1221 pidfile_name=None, paired_with_pidfile=None):
1222 assert command is not None
1223 if nice_level is not None:
1224 command = ['nice', '-n', str(nice_level)] + command
1225 self._set_start_time()
1226 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001227 command, working_directory, pidfile_name=pidfile_name,
1228 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001229
1230
showarded2afea2009-07-07 20:54:07 +00001231 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001232 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001233 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001234 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001235 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001236 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001237
1238
jadmanski0afbb632008-06-06 21:10:57 +00001239 def kill(self):
showard170873e2009-01-07 00:22:26 +00001240 if self.has_process():
1241 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001242
mbligh36768f02008-02-22 18:28:33 +00001243
showard170873e2009-01-07 00:22:26 +00001244 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001245 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001246 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001247
1248
showard170873e2009-01-07 00:22:26 +00001249 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001250 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001251 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001252 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001253
1254
showard170873e2009-01-07 00:22:26 +00001255 def _read_pidfile(self, use_second_read=False):
1256 assert self.pidfile_id is not None, (
1257 'You must call run() or attach_to_existing_process()')
1258 contents = _drone_manager.get_pidfile_contents(
1259 self.pidfile_id, use_second_read=use_second_read)
1260 if contents.is_invalid():
1261 self._state = drone_manager.PidfileContents()
1262 raise self._PidfileException(contents)
1263 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001264
1265
showard21baa452008-10-21 00:08:39 +00001266 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001267 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1268 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001269 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001270 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001271
1272
    def _get_pidfile_info_helper(self):
        """
        Refresh self._state from the pidfile, handling the process-just-exited
        race with a second read.  May raise self._PidfileException (via
        _read_pidfile); may transition to the lost-process state.
        """
        if self.lost_process:
            # terminal state - nothing further to learn
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001295
showard21baa452008-10-21 00:08:39 +00001296
1297 def _get_pidfile_info(self):
1298 """\
1299 After completion, self._state will contain:
1300 pid=None, exit_status=None if autoserv has not yet run
1301 pid!=None, exit_status=None if autoserv is running
1302 pid!=None, exit_status!=None if autoserv has completed
1303 """
1304 try:
1305 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001306 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001307 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001308
1309
    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        # Grant the process a grace period to write its pidfile: only once
        # _get_pidfile_timeout_secs() have elapsed since this monitor started
        # do we give up, send a notification email, and treat the process as
        # lost.  Before the timeout this is a no-op and we keep polling.
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001319
1320
showard35162b02009-03-03 02:17:30 +00001321 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001322 """\
1323 Called when autoserv has exited without writing an exit status,
1324 or we've timed out waiting for autoserv to write a pid to the
1325 pidfile. In either case, we just return failure and the caller
1326 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001327
showard170873e2009-01-07 00:22:26 +00001328 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001329 """
1330 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001331 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001332 self._state.exit_status = 1
1333 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001334
1335
jadmanski0afbb632008-06-06 21:10:57 +00001336 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001337 self._get_pidfile_info()
1338 return self._state.exit_status
1339
1340
1341 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001342 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001343 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001344 if self._state.num_tests_failed is None:
1345 return -1
showard21baa452008-10-21 00:08:39 +00001346 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001347
1348
showardcdaeae82009-08-31 18:32:48 +00001349 def try_copy_results_on_drone(self, **kwargs):
1350 if self.has_process():
1351 # copy results logs into the normal place for job results
1352 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1353
1354
1355 def try_copy_to_results_repository(self, source, **kwargs):
1356 if self.has_process():
1357 _drone_manager.copy_to_results_repository(self.get_process(),
1358 source, **kwargs)
1359
1360
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.  If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.

    The following attribute is written to all task objects:
        agent - A reference to the Agent instance that the task has been
                added to.
    """


    def __init__(self, task, num_processes=1):
        """
        @param task: A task as described in the class docstring.
        @param num_processes: The number of subprocesses the Agent represents.
                This is used by the Dispatcher for managing the load on the
                system.  Defaults to 1.
        """
        self.task = task
        task.agent = self

        self.dispatcher = None  # filled in by Dispatcher.add_agent()
        self.num_processes = num_processes

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False


    def tick(self):
        # the first tick marks this agent as started, whether or not the
        # task has anything left to do
        self.started = True
        task = self.task
        if not task:
            return
        task.poll()
        if task.is_done():
            self.task = None


    def is_done(self):
        return self.task is None


    def abort(self):
        task = self.task
        if not task:
            return
        task.abort()
        # tasks can choose to ignore aborts
        if task.aborted:
            self.task = None
showard8cc058f2009-09-08 16:26:33 +00001421 self.task = None
showard20f9bdd2009-04-29 19:48:33 +00001422
showardd3dc1992009-04-22 21:01:40 +00001423
class DelayedCallTask(object):
    """
    A task object like AgentTask for an Agent to run that waits for the
    specified amount of time to have elapsed before calling the supplied
    callback once and finishing.  If the callback returns anything, it is
    assumed to be a new Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """

    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes required by the Agent interface.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.agent = None  # filled in by Agent.add_task()


    def poll(self):
        # fire the callback exactly once, the first poll at/after end_time
        if self.is_done() or self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        return self.success or self.aborted


    def abort(self):
        self.aborted = True
showard77182562009-06-10 00:16:05 +00001476
1477
class AgentTask(object):
    """
    Base class for tasks executed by an Agent.  A task optionally wraps an
    autoserv command line (self.cmd) that is launched and tracked through a
    PidfileRunMonitor.  Subclasses override prolog()/epilog() for pre/post
    work and cleanup() for log handling.
    """

    def __init__(self, cmd=None, working_directory=None,
                 pidfile_name=None, paired_with_pidfile=None,
                 recover_run_monitor=None):
        # NOTE(review): pidfile_name and paired_with_pidfile are accepted
        # here but not stored; run() takes them as parameters instead --
        # confirm whether these __init__ args are still needed.
        self.done = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.agent = None
        # when recovering an existing process, adopt its monitor and treat
        # the task as already started so start() won't re-run prolog()/run()
        self.monitor = recover_run_monitor
        self.started = bool(recover_run_monitor)
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # Populate host_ids/queue_entry_ids from queue entries when given,
        # otherwise from a single host (no queue entries tracked).
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        # Lazily start on the first poll, then check progress.
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        # With no monitor (no process launched) the task is considered a
        # failure; otherwise wait for the process to produce an exit code.
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                return  # still running
            success = (exit_code == 0)
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete with the given result and run epilog()."""
        # idempotent: calling finished() twice only takes effect once
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task's process is launched."""
        assert not self.monitor


    def cleanup(self):
        # preserve this task's log file in the results repository, if any
        if self.monitor and self.log_file:
            self.monitor.try_copy_to_results_repository(self.log_file)


    def epilog(self):
        """Hook run once after the task completes (success or failure)."""
        self.cleanup()


    def start(self):
        assert self.agent

        # skipped when recovering an already-running process (started=True)
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        # all entries must share one execution path; return it
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        Copy the execution's results directory to the results repository.

        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: optional monitor to copy through; defaults to
                self.monitor.
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        # trailing slash: copy directory contents
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        # hand the entries off to the parsing stage
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd (if set) under a fresh PidfileRunMonitor."""
        assert not self.monitor
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001612
1613
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render a single keyval line as 'key=value'."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this to return the keyval file path."""
        # Bug fix: was `raise NotImplemented`, which raises a TypeError
        # (NotImplemented is a singleton value, not an exception class).
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line to the job's keyval file on the drone."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) pair recording when the job was queued."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job's finish time (now) in the keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Stage keyval_dict as a keyval file at keyval_path before the job's
        process exists (attached to the execution's files)."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._working_directory,
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write the host's platform and labels as keyvals under
        host_keyvals/<hostname> in the working directory."""
        keyval_path = os.path.join(self._working_directory, 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1663
1664
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None  # subclasses must set to a models.SpecialTask.Task value
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args, **kwargs):
        """
        @param task: the models.SpecialTask DB entry this agent task executes.
        @param extra_command_args: extra autoserv command-line arguments for
                this task type (e.g. ['-v'] for verify).
        """
        # Bug fix: the assert expression and message were previously wrapped
        # together in parentheses, forming a two-element tuple.  A non-empty
        # tuple is always true, so the check could never fire.
        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = HostQueueEntry(id=task.queue_entry.id)

        self.task = task
        kwargs['working_directory'] = task.execution_path()
        self._extra_command_args = extra_command_args
        super(SpecialAgentTask, self).__init__(**kwargs)


    def _keyval_path(self):
        return os.path.join(self._working_directory, self._KEYVAL_FILE)


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        # build the autoserv command line for this special task
        self.cmd = _autoserv_command_line(self.host.hostname,
                                          self._extra_command_args,
                                          queue_entry=self.queue_entry)
        self._working_directory = self.task.execution_path()
        # mark the SpecialTask active in the DB and record host keyvals
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Fail self.queue_entry: write keyvals, copy results and hand the
        entry off for parsing.  No-op for metahost or already-aborted
        entries."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
            source_path=self._working_directory + '/',
            destination_path=self.queue_entry.execution_path() + '/')

        self._copy_results([self.queue_entry])
        self.queue_entry.handle_host_failure()
        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])

        pidfile_id = _drone_manager.get_pidfile_id_from(
            self.queue_entry.execution_path(),
            pidfile_name=_AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()
        # mark the SpecialTask complete in the DB
        self.task.finish()
        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001743
1744
class RepairTask(SpecialAgentTask):
    """Special task that runs autoserv -R to repair a host."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task, recover_run_monitor=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # Resolve the host's protection level to its canonical attribute
        # name for the autoserv command line.
        protection_name = host_protections.Protection.get_string(
                task.host.protection)
        protection_name = host_protections.Protection.get_attr_name(
                protection_name)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection_name],
                recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
            return

        self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001782
1783
class PreJobTask(SpecialAgentTask):
    """
    Base class for special tasks that run on a host before a job (e.g.
    verify).  Provides shared failure handling: preserving the autoserv
    debug log with the job's results and requeueing/repairing on failure.
    """

    def _copy_to_results_repository(self):
        # Preserve this task's autoserv debug log alongside the associated
        # queue entry's results so the failure is visible with the job.
        # Skipped for metahost entries (they get reassigned to another host)
        # and when there is no queue entry at all.
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
                self.queue_entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
                source, destination_path=destination)


    def epilog(self):
        super(PreJobTask, self).epilog()

        # nothing special to do on success
        if self.success:
            return

        self._copy_to_results_repository()

        # hosts protected with DO_NOT_VERIFY are never auto-repaired
        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            return

        if self.queue_entry:
            self.queue_entry.requeue()

            # a REPAIR special task already linked to this queue entry means
            # repair was already scheduled for it; don't loop -- mark the
            # host as failing repair and fail the entry instead
            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
        else:
            queue_entry = None

        # schedule a repair of the host, linked to the queue entry (if any)
        models.SpecialTask.objects.create(
                host=models.Host(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=queue_entry)
showard58721a82009-08-20 23:32:40 +00001829
showard8fe93b52008-11-18 17:53:22 +00001830
class VerifyTask(PreJobTask):
    """Pre-job special task that runs autoserv -v to verify a host."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task, recover_run_monitor=None):
        super(VerifyTask, self).__init__(
                task, ['-v'], recover_run_monitor=recover_run_monitor)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any other queued verifies for this host. One verify will do
        # and there's no need to keep records of other requests.
        pending_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        pending_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001867
1868
class CleanupHostsMixin(object):
    """
    Mixin providing post-job host disposition: either schedule a CLEANUP
    special task for each host or mark it READY.

    NOTE(review): _reboot_hosts() reads self._final_status, which is not a
    parameter -- it must be supplied by the class this is mixed into.
    Confirm every consumer defines it before _reboot_hosts() runs.
    """

    def _reboot_hosts(self, job, queue_entries, final_success,
                      num_tests_failed):
        # Reboot (via a CLEANUP special task) when the job was aborted, when
        # the job always requests a reboot, or when it requests one only if
        # all tests passed and they did.
        reboot_after = job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status == models.HostQueueEntry.Status.ABORTED
                or reboot_after == models.RebootAfter.ALWAYS
                or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP)
            else:
                queue_entry.host.set_status(models.Host.Status.READY)
showardb5626452009-06-30 01:57:28 +00001889
1890
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """
    AgentTask that runs the main autoserv process for a job over a group of
    queue entries sharing one execution path.
    """

    def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = queue_entries[0].get_group_name()
        super(QueueTask, self).__init__(
            cmd, self._execution_path(),
            recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        return os.path.join(self._execution_path(), self._KEYVAL_FILE)


    def _execution_path(self):
        # all entries in this task share the first entry's execution path
        return self.queue_entries[0].execution_path()


    def prolog(self):
        # sanity-check all entries and hosts before moving them to RUNNING;
        # any inconsistency indicates a scheduler bug
        for entry in self.queue_entries:
            if entry.status not in (models.HostQueueEntry.Status.STARTING,
                                    models.HostQueueEntry.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))
            if entry.host.status not in (models.Host.Status.PENDING,
                                         models.Host.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start on queue '
                                     'entry with invalid host status %s: %s'
                                     % (entry.host.status, entry))

        # stage the job-level keyvals (queued time, optional host group name)
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        # move each entry and its host into the running state
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        # drop a marker file so the lost process is visible in the results
        error_file_path = os.path.join(self._execution_path(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Record job completion and move entries on to log gathering."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_path(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            # no entry recorded an aborter; attribute it to the system, now
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00002006
2007
showardd3dc1992009-04-22 21:01:40 +00002008class PostJobTask(AgentTask):
2009 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00002010 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002011 self._queue_entries = queue_entries
2012 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00002013
showarded2afea2009-07-07 20:54:07 +00002014 self._execution_path = self._get_consistent_execution_path(
2015 queue_entries)
2016 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002017 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00002018 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002019 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
2020
2021 if _testing_mode:
2022 command = 'true'
2023 else:
2024 command = self._generate_command(self._results_dir)
2025
showarded2afea2009-07-07 20:54:07 +00002026 super(PostJobTask, self).__init__(
2027 cmd=command, working_directory=self._execution_path,
2028 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002029
showarded2afea2009-07-07 20:54:07 +00002030 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00002031 self._final_status = self._determine_final_status()
2032
2033
2034 def _generate_command(self, results_dir):
2035 raise NotImplementedError('Subclasses must override this')
2036
2037
2038 def _job_was_aborted(self):
2039 was_aborted = None
2040 for queue_entry in self._queue_entries:
2041 queue_entry.update_from_database()
2042 if was_aborted is None: # first queue entry
2043 was_aborted = bool(queue_entry.aborted)
2044 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2045 email_manager.manager.enqueue_notify_email(
2046 'Inconsistent abort state',
2047 'Queue entries have inconsistent abort state: ' +
2048 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2049 # don't crash here, just assume true
2050 return True
2051 return was_aborted
2052
2053
2054 def _determine_final_status(self):
2055 if self._job_was_aborted():
2056 return models.HostQueueEntry.Status.ABORTED
2057
2058 # we'll use a PidfileRunMonitor to read the autoserv exit status
2059 if self._autoserv_monitor.exit_code() == 0:
2060 return models.HostQueueEntry.Status.COMPLETED
2061 return models.HostQueueEntry.Status.FAILED
2062
2063
2064 def run(self):
showard8cc058f2009-09-08 16:26:33 +00002065 # Make sure we actually have results to work with.
2066 # This should never happen in normal operation.
showard5add1c82009-05-26 19:27:46 +00002067 if not self._autoserv_monitor.has_process():
2068 email_manager.manager.enqueue_notify_email(
showard8cc058f2009-09-08 16:26:33 +00002069 'No results in post-job task',
2070 'No results in post-job task at %s' %
2071 self._autoserv_monitor.pidfile_id)
showard5add1c82009-05-26 19:27:46 +00002072 self.finished(False)
2073 return
2074
2075 super(PostJobTask, self).run(
2076 pidfile_name=self._pidfile_name,
2077 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002078
2079
2080 def _set_all_statuses(self, status):
2081 for queue_entry in self._queue_entries:
2082 queue_entry.set_status(status)
2083
2084
2085 def abort(self):
2086 # override AgentTask.abort() to avoid killing the process and ending
2087 # the task. post-job tasks continue when the job is aborted.
2088 pass
2089
2090
showardb5626452009-06-30 01:57:28 +00002091class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002092 """
2093 Task responsible for
2094 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2095 * copying logs to the results repository
2096 * spawning CleanupTasks for hosts, if necessary
2097 * spawning a FinalReparseTask for the job
2098 """
showarded2afea2009-07-07 20:54:07 +00002099 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002100 self._job = job
2101 super(GatherLogsTask, self).__init__(
2102 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002103 logfile_name='.collect_crashinfo.log',
2104 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002105 self._set_ids(queue_entries=queue_entries)
2106
2107
2108 def _generate_command(self, results_dir):
2109 host_list = ','.join(queue_entry.host.hostname
2110 for queue_entry in self._queue_entries)
2111 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2112 '-r', results_dir]
2113
2114
2115 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002116 for queue_entry in self._queue_entries:
2117 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2118 raise SchedulerError('Gather task attempting to start on '
2119 'non-gathering entry: %s' % queue_entry)
2120 if queue_entry.host.status != models.Host.Status.RUNNING:
2121 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002122 'entry with non-running host status %s: %s'
2123 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002124
showardd3dc1992009-04-22 21:01:40 +00002125 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002126
2127
showardd3dc1992009-04-22 21:01:40 +00002128 def epilog(self):
2129 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002130
showard6d1c1432009-08-20 23:30:39 +00002131 self._copy_and_parse_results(self._queue_entries,
2132 use_monitor=self._autoserv_monitor)
2133
2134 if self._autoserv_monitor.has_process():
2135 final_success = (self._final_status ==
2136 models.HostQueueEntry.Status.COMPLETED)
2137 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2138 else:
2139 final_success = False
2140 num_tests_failed = 0
2141
showardb5626452009-06-30 01:57:28 +00002142 self._reboot_hosts(self._job, self._queue_entries, final_success,
2143 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002144
2145
showard0bbfc212009-04-29 21:06:13 +00002146 def run(self):
showard597bfd32009-05-08 18:22:50 +00002147 autoserv_exit_code = self._autoserv_monitor.exit_code()
2148 # only run if Autoserv exited due to some signal. if we have no exit
2149 # code, assume something bad (and signal-like) happened.
2150 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002151 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002152 else:
2153 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002154
2155
showard8fe93b52008-11-18 17:53:22 +00002156class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002157 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2158
2159
showard8cc058f2009-09-08 16:26:33 +00002160 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00002161 super(CleanupTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00002162 task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002163
showard8cc058f2009-09-08 16:26:33 +00002164 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002165
mblighd5c95802008-03-05 00:33:46 +00002166
jadmanski0afbb632008-06-06 21:10:57 +00002167 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002168 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002169 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002170 self.host.set_status(models.Host.Status.CLEANING)
2171 if self.queue_entry:
2172 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2173
2174
showard775300b2009-09-09 15:30:50 +00002175 def _finish_epilog(self):
2176 if not self.queue_entry:
2177 return
2178
2179 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
2180 self.queue_entry.on_pending()
2181 elif self.success:
2182 if self.queue_entry.job.run_verify:
2183 entry = models.HostQueueEntry(id=self.queue_entry.id)
2184 models.SpecialTask.objects.create(
2185 host=models.Host(id=self.host.id),
2186 queue_entry=entry,
2187 task=models.SpecialTask.Task.VERIFY)
2188 else:
2189 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002190
mblighd5c95802008-03-05 00:33:46 +00002191
showard21baa452008-10-21 00:08:39 +00002192 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002193 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002194
showard21baa452008-10-21 00:08:39 +00002195 if self.success:
2196 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002197 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002198
showard775300b2009-09-09 15:30:50 +00002199 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002200
showard21baa452008-10-21 00:08:39 +00002201
showardd3dc1992009-04-22 21:01:40 +00002202class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002203 _num_running_parses = 0
2204
showarded2afea2009-07-07 20:54:07 +00002205 def __init__(self, queue_entries, recover_run_monitor=None):
2206 super(FinalReparseTask, self).__init__(
2207 queue_entries, pidfile_name=_PARSER_PID_FILE,
2208 logfile_name='.parse.log',
2209 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002210 # don't use _set_ids, since we don't want to set the host_ids
2211 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002212 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002213
showard97aed502008-11-04 02:01:24 +00002214
2215 @classmethod
2216 def _increment_running_parses(cls):
2217 cls._num_running_parses += 1
2218
2219
2220 @classmethod
2221 def _decrement_running_parses(cls):
2222 cls._num_running_parses -= 1
2223
2224
2225 @classmethod
2226 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002227 return (cls._num_running_parses <
2228 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002229
2230
2231 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002232 for queue_entry in self._queue_entries:
2233 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2234 raise SchedulerError('Parse task attempting to start on '
2235 'non-parsing entry: %s' % queue_entry)
2236
showard97aed502008-11-04 02:01:24 +00002237 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002238
2239
2240 def epilog(self):
2241 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002242 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002243
2244
showardd3dc1992009-04-22 21:01:40 +00002245 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002246 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002247 results_dir]
showard97aed502008-11-04 02:01:24 +00002248
2249
showard08a36412009-05-05 01:01:13 +00002250 def tick(self):
2251 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002252 # and we can, at which point we revert to default behavior
2253 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002254 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002255 else:
2256 self._try_starting_parse()
2257
2258
2259 def run(self):
2260 # override run() to not actually run unless we can
2261 self._try_starting_parse()
2262
2263
2264 def _try_starting_parse(self):
2265 if not self._can_run_new_parse():
2266 return
showard170873e2009-01-07 00:22:26 +00002267
showard97aed502008-11-04 02:01:24 +00002268 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002269 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002270
showard97aed502008-11-04 02:01:24 +00002271 self._increment_running_parses()
2272 self._parse_started = True
2273
2274
2275 def finished(self, success):
2276 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002277 if self._parse_started:
2278 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002279
2280
showarda3c58572009-03-12 20:36:59 +00002281class DBError(Exception):
2282 """Raised by the DBObject constructor when its select fails."""
2283
2284
mbligh36768f02008-02-22 18:28:33 +00002285class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002286 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002287
2288 # Subclasses MUST override these:
2289 _table_name = ''
2290 _fields = ()
2291
showarda3c58572009-03-12 20:36:59 +00002292 # A mapping from (type, id) to the instance of the object for that
2293 # particular id. This prevents us from creating new Job() and Host()
2294 # instances for every HostQueueEntry object that we instantiate as
2295 # multiple HQEs often share the same Job.
2296 _instances_by_type_and_id = weakref.WeakValueDictionary()
2297 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002298
showarda3c58572009-03-12 20:36:59 +00002299
2300 def __new__(cls, id=None, **kwargs):
2301 """
2302 Look to see if we already have an instance for this particular type
2303 and id. If so, use it instead of creating a duplicate instance.
2304 """
2305 if id is not None:
2306 instance = cls._instances_by_type_and_id.get((cls, id))
2307 if instance:
2308 return instance
2309 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2310
2311
2312 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002313 assert bool(id) or bool(row)
2314 if id is not None and row is not None:
2315 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002316 assert self._table_name, '_table_name must be defined in your class'
2317 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002318 if not new_record:
2319 if self._initialized and not always_query:
2320 return # We've already been initialized.
2321 if id is None:
2322 id = row[0]
2323 # Tell future constructors to use us instead of re-querying while
2324 # this instance is still around.
2325 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002326
showard6ae5ea92009-02-25 00:11:51 +00002327 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002328
jadmanski0afbb632008-06-06 21:10:57 +00002329 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002330
jadmanski0afbb632008-06-06 21:10:57 +00002331 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002332 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002333
showarda3c58572009-03-12 20:36:59 +00002334 if self._initialized:
2335 differences = self._compare_fields_in_row(row)
2336 if differences:
showard7629f142009-03-27 21:02:02 +00002337 logging.warn(
2338 'initialized %s %s instance requery is updating: %s',
2339 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002340 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002341 self._initialized = True
2342
2343
2344 @classmethod
2345 def _clear_instance_cache(cls):
2346 """Used for testing, clear the internal instance cache."""
2347 cls._instances_by_type_and_id.clear()
2348
2349
showardccbd6c52009-03-21 00:10:21 +00002350 def _fetch_row_from_db(self, row_id):
2351 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2352 rows = _db.execute(sql, (row_id,))
2353 if not rows:
showard76e29d12009-04-15 21:53:10 +00002354 raise DBError("row not found (table=%s, row id=%s)"
2355 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002356 return rows[0]
2357
2358
showarda3c58572009-03-12 20:36:59 +00002359 def _assert_row_length(self, row):
2360 assert len(row) == len(self._fields), (
2361 "table = %s, row = %s/%d, fields = %s/%d" % (
2362 self.__table, row, len(row), self._fields, len(self._fields)))
2363
2364
2365 def _compare_fields_in_row(self, row):
2366 """
showarddae680a2009-10-12 20:26:43 +00002367 Given a row as returned by a SELECT query, compare it to our existing in
2368 memory fields. Fractional seconds are stripped from datetime values
2369 before comparison.
showarda3c58572009-03-12 20:36:59 +00002370
2371 @param row - A sequence of values corresponding to fields named in
2372 The class attribute _fields.
2373
2374 @returns A dictionary listing the differences keyed by field name
2375 containing tuples of (current_value, row_value).
2376 """
2377 self._assert_row_length(row)
2378 differences = {}
showarddae680a2009-10-12 20:26:43 +00002379 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002380 for field, row_value in itertools.izip(self._fields, row):
2381 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002382 if (isinstance(current_value, datetime.datetime)
2383 and isinstance(row_value, datetime.datetime)):
2384 current_value = current_value.strftime(datetime_cmp_fmt)
2385 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002386 if current_value != row_value:
2387 differences[field] = (current_value, row_value)
2388 return differences
showard2bab8f42008-11-12 18:15:22 +00002389
2390
2391 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002392 """
2393 Update our field attributes using a single row returned by SELECT.
2394
2395 @param row - A sequence of values corresponding to fields named in
2396 the class fields list.
2397 """
2398 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002399
showard2bab8f42008-11-12 18:15:22 +00002400 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002401 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002402 setattr(self, field, value)
2403 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002404
showard2bab8f42008-11-12 18:15:22 +00002405 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002406
mblighe2586682008-02-29 22:45:46 +00002407
showardccbd6c52009-03-21 00:10:21 +00002408 def update_from_database(self):
2409 assert self.id is not None
2410 row = self._fetch_row_from_db(self.id)
2411 self._update_fields_from_row(row)
2412
2413
jadmanski0afbb632008-06-06 21:10:57 +00002414 def count(self, where, table = None):
2415 if not table:
2416 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002417
jadmanski0afbb632008-06-06 21:10:57 +00002418 rows = _db.execute("""
2419 SELECT count(*) FROM %s
2420 WHERE %s
2421 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002422
jadmanski0afbb632008-06-06 21:10:57 +00002423 assert len(rows) == 1
2424
2425 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002426
2427
showardd3dc1992009-04-22 21:01:40 +00002428 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002429 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002430
showard2bab8f42008-11-12 18:15:22 +00002431 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002432 return
mbligh36768f02008-02-22 18:28:33 +00002433
mblighf8c624d2008-07-03 16:58:45 +00002434 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002435 _db.execute(query, (value, self.id))
2436
showard2bab8f42008-11-12 18:15:22 +00002437 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002438
2439
jadmanski0afbb632008-06-06 21:10:57 +00002440 def save(self):
2441 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002442 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002443 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002444 values = []
2445 for key in keys:
2446 value = getattr(self, key)
2447 if value is None:
2448 values.append('NULL')
2449 else:
2450 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002451 values_str = ','.join(values)
2452 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2453 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002454 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002455 # Update our id to the one the database just assigned to us.
2456 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002457
2458
jadmanski0afbb632008-06-06 21:10:57 +00002459 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002460 self._instances_by_type_and_id.pop((type(self), id), None)
2461 self._initialized = False
2462 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002463 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2464 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002465
2466
showard63a34772008-08-18 19:32:50 +00002467 @staticmethod
2468 def _prefix_with(string, prefix):
2469 if string:
2470 string = prefix + string
2471 return string
2472
2473
jadmanski0afbb632008-06-06 21:10:57 +00002474 @classmethod
showard989f25d2008-10-01 11:38:11 +00002475 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002476 """
2477 Construct instances of our class based on the given database query.
2478
2479 @yields One class instance for each row fetched.
2480 """
showard63a34772008-08-18 19:32:50 +00002481 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2482 where = cls._prefix_with(where, 'WHERE ')
2483 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002484 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002485 'joins' : joins,
2486 'where' : where,
2487 'order_by' : order_by})
2488 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002489 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002490
mbligh36768f02008-02-22 18:28:33 +00002491
class IneligibleHostQueue(DBObject):
    # One row per (job, host) pair that must not be scheduled together;
    # rows are created/removed by HostQueueEntry.block_host()/unblock_host().
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002495
2496
showard89f84db2009-03-12 20:39:13 +00002497class AtomicGroup(DBObject):
2498 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002499 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2500 'invalid')
showard89f84db2009-03-12 20:39:13 +00002501
2502
showard989f25d2008-10-01 11:38:11 +00002503class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002504 _table_name = 'labels'
2505 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002506 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002507
2508
showard6157c632009-07-06 20:19:31 +00002509 def __repr__(self):
2510 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2511 self.name, self.id, self.atomic_group_id)
2512
2513
mbligh36768f02008-02-22 18:28:33 +00002514class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002515 _table_name = 'hosts'
2516 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2517 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2518
2519
jadmanski0afbb632008-06-06 21:10:57 +00002520 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002521 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002522 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002523
2524
showard170873e2009-01-07 00:22:26 +00002525 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002526 """
showard170873e2009-01-07 00:22:26 +00002527 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002528 """
2529 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002530 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002531 FROM labels
2532 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002533 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002534 ORDER BY labels.name
2535 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002536 platform = None
2537 all_labels = []
2538 for label_name, is_platform in rows:
2539 if is_platform:
2540 platform = label_name
2541 all_labels.append(label_name)
2542 return platform, all_labels
2543
2544
showard54c1ea92009-05-20 00:32:58 +00002545 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2546
2547
2548 @classmethod
2549 def cmp_for_sort(cls, a, b):
2550 """
2551 A comparison function for sorting Host objects by hostname.
2552
2553 This strips any trailing numeric digits, ignores leading 0s and
2554 compares hostnames by the leading name and the trailing digits as a
2555 number. If both hostnames do not match this pattern, they are simply
2556 compared as lower case strings.
2557
2558 Example of how hostnames will be sorted:
2559
2560 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2561
2562 This hopefully satisfy most people's hostname sorting needs regardless
2563 of their exact naming schemes. Nobody sane should have both a host10
2564 and host010 (but the algorithm works regardless).
2565 """
2566 lower_a = a.hostname.lower()
2567 lower_b = b.hostname.lower()
2568 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2569 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2570 if match_a and match_b:
2571 name_a, number_a_str = match_a.groups()
2572 name_b, number_b_str = match_b.groups()
2573 number_a = int(number_a_str.lstrip('0'))
2574 number_b = int(number_b_str.lstrip('0'))
2575 result = cmp((name_a, number_a), (name_b, number_b))
2576 if result == 0 and lower_a != lower_b:
2577 # If they compared equal above but the lower case names are
2578 # indeed different, don't report equality. abc012 != abc12.
2579 return cmp(lower_a, lower_b)
2580 return result
2581 else:
2582 return cmp(lower_a, lower_b)
2583
2584
mbligh36768f02008-02-22 18:28:33 +00002585class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002586 _table_name = 'host_queue_entries'
2587 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002588 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002589 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002590
2591
showarda3c58572009-03-12 20:36:59 +00002592 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002593 assert id or row
showarda3c58572009-03-12 20:36:59 +00002594 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002595 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002596
jadmanski0afbb632008-06-06 21:10:57 +00002597 if self.host_id:
2598 self.host = Host(self.host_id)
2599 else:
2600 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002601
showard77182562009-06-10 00:16:05 +00002602 if self.atomic_group_id:
2603 self.atomic_group = AtomicGroup(self.atomic_group_id,
2604 always_query=False)
2605 else:
2606 self.atomic_group = None
2607
showard170873e2009-01-07 00:22:26 +00002608 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002609 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002610
2611
showard89f84db2009-03-12 20:39:13 +00002612 @classmethod
2613 def clone(cls, template):
2614 """
2615 Creates a new row using the values from a template instance.
2616
2617 The new instance will not exist in the database or have a valid
2618 id attribute until its save() method is called.
2619 """
2620 assert isinstance(template, cls)
2621 new_row = [getattr(template, field) for field in cls._fields]
2622 clone = cls(row=new_row, new_record=True)
2623 clone.id = None
2624 return clone
2625
2626
showardc85c21b2008-11-24 22:17:37 +00002627 def _view_job_url(self):
2628 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2629
2630
showardf1ae3542009-05-11 19:26:02 +00002631 def get_labels(self):
2632 """
2633 Get all labels associated with this host queue entry (either via the
2634 meta_host or as a job dependency label). The labels yielded are not
2635 guaranteed to be unique.
2636
2637 @yields Label instances associated with this host_queue_entry.
2638 """
2639 if self.meta_host:
2640 yield Label(id=self.meta_host, always_query=False)
2641 labels = Label.fetch(
2642 joins="JOIN jobs_dependency_labels AS deps "
2643 "ON (labels.id = deps.label_id)",
2644 where="deps.job_id = %d" % self.job.id)
2645 for label in labels:
2646 yield label
2647
2648
jadmanski0afbb632008-06-06 21:10:57 +00002649 def set_host(self, host):
2650 if host:
2651 self.queue_log_record('Assigning host ' + host.hostname)
2652 self.update_field('host_id', host.id)
2653 self.update_field('active', True)
2654 self.block_host(host.id)
2655 else:
2656 self.queue_log_record('Releasing host')
2657 self.unblock_host(self.host.id)
2658 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002659
jadmanski0afbb632008-06-06 21:10:57 +00002660 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002661
2662
jadmanski0afbb632008-06-06 21:10:57 +00002663 def get_host(self):
2664 return self.host
mbligh36768f02008-02-22 18:28:33 +00002665
2666
jadmanski0afbb632008-06-06 21:10:57 +00002667 def queue_log_record(self, log_line):
2668 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002669 _drone_manager.write_lines_to_file(self.queue_log_path,
2670 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002671
2672
jadmanski0afbb632008-06-06 21:10:57 +00002673 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002674 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002675 row = [0, self.job.id, host_id]
2676 block = IneligibleHostQueue(row=row, new_record=True)
2677 block.save()
mblighe2586682008-02-29 22:45:46 +00002678
2679
jadmanski0afbb632008-06-06 21:10:57 +00002680 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002681 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002682 blocks = IneligibleHostQueue.fetch(
2683 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2684 for block in blocks:
2685 block.delete()
mblighe2586682008-02-29 22:45:46 +00002686
2687
showard2bab8f42008-11-12 18:15:22 +00002688 def set_execution_subdir(self, subdir=None):
2689 if subdir is None:
2690 assert self.get_host()
2691 subdir = self.get_host().hostname
2692 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002693
2694
showard6355f6b2008-12-05 18:52:13 +00002695 def _get_hostname(self):
2696 if self.host:
2697 return self.host.hostname
2698 return 'no host'
2699
2700
showard170873e2009-01-07 00:22:26 +00002701 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002702 flags = []
2703 if self.active:
2704 flags.append('active')
2705 if self.complete:
2706 flags.append('complete')
2707 if self.deleted:
2708 flags.append('deleted')
2709 if self.aborted:
2710 flags.append('aborted')
2711 flags_str = ','.join(flags)
2712 if flags_str:
2713 flags_str = ' [%s]' % flags_str
2714 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2715 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002716
2717
    def set_status(self, status):
        """Set this entry's status and sync the derived active/complete flags.

        Also triggers completion cleanup for terminal statuses and sends
        any configured notification emails.
        """
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        # Queued/Parsing: neither running nor finished.
        if status in (models.HostQueueEntry.Status.QUEUED,
                      models.HostQueueEntry.Status.PARSING):
            self.update_field('complete', False)
            self.update_field('active', False)

        # In-flight statuses: the entry is actively being worked on.
        if status in (models.HostQueueEntry.Status.PENDING,
                      models.HostQueueEntry.Status.RUNNING,
                      models.HostQueueEntry.Status.VERIFYING,
                      models.HostQueueEntry.Status.STARTING,
                      models.HostQueueEntry.Status.GATHERING):
            self.update_field('complete', False)
            self.update_field('active', True)

        # Terminal statuses: mark complete and run completion cleanup.
        if status in (models.HostQueueEntry.Status.FAILED,
                      models.HostQueueEntry.Status.COMPLETED,
                      models.HostQueueEntry.Status.STOPPED,
                      models.HostQueueEntry.Status.ABORTED):
            self.update_field('complete', True)
            self.update_field('active', False)
            self._on_complete()

        # Email when this particular status is in the configured list
        # (or when 'all' statuses are configured).
        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()
2750
2751
    def _on_complete(self):
        """Cleanup hook run when the entry enters a terminal status."""
        # Nothing to clean up if this entry never started executing.
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in _ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)
2760
2761
showardc85c21b2008-11-24 22:17:37 +00002762 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002763 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002764
2765 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2766 self.job.id, self.job.name, hostname, status)
2767 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2768 self.job.id, self.job.name, hostname, status,
2769 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002770 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002771
2772
    def _email_on_job_complete(self):
        """Send a whole-job summary email once every entry is complete."""
        if not self.job.is_finished():
            return

        # Build a per-host status line for each entry in the job.
        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        # Aggregate counts per status, e.g. "2 Completed, 1 Failed".
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002796
2797
    def schedule_pre_job_tasks(self, assigned_host=None):
        """Bind a host (for metahost/atomic-group entries) and begin pre-job
        tasks.

        @param assigned_host: Host to assign to this entry; required when the
                entry is a metahost or atomic group entry.
        """
        if self.meta_host is not None or self.atomic_group:
            assert assigned_host
            # ensure results dir exists for the queue log
            if self.host_id is None:
                self.set_host(assigned_host)
            else:
                # Host already bound; it must match the one we were given.
                assert assigned_host.id == self.host_id

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002812
2813
    def _do_schedule_pre_job_tasks(self):
        """Move the entry to Verifying and let the job queue its pre-job
        special tasks (cleanup and/or verify)."""
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002819
showard6ae5ea92009-02-25 00:11:51 +00002820
    def requeue(self):
        """Return this entry to Queued state so it can be scheduled again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            # Metahost entries release their host so a fresh one is chosen.
            self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002829
2830
    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.  Marks the entry Failed and stops the rest of the job's
        entries if the job can no longer meet its synch_count.
        """
        assert not self.meta_host
        self.set_status(models.HostQueueEntry.Status.FAILED)
        self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002839
2840
    @property
    def aborted_by(self):
        # Login of the user who aborted this entry (None if not aborted);
        # loaded lazily and cached by _load_abort_info().
        self._load_abort_info()
        return self._aborted_by
2845
2846
    @property
    def aborted_on(self):
        # Timestamp of the abort (None if not aborted); loaded lazily and
        # cached by _load_abort_info().
        self._load_abort_info()
        return self._aborted_on
2851
2852
    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cache the result on the instance; only query the DB once.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None
2868
2869
    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING.  Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
            self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002889
2890
    def abort(self, dispatcher):
        """Abort this queue entry, releasing or cleaning up its host.

        @param dispatcher: Dispatcher used to verify no agents are still
                working on this entry before the host is released.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # Aborted mid-verify: schedule a cleanup to leave the host sane.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host(id=self.host.id))

        self.set_status(Status.ABORTED)
        # Cancel any atomic-group delayed-start task waiting on this job.
        self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00002910
showard8cc058f2009-09-08 16:26:33 +00002911
2912 def get_group_name(self):
2913 atomic_group = self.atomic_group
2914 if not atomic_group:
2915 return ''
2916
2917 # Look at any meta_host and dependency labels and pick the first
2918 # one that also specifies this atomic group. Use that label name
2919 # as the group name if possible (it is more specific).
2920 for label in self.get_labels():
2921 if label.atomic_group_id:
2922 assert label.atomic_group_id == atomic_group.id
2923 return label.name
2924 return atomic_group.name
2925
2926
    def execution_tag(self):
        """Return '<job tag>/<execution subdir>' identifying this run."""
        assert self.execution_subdir
        return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002930
2931
    def execution_path(self):
        # The results path for this entry is simply its execution tag.
        return self.execution_tag()
2934
2935
class Job(DBObject):
    """Scheduler-side wrapper around one row of the afe 'jobs' table.

    Handles job-level state transitions, scheduling of pre-job special
    tasks (cleanup/verify), and the logic for launching synchronous and
    atomic-group jobs once enough HostQueueEntries reach Pending.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        # Must be constructed from either a DB id or a pre-fetched row.
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # NOTE(review): control_type 2 presumably corresponds to the CLIENT
        # control-file type constant in the frontend models -- confirm.
        return self.control_type != 2


    def tag(self):
        """Return the '<id>-<owner>' tag used for results paths."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects for this job (at least one)."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """Set the job's status; optionally propagate it to every HQE."""
        self.update_field('status',status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _hosts_assigned_count(self):
        """The number of HostQueueEntries assigned a Host for this job."""
        entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                       host__isnull=False)
        return entries.count()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def _pending_threshold(self, atomic_group):
        """
        @param atomic_group: The AtomicGroup associated with this job that we
                are using to bound the threshold.
        @returns The minimum number of HostQueueEntries assigned a Host before
                this job can run.
        """
        return min(self._hosts_assigned_count(),
                   atomic_group.max_number_of_machines)


    def is_ready(self):
        """Return True when enough entries are Pending for the job to run."""
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready


    def num_machines(self, clause = None):
        """Count this job's HQEs, optionally restricted by a SQL clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of entries not yet complete."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of entries currently active."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of entries already complete."""
        return self.num_machines('complete')


    def is_finished(self):
        """True once every entry in the job has completed."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """Return a queryset of this job's entries that have not run yet."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark all not-yet-run (non-verifying) entries Stopped, freeing
        any Pending hosts back to Ready."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()

    def stop_if_necessary(self):
        """Stop the job if it can no longer satisfy its synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to the job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        # Next group number is one past the highest already used.
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


    def _write_control_file(self, execution_path):
        """Attach the job's control file to the execution and return its
        path on the drone."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """Return all HQEs sharing the given entry's execution_subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for running the given entries.

        @param queue_entries: Non-empty list of HostQueueEntry objects that
                will run together; the first one determines the execution
                path/tag.
        @returns The command-line argument list for autoserv.
        """
        assert queue_entries
        execution_path = queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        execution_tag = queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self, verbose=False)

        # '-c' tells autoserv this is a client-side control file.
        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """Decide whether a pre-job cleanup task is needed, based on the
        job's reboot_before setting and the host's dirty flag."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """Decide whether a pre-job verify task is needed; hosts protected
        with DO_NOT_VERIFY are never verified."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Schedule the special task (Cleanup or Verify) the host_queue_entry
        needs before it may be used to run this Job.  If neither is needed,
        calls HostQueueEntry.on_pending() directly, which continues the
        flow of the job.
        """
        if self._should_run_cleanup(queue_entry):
            task = models.SpecialTask.Task.CLEANUP
        elif self._should_run_verify(queue_entry):
            task = models.SpecialTask.Task.VERIFY
        else:
            queue_entry.on_pending()
            return

        models.SpecialTask.objects.create(
                host=models.Host(id=queue_entry.host_id),
                queue_entry=models.HostQueueEntry(id=queue_entry.id),
                task=task)


    def _assign_new_group(self, queue_entries, group_name=''):
        """Assign a shared execution_subdir to the given entries.

        Single-entry groups use the host's hostname; multi-entry
        (synchronous) groups get a fresh 'group%d' style name.
        """
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job, all assigned a new shared execution subdirectory, or
                an empty list if too few entries are available.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job if enough hosts are ready for it; otherwise stop
        excess entries if necessary, or (for atomic groups) start the
        ready-delay wait.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        elif queue_entry.atomic_group:
            self.run_with_ready_delay(queue_entry)
        else:
            self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._pending_threshold(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)


    def schedule_delayed_callback_task(self, queue_entry):
        """Create (once) the DelayedCallTask that re-checks readiness and
        runs the job after the configured atomic-group host wait.

        @returns The DelayedCallTask, or None if one is already scheduled.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant.  It could have aborted
            # while we were waiting or hosts could have disappearred, etc.
            threshold = self._pending_threshold(queue_entry.atomic_group)
            if self._pending_count() < threshold:
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras.  Not running.', self)
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task


    def run(self, queue_entry):
        """
        Choose a group of entries for this job and start them.

        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        # Guard against launching a second copy of a running atomic job.
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        """Mark the chosen entries Starting and cancel any pending delay."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
        self.abort_delay_ready_task()


    def abort_delay_ready_task(self):
        """Abort the delayed task associated with this job, if any."""
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()


    def __str__(self):
        # '<id>-<owner>', matching the tag() format used in results paths.
        return '%s-%s' % (self.id, self.owner)
3358
3359
if __name__ == '__main__':
    # Script entry point; main() is defined earlier in this file.
    main()