blob: 6cd7015169242221fbbccb0f7343e56c7dd1fb8e [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
showard549afad2009-08-20 23:33:36 +000023BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
24PID_FILE_PREFIX = 'monitor_db'
mblighb090f142008-02-27 21:33:46 +000025
mbligh36768f02008-02-22 18:28:33 +000026RESULTS_DIR = '.'
27AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000028DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000029AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
30
31if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000032 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000033AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
34AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
35
36if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000037 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000038
showardd3dc1992009-04-22 21:01:40 +000039_AUTOSERV_PID_FILE = '.autoserv_execute'
40_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
41_PARSER_PID_FILE = '.parser_execute'
42
43_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
44 _PARSER_PID_FILE)
45
showard35162b02009-03-03 02:17:30 +000046# error message to leave in results dir when an autoserv process disappears
47# mysteriously
48_LOST_PROCESS_ERROR = """\
49Autoserv failed abnormally during execution for this job, probably due to a
50system error on the Autotest server. Full results may not be available. Sorry.
51"""
52
mbligh6f8bab42008-02-29 22:45:14 +000053_db = None
mbligh36768f02008-02-22 18:28:33 +000054_shutdown = False
showard170873e2009-01-07 00:22:26 +000055_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
56_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000057_testing_mode = False
showard542e8402008-09-19 20:16:18 +000058_base_url = None
showardc85c21b2008-11-24 22:17:37 +000059_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000060_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
def _get_pidfile_timeout_secs():
    """@returns How long (in seconds) to wait for autoserv to write its
    pidfile, derived from the pidfile_timeout_mins config value."""
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    """Scheduler entry point.

    Runs the real main loop, logging any exception that escapes it, and
    always removes the pid file written during init() on the way out —
    whether we exit cleanly, via SystemExit, or via a crash.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # deliberate exits (e.g. scheduler disabled) should not be
            # logged as crashes
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000085
86
87def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000088 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000089
showard136e6dc2009-06-10 19:38:49 +000090 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000091 parser = optparse.OptionParser(usage)
92 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
93 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +000094 parser.add_option('--test', help='Indicate that scheduler is under ' +
95 'test and should use dummy autoserv and no parsing',
96 action='store_true')
97 (options, args) = parser.parse_args()
98 if len(args) != 1:
99 parser.print_usage()
100 return
mbligh36768f02008-02-22 18:28:33 +0000101
showard5613c662009-06-08 23:30:33 +0000102 scheduler_enabled = global_config.global_config.get_config_value(
103 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
104
105 if not scheduler_enabled:
106 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
107 "global_config's SCHEDULER section to enabled it. Exiting.")
mbligh6fbdb802009-08-03 16:42:55 +0000108 logging.error(msg)
showard5613c662009-06-08 23:30:33 +0000109 sys.exit(1)
110
jadmanski0afbb632008-06-06 21:10:57 +0000111 global RESULTS_DIR
112 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000113
mbligh83c1e9e2009-05-01 23:10:41 +0000114 site_init = utils.import_site_function(__file__,
115 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
116 _site_init_monitor_db_dummy)
117 site_init()
118
showardcca334f2009-03-12 20:38:34 +0000119 # Change the cwd while running to avoid issues incase we were launched from
120 # somewhere odd (such as a random NFS home directory of the person running
121 # sudo to launch us as the appropriate user).
122 os.chdir(RESULTS_DIR)
123
jadmanski0afbb632008-06-06 21:10:57 +0000124 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000125 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
126 "notify_email_statuses",
127 default='')
showardc85c21b2008-11-24 22:17:37 +0000128 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000129 _notify_email_statuses = [status for status in
130 re.split(r'[\s,;:]', notify_statuses_list.lower())
131 if status]
showardc85c21b2008-11-24 22:17:37 +0000132
jadmanski0afbb632008-06-06 21:10:57 +0000133 if options.test:
134 global _autoserv_path
135 _autoserv_path = 'autoserv_dummy'
136 global _testing_mode
137 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000138
mbligh37eceaa2008-12-15 22:56:37 +0000139 # AUTOTEST_WEB.base_url is still a supported config option as some people
140 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000141 global _base_url
showard170873e2009-01-07 00:22:26 +0000142 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
143 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000144 if config_base_url:
145 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000146 else:
mbligh37eceaa2008-12-15 22:56:37 +0000147 # For the common case of everything running on a single server you
148 # can just set the hostname in a single place in the config file.
149 server_name = c.get_config_value('SERVER', 'hostname')
150 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000151 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000152 sys.exit(1)
153 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000154
showardc5afc462009-01-13 00:09:39 +0000155 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000156 server.start()
157
jadmanski0afbb632008-06-06 21:10:57 +0000158 try:
showard136e6dc2009-06-10 19:38:49 +0000159 init()
showardc5afc462009-01-13 00:09:39 +0000160 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000161 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000162
jadmanski0afbb632008-06-06 21:10:57 +0000163 while not _shutdown:
164 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000165 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000166 except:
showard170873e2009-01-07 00:22:26 +0000167 email_manager.manager.log_stacktrace(
168 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000169
showard170873e2009-01-07 00:22:26 +0000170 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000171 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000172 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000173 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
def setup_logging():
    """Configure scheduler logging.

    The log directory and file name may be overridden through the
    AUTOTEST_SCHEDULER_LOG_DIR / AUTOTEST_SCHEDULER_LOG_NAME environment
    variables; otherwise the SchedulerLoggingConfig defaults apply.
    """
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR')
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME')
    config = scheduler_logging_config.SchedulerLoggingConfig()
    logging_manager.configure_logging(config, log_dir=log_dir,
                                      logfile_name=log_name)
182
183
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean shutdown.

    Only sets the module-level _shutdown flag; the tick loop in
    main_without_exception_handling() observes it and performs the
    actual teardown.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000188
189
def init():
    """One-time scheduler startup.

    Writes the scheduler pid file (aborting if another instance is
    already alive), connects the database, puts Django in autocommit,
    installs the SIGINT handler and initializes the drone manager from
    the configured drone/results hosts.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # renamed local from 'drones' to avoid shadowing the drones module
    # imported at the top of this file
    drones_config = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones_config.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        # fall back to the queue entry's job when no job was given
        job = job or queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
248
249
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
252
253
class HostScheduler(object):
    """Matches pending host queue entries to ready hosts.

    refresh() snapshots ACL, label and dependency relationships from the
    database into in-memory dicts; the find_eligible_* methods then answer
    scheduling queries purely against that snapshot, popping hosts out of
    self._hosts_available as they are claimed within the cycle.
    """

    def _get_ready_hosts(self):
        """@returns {host_id: Host} for unlocked, Ready hosts that have no
        active queue entry."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        # keyed by id so hosts can be claimed/released in O(1)
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render a sequence of ids as a comma-separated string suitable
        for a SQL IN (...) clause."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (its single %s placeholder filled with id_list) and
        fold the (left, right) result rows into a one-to-many mapping.

        @param flip: if True, map right column -> set of left column values.
        @returns {} immediately when id_list is empty (avoids invalid SQL).
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left_id, right_id) rows into {left_id: set(right_ids)},
        or the reverse mapping when flip is True."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """@returns {job_id: set(aclgroup ids the job owner belongs to)}."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """@returns {job_id: set(host ids the job may not run on)}."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """@returns {job_id: set(label ids the job depends on)}."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """@returns {host_id: set(aclgroup ids)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """@returns a ({label_id: set(host_ids)}, {host_id: set(label_ids)})
        pair built from a single hosts_labels query."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """@returns {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot all scheduling-relevant database state for this cycle.

        @param pending_queue_entries: the HostQueueEntries that will be
                considered for scheduling; only their jobs' relationships
                are loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels include every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the
        job neither depends on nor requested via its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # data inconsistency: log it but still return one of the groups
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if host_id passes every eligibility check for the entry's
        job (ACLs, dependencies, only_if_needed, atomic group)."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True if the host is known to us and marked invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's explicitly-requested host, if still eligible.

        @returns the Host (removed from availability) or None.
        """
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick any eligible host carrying the entry's metahost label.

        @returns the claimed Host or None if no host in the label fits.
        """
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Find and claim a host for a non-atomic-group queue entry.

        @returns the claimed Host or None.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        # atomic-group entries go through find_eligible_atomic_group instead
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
617
618
showard170873e2009-01-07 00:22:26 +0000619class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000620 def __init__(self):
621 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000622 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000623 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000624 user_cleanup_time = scheduler_config.config.clean_interval
625 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
626 _db, user_cleanup_time)
627 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000628 self._host_agents = {}
629 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
    def initialize(self, recover_hosts=True):
        """Recover state left over from a previous scheduler run.

        @param recover_hosts: if True, also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000641
642
    def tick(self):
        """Run one scheduling cycle.

        The ordering is significant: drone state is refreshed first,
        scheduling decisions are made in the middle, and the queued drone
        actions and notification emails are flushed at the end.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_new_jobs()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000655
showard97aed502008-11-04 02:01:24 +0000656
mblighf3294cc2009-04-08 21:17:38 +0000657 def _run_cleanup(self):
658 self._periodic_cleanup.run_cleanup_maybe()
659 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard170873e2009-01-07 00:22:26 +0000662 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
663 for object_id in object_ids:
664 agent_dict.setdefault(object_id, set()).add(agent)
665
666
667 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
668 for object_id in object_ids:
669 assert object_id in agent_dict
670 agent_dict[object_id].remove(agent)
671
672
jadmanski0afbb632008-06-06 21:10:57 +0000673 def add_agent(self, agent):
674 self._agents.append(agent)
675 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000676 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
677 self._register_agent_for_ids(self._queue_entry_agents,
678 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000679
showard170873e2009-01-07 00:22:26 +0000680
681 def get_agents_for_entry(self, queue_entry):
682 """
683 Find agents corresponding to the specified queue_entry.
684 """
showardd3dc1992009-04-22 21:01:40 +0000685 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000686
687
688 def host_has_agent(self, host):
689 """
690 Determine if there is currently an Agent present using this host.
691 """
692 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000693
694
jadmanski0afbb632008-06-06 21:10:57 +0000695 def remove_agent(self, agent):
696 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000697 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
698 agent)
699 self._unregister_agent_for_ids(self._queue_entry_agents,
700 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000701
702
showard8cc058f2009-09-08 16:26:33 +0000703 def _host_has_scheduled_special_task(self, host):
704 return bool(models.SpecialTask.objects.filter(host__id=host.id,
705 is_active=False,
706 is_complete=False))
707
708
jadmanski0afbb632008-06-06 21:10:57 +0000709 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000710 self._register_pidfiles()
711 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000712 self._recover_all_recoverable_entries()
showard8cc058f2009-09-08 16:26:33 +0000713 self._recover_pending_entries()
showard6878e8b2009-07-20 22:37:45 +0000714 self._check_for_remaining_active_entries()
showard170873e2009-01-07 00:22:26 +0000715 self._reverify_remaining_hosts()
716 # reinitialize drones after killing orphaned processes, since they can
717 # leave around files when they die
718 _drone_manager.execute_actions()
719 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000720
showard170873e2009-01-07 00:22:26 +0000721
    def _register_pidfiles(self):
        """
        Register every pidfile that recovery might need to read, for all
        potentially-running queue entries and active special tasks.
        """
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
                where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            # each entry may have written any of the known pidfiles under its
            # execution path, depending on which phase it was in
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                        execution_entry.execution_path(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000733
734
showarded2afea2009-07-07 20:54:07 +0000735 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
736 run_monitor = PidfileRunMonitor()
737 run_monitor.attach_to_existing_process(execution_path,
738 pidfile_name=pidfile_name)
739 if run_monitor.has_process():
740 orphans.discard(run_monitor.get_process())
741 return run_monitor, '(process %s)' % run_monitor.get_process()
742 return None, 'without process'
743
744
showard8cc058f2009-09-08 16:26:33 +0000745 def _get_unassigned_entries(self, status):
746 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
747 if not self.get_agents_for_entry(entry):
748 yield entry
749
750
showardd3dc1992009-04-22 21:01:40 +0000751 def _recover_entries_with_status(self, status, orphans, pidfile_name,
752 recover_entries_fn):
showard8cc058f2009-09-08 16:26:33 +0000753 for queue_entry in self._get_unassigned_entries(status):
showardd3dc1992009-04-22 21:01:40 +0000754 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000755 run_monitor, process_string = self._get_recovery_run_monitor(
756 queue_entry.execution_path(), pidfile_name, orphans)
showard8cc058f2009-09-08 16:26:33 +0000757 if not run_monitor:
758 # _schedule_running_host_queue_entries should schedule and
759 # recover these entries
760 continue
showard597bfd32009-05-08 18:22:50 +0000761
showarded2afea2009-07-07 20:54:07 +0000762 logging.info('Recovering %s entry %s %s',status.lower(),
763 ', '.join(str(entry) for entry in queue_entries),
764 process_string)
showardd3dc1992009-04-22 21:01:40 +0000765 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000766
767
showard6878e8b2009-07-20 22:37:45 +0000768 def _check_for_remaining_orphan_processes(self, orphans):
769 if not orphans:
770 return
771 subject = 'Unrecovered orphan autoserv processes remain'
772 message = '\n'.join(str(process) for process in orphans)
773 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000774
775 die_on_orphans = global_config.global_config.get_config_value(
776 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
777
778 if die_on_orphans:
779 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000780
showard170873e2009-01-07 00:22:26 +0000781
showardd3dc1992009-04-22 21:01:40 +0000782 def _recover_running_entries(self, orphans):
783 def recover_entries(job, queue_entries, run_monitor):
showard8cc058f2009-09-08 16:26:33 +0000784 queue_task = QueueTask(job=job, queue_entries=queue_entries,
785 recover_run_monitor=run_monitor)
786 self.add_agent(Agent(task=queue_task,
787 num_processes=len(queue_entries)))
showardd3dc1992009-04-22 21:01:40 +0000788
789 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
showarded2afea2009-07-07 20:54:07 +0000790 orphans, _AUTOSERV_PID_FILE,
showardd3dc1992009-04-22 21:01:40 +0000791 recover_entries)
792
793
    def _recover_gathering_entries(self, orphans):
        """
        Re-attach GatherLogsTask agents to 'Gathering' entries using the
        crashinfo pidfile.
        """
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent(gather_task))

        self._recover_entries_with_status(
                models.HostQueueEntry.Status.GATHERING,
                orphans, _CRASHINFO_PID_FILE, recover_entries)
803
804
    def _recover_parsing_entries(self, orphans):
        """
        Re-attach FinalReparseTask agents to 'Parsing' entries.  Parsers
        count as zero processes for throttling purposes.
        """
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            self.add_agent(Agent(reparse_task, num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
814
815
showard8cc058f2009-09-08 16:26:33 +0000816 def _recover_pending_entries(self):
817 for entry in self._get_unassigned_entries(
818 models.HostQueueEntry.Status.PENDING):
819 entry.on_pending()
820
821
showardd3dc1992009-04-22 21:01:40 +0000822 def _recover_all_recoverable_entries(self):
823 orphans = _drone_manager.get_orphaned_autoserv_processes()
824 self._recover_running_entries(orphans)
825 self._recover_gathering_entries(orphans)
826 self._recover_parsing_entries(orphans)
showarded2afea2009-07-07 20:54:07 +0000827 self._recover_special_tasks(orphans)
showard6878e8b2009-07-20 22:37:45 +0000828 self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000829
showard97aed502008-11-04 02:01:24 +0000830
showarded2afea2009-07-07 20:54:07 +0000831 def _recover_special_tasks(self, orphans):
showard2fe3f1d2009-07-06 20:19:11 +0000832 """\
833 Recovers all special tasks that have started running but have not
834 completed.
835 """
836
837 tasks = models.SpecialTask.objects.filter(is_active=True,
838 is_complete=False)
839 # Use ordering to force NULL queue_entry_id's to the end of the list
showarda5288b42009-07-28 20:06:08 +0000840 for task in tasks.order_by('-queue_entry__id'):
showard9b6ec502009-08-20 23:25:17 +0000841 if self.host_has_agent(task.host):
842 raise SchedulerError(
843 "%s already has a host agent %s." % (
showard8cc058f2009-09-08 16:26:33 +0000844 task, self._host_agents.get(task.host.id)))
showard2fe3f1d2009-07-06 20:19:11 +0000845
showarded2afea2009-07-07 20:54:07 +0000846 run_monitor, process_string = self._get_recovery_run_monitor(
847 task.execution_path(), _AUTOSERV_PID_FILE, orphans)
848
849 logging.info('Recovering %s %s', task, process_string)
showard8cc058f2009-09-08 16:26:33 +0000850 self._recover_special_task(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000851
852
showard8cc058f2009-09-08 16:26:33 +0000853 def _recover_special_task(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000854 """\
855 Recovers a single special task.
856 """
857 if task.task == models.SpecialTask.Task.VERIFY:
showard8cc058f2009-09-08 16:26:33 +0000858 agent_task = self._recover_verify(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000859 elif task.task == models.SpecialTask.Task.REPAIR:
showard8cc058f2009-09-08 16:26:33 +0000860 agent_task = self._recover_repair(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000861 elif task.task == models.SpecialTask.Task.CLEANUP:
showard8cc058f2009-09-08 16:26:33 +0000862 agent_task = self._recover_cleanup(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000863 else:
864 # Should never happen
865 logging.error(
866 "Special task id %d had invalid task %s", (task.id, task.task))
867
showard8cc058f2009-09-08 16:26:33 +0000868 self.add_agent(Agent(agent_task))
showard2fe3f1d2009-07-06 20:19:11 +0000869
870
showard8cc058f2009-09-08 16:26:33 +0000871 def _recover_verify(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000872 """\
873 Recovers a verify task.
874 No associated queue entry: Verify host
875 With associated queue entry: Verify host, and run associated queue
876 entry
877 """
showard8cc058f2009-09-08 16:26:33 +0000878 return VerifyTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000879
880
showard8cc058f2009-09-08 16:26:33 +0000881 def _recover_repair(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000882 """\
883 Recovers a repair task.
884 Always repair host
885 """
showard8cc058f2009-09-08 16:26:33 +0000886 return RepairTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000887
888
showard8cc058f2009-09-08 16:26:33 +0000889 def _recover_cleanup(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000890 """\
891 Recovers a cleanup task.
892 No associated queue entry: Clean host
893 With associated queue entry: Clean host, verify host if needed, and
894 run associated queue entry
895 """
showard8cc058f2009-09-08 16:26:33 +0000896 return CleanupTask(task=task, recover_run_monitor=run_monitor)
showard6af73ad2009-07-28 20:00:58 +0000897
898
showard6878e8b2009-07-20 22:37:45 +0000899 def _check_for_remaining_active_entries(self):
showard170873e2009-01-07 00:22:26 +0000900 queue_entries = HostQueueEntry.fetch(
showard8cc058f2009-09-08 16:26:33 +0000901 where='active AND NOT complete AND status NOT IN '
902 '("Starting", "Gathering", "Pending")')
showardd3dc1992009-04-22 21:01:40 +0000903
showarde8e37072009-08-20 23:31:30 +0000904 unrecovered_active_hqes = [entry for entry in queue_entries
showard8cc058f2009-09-08 16:26:33 +0000905 if not self.get_agents_for_entry(entry) and
906 not self._host_has_scheduled_special_task(
907 entry.host)]
showarde8e37072009-08-20 23:31:30 +0000908 if unrecovered_active_hqes:
909 message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes)
910 raise SchedulerError(
911 '%d unrecovered active host queue entries:\n%s' %
912 (len(unrecovered_active_hqes), message))
showard170873e2009-01-07 00:22:26 +0000913
914
showard8cc058f2009-09-08 16:26:33 +0000915 def _schedule_special_tasks(self):
916 tasks = models.SpecialTask.objects.filter(is_active=False,
917 is_complete=False,
918 host__locked=False)
919 # We want lower ids to come first, but the NULL queue_entry_ids need to
920 # come last
921 tasks = tasks.extra(select={'isnull' : 'queue_entry_id IS NULL'})
922 tasks = tasks.extra(order_by=['isnull', 'id'])
showard6d7b2ff2009-06-10 00:16:47 +0000923
showard2fe3f1d2009-07-06 20:19:11 +0000924 for task in tasks:
showard8cc058f2009-09-08 16:26:33 +0000925 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000926 continue
showard6d7b2ff2009-06-10 00:16:47 +0000927
showard8cc058f2009-09-08 16:26:33 +0000928 if task.task == models.SpecialTask.Task.CLEANUP:
929 agent_task = CleanupTask(task=task)
930 elif task.task == models.SpecialTask.Task.VERIFY:
931 agent_task = VerifyTask(task=task)
932 elif task.task == models.SpecialTask.Task.REPAIR:
933 agent_task = RepairTask(task=task)
934 else:
935 email_manager.manager.enqueue_notify_email(
936 'Special task with invalid task', task)
937 continue
938
939 self.add_agent(Agent(agent_task))
showard1ff7b2e2009-05-15 23:17:18 +0000940
941
showard170873e2009-01-07 00:22:26 +0000942 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000943 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000944 # should never happen
showarded2afea2009-07-07 20:54:07 +0000945 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000946 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000947 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000948 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000949 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000950
951
jadmanski0afbb632008-06-06 21:10:57 +0000952 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000953 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000954 full_where='locked = 0 AND invalid = 0 AND ' + where
955 for host in Host.fetch(where=full_where):
956 if self.host_has_agent(host):
957 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000958 continue
showard8cc058f2009-09-08 16:26:33 +0000959 if self._host_has_scheduled_special_task(host):
960 # host will have a special task scheduled on the next cycle
961 continue
showard170873e2009-01-07 00:22:26 +0000962 if print_message:
showardb18134f2009-03-20 20:52:18 +0000963 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000964 models.SpecialTask.objects.create(
965 task=models.SpecialTask.Task.CLEANUP,
966 host=models.Host(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000967
968
jadmanski0afbb632008-06-06 21:10:57 +0000969 def _recover_hosts(self):
970 # recover "Repair Failed" hosts
971 message = 'Reverifying dead host %s'
972 self._reverify_hosts_where("status = 'Repair Failed'",
973 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000974
975
showard04c82c52008-05-29 19:38:12 +0000976
showardb95b1bd2008-08-15 18:11:04 +0000977 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000978 # prioritize by job priority, then non-metahost over metahost, then FIFO
979 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000980 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000981 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000982 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000983
984
showard89f84db2009-03-12 20:39:13 +0000985 def _refresh_pending_queue_entries(self):
986 """
987 Lookup the pending HostQueueEntries and call our HostScheduler
988 refresh() method given that list. Return the list.
989
990 @returns A list of pending HostQueueEntries sorted in priority order.
991 """
showard63a34772008-08-18 19:32:50 +0000992 queue_entries = self._get_pending_queue_entries()
993 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000994 return []
showardb95b1bd2008-08-15 18:11:04 +0000995
showard63a34772008-08-18 19:32:50 +0000996 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000997
showard89f84db2009-03-12 20:39:13 +0000998 return queue_entries
999
1000
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        # pair each entry with its host and kick them all off
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)
1031
1032
1033 def _schedule_new_jobs(self):
1034 queue_entries = self._refresh_pending_queue_entries()
1035 if not queue_entries:
1036 return
1037
showard63a34772008-08-18 19:32:50 +00001038 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001039 is_unassigned_atomic_group = (
1040 queue_entry.atomic_group_id is not None
1041 and queue_entry.host_id is None)
1042 if is_unassigned_atomic_group:
1043 self._schedule_atomic_group(queue_entry)
1044 else:
showard89f84db2009-03-12 20:39:13 +00001045 assigned_host = self._host_scheduler.find_eligible_host(
1046 queue_entry)
1047 if assigned_host:
1048 self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001049
1050
showard8cc058f2009-09-08 16:26:33 +00001051 def _schedule_running_host_queue_entries(self):
1052 entries = HostQueueEntry.fetch(
1053 where="status IN "
1054 "('Starting', 'Running', 'Gathering', 'Parsing')")
1055 for entry in entries:
1056 if self.get_agents_for_entry(entry):
1057 continue
1058
1059 task_entries = entry.job.get_group_entries(entry)
1060 for task_entry in task_entries:
1061 if (task_entry.status != models.HostQueueEntry.Status.PARSING
1062 and self.host_has_agent(task_entry.host)):
1063 agent = self._host_agents.get(task_entry.host.id)[0]
1064 raise SchedulerError('Attempted to schedule on host that '
1065 'already has agent: %s (previous '
1066 'agent task: %s)'
1067 % (task_entry, agent.task))
1068
1069 if entry.status in (models.HostQueueEntry.Status.STARTING,
1070 models.HostQueueEntry.Status.RUNNING):
1071 params = entry.job.get_autoserv_params(task_entries)
1072 agent_task = QueueTask(job=entry.job,
1073 queue_entries=task_entries, cmd=params)
1074 elif entry.status == models.HostQueueEntry.Status.GATHERING:
1075 agent_task = GatherLogsTask(
1076 job=entry.job, queue_entries=task_entries)
1077 elif entry.status == models.HostQueueEntry.Status.PARSING:
1078 agent_task = FinalReparseTask(queue_entries=task_entries)
1079 else:
1080 raise SchedulerError('_schedule_running_host_queue_entries got '
1081 'entry with invalid status %s: %s'
1082 % (entry.status, entry))
1083
1084 self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
1085
1086
1087 def _schedule_delay_tasks(self):
1088 for entry in HostQueueEntry.fetch(where="status = 'Waiting'"):
1089 task = entry.job.schedule_delayed_callback_task(entry)
1090 if task:
1091 self.add_agent(Agent(task, num_processes=0))
1092
1093
showardb95b1bd2008-08-15 18:11:04 +00001094 def _run_queue_entry(self, queue_entry, host):
showard8cc058f2009-09-08 16:26:33 +00001095 queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001096
1097
jadmanski0afbb632008-06-06 21:10:57 +00001098 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001099 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001100 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001101 for agent in self.get_agents_for_entry(entry):
1102 agent.abort()
1103 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001104
1105
showard324bf812009-01-20 23:23:38 +00001106 def _can_start_agent(self, agent, num_started_this_cycle,
1107 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001108 # always allow zero-process agents to run
1109 if agent.num_processes == 0:
1110 return True
1111 # don't allow any nonzero-process agents to run after we've reached a
1112 # limit (this avoids starvation of many-process agents)
1113 if have_reached_limit:
1114 return False
1115 # total process throttling
showard324bf812009-01-20 23:23:38 +00001116 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001117 return False
1118 # if a single agent exceeds the per-cycle throttling, still allow it to
1119 # run when it's the first agent in the cycle
1120 if num_started_this_cycle == 0:
1121 return True
1122 # per-cycle throttling
1123 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001124 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001125 return False
1126 return True
1127
1128
jadmanski0afbb632008-06-06 21:10:57 +00001129 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001130 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001131 have_reached_limit = False
1132 # iterate over copy, so we can remove agents during iteration
1133 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001134 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001135 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001136 have_reached_limit):
1137 have_reached_limit = True
1138 continue
showard4c5374f2008-09-04 17:02:56 +00001139 num_started_this_cycle += agent.num_processes
1140 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001141 if agent.is_done():
1142 logging.info("agent finished")
1143 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001144 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001145 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001146
1147
showard29f7cd22009-04-29 21:16:24 +00001148 def _process_recurring_runs(self):
1149 recurring_runs = models.RecurringRun.objects.filter(
1150 start_date__lte=datetime.datetime.now())
1151 for rrun in recurring_runs:
1152 # Create job from template
1153 job = rrun.job
1154 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001155 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001156
1157 host_objects = info['hosts']
1158 one_time_hosts = info['one_time_hosts']
1159 metahost_objects = info['meta_hosts']
1160 dependencies = info['dependencies']
1161 atomic_group = info['atomic_group']
1162
1163 for host in one_time_hosts or []:
1164 this_host = models.Host.create_one_time_host(host.hostname)
1165 host_objects.append(this_host)
1166
1167 try:
1168 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001169 options=options,
showard29f7cd22009-04-29 21:16:24 +00001170 host_objects=host_objects,
1171 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001172 atomic_group=atomic_group)
1173
1174 except Exception, ex:
1175 logging.exception(ex)
1176 #TODO send email
1177
1178 if rrun.loop_count == 1:
1179 rrun.delete()
1180 else:
1181 if rrun.loop_count != 0: # if not infinite loop
1182 # calculate new start_date
1183 difference = datetime.timedelta(seconds=rrun.loop_period)
1184 rrun.start_date = rrun.start_date + difference
1185 rrun.loop_count -= 1
1186 rrun.save()
1187
1188
showard170873e2009-01-07 00:22:26 +00001189class PidfileRunMonitor(object):
1190 """
1191 Client must call either run() to start a new process or
1192 attach_to_existing_process().
1193 """
mbligh36768f02008-02-22 18:28:33 +00001194
showard170873e2009-01-07 00:22:26 +00001195 class _PidfileException(Exception):
1196 """
1197 Raised when there's some unexpected behavior with the pid file, but only
1198 used internally (never allowed to escape this class).
1199 """
mbligh36768f02008-02-22 18:28:33 +00001200
1201
showard170873e2009-01-07 00:22:26 +00001202 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001203 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001204 self._start_time = None
1205 self.pidfile_id = None
1206 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001207
1208
showard170873e2009-01-07 00:22:26 +00001209 def _add_nice_command(self, command, nice_level):
1210 if not nice_level:
1211 return command
1212 return ['nice', '-n', str(nice_level)] + command
1213
1214
    def _set_start_time(self):
        # baseline for the "pidfile never appeared" timeout
        self._start_time = time.time()
1217
1218
1219 def run(self, command, working_directory, nice_level=None, log_file=None,
1220 pidfile_name=None, paired_with_pidfile=None):
1221 assert command is not None
1222 if nice_level is not None:
1223 command = ['nice', '-n', str(nice_level)] + command
1224 self._set_start_time()
1225 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001226 command, working_directory, pidfile_name=pidfile_name,
1227 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001228
1229
showarded2afea2009-07-07 20:54:07 +00001230 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001231 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001232 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001233 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001234 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001235 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001236
1237
jadmanski0afbb632008-06-06 21:10:57 +00001238 def kill(self):
showard170873e2009-01-07 00:22:26 +00001239 if self.has_process():
1240 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001241
mbligh36768f02008-02-22 18:28:33 +00001242
showard170873e2009-01-07 00:22:26 +00001243 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001244 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001245 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001246
1247
showard170873e2009-01-07 00:22:26 +00001248 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001249 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001250 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001251 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001252
1253
showard170873e2009-01-07 00:22:26 +00001254 def _read_pidfile(self, use_second_read=False):
1255 assert self.pidfile_id is not None, (
1256 'You must call run() or attach_to_existing_process()')
1257 contents = _drone_manager.get_pidfile_contents(
1258 self.pidfile_id, use_second_read=use_second_read)
1259 if contents.is_invalid():
1260 self._state = drone_manager.PidfileContents()
1261 raise self._PidfileException(contents)
1262 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001263
1264
showard21baa452008-10-21 00:08:39 +00001265 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001266 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1267 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001268 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001269 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001270
1271
    def _get_pidfile_info_helper(self):
        """
        Refresh self._state, cross-checking the pidfile against the actual
        process.  May raise _PidfileException (handled by the caller).
        """
        # once the process is declared lost, the synthetic failure state in
        # self._state is final
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001294
showard21baa452008-10-21 00:08:39 +00001295
    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed

        Any _PidfileException from the helper is converted into a
        "lost process" failure state rather than propagated.
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001307
1308
showard170873e2009-01-07 00:22:26 +00001309 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001310 """\
1311 Called when no pidfile is found or no pid is in the pidfile.
1312 """
showard170873e2009-01-07 00:22:26 +00001313 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001314 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001315 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001316 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001317 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001318
1319
showard35162b02009-03-03 02:17:30 +00001320 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001321 """\
1322 Called when autoserv has exited without writing an exit status,
1323 or we've timed out waiting for autoserv to write a pid to the
1324 pidfile. In either case, we just return failure and the caller
1325 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001326
showard170873e2009-01-07 00:22:26 +00001327 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001328 """
1329 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001330 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001331 self._state.exit_status = 1
1332 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001333
1334
jadmanski0afbb632008-06-06 21:10:57 +00001335 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001336 self._get_pidfile_info()
1337 return self._state.exit_status
1338
1339
1340 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001341 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001342 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001343 if self._state.num_tests_failed is None:
1344 return -1
showard21baa452008-10-21 00:08:39 +00001345 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001346
1347
showardcdaeae82009-08-31 18:32:48 +00001348 def try_copy_results_on_drone(self, **kwargs):
1349 if self.has_process():
1350 # copy results logs into the normal place for job results
1351 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1352
1353
1354 def try_copy_to_results_repository(self, source, **kwargs):
1355 if self.has_process():
1356 _drone_manager.copy_to_results_repository(self.get_process(),
1357 source, **kwargs)
1358
1359
mbligh36768f02008-02-22 18:28:33 +00001360class Agent(object):
showard77182562009-06-10 00:16:05 +00001361 """
showard8cc058f2009-09-08 16:26:33 +00001362 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001363
1364 The following methods are required on all task objects:
1365 poll() - Called periodically to let the task check its status and
1366 update its internal state. If the task succeeded.
1367 is_done() - Returns True if the task is finished.
1368 abort() - Called when an abort has been requested. The task must
1369 set its aborted attribute to True if it actually aborted.
1370
1371 The following attributes are required on all task objects:
1372 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001373 success - bool, True if this task succeeded.
1374 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1375 host_ids - A sequence of Host ids this task represents.
1376
1377 The following attribute is written to all task objects:
1378 agent - A reference to the Agent instance that the task has been
1379 added to.
1380 """
1381
1382
showard8cc058f2009-09-08 16:26:33 +00001383 def __init__(self, task, num_processes=1):
showard77182562009-06-10 00:16:05 +00001384 """
showard8cc058f2009-09-08 16:26:33 +00001385 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001386 @param num_processes: The number of subprocesses the Agent represents.
1387 This is used by the Dispatcher for managing the load on the
1388 system. Defaults to 1.
1389 """
showard8cc058f2009-09-08 16:26:33 +00001390 self.task = task
1391 task.agent = self
1392
showard77182562009-06-10 00:16:05 +00001393 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001394 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001395 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001396
showard8cc058f2009-09-08 16:26:33 +00001397 self.queue_entry_ids = task.queue_entry_ids
1398 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001399
showard8cc058f2009-09-08 16:26:33 +00001400 self.started = False
mbligh36768f02008-02-22 18:28:33 +00001401
1402
jadmanski0afbb632008-06-06 21:10:57 +00001403 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001404 self.started = True
1405 if self.task:
1406 self.task.poll()
1407 if self.task.is_done():
1408 self.task = None
showardec113162008-05-08 00:52:49 +00001409
1410
jadmanski0afbb632008-06-06 21:10:57 +00001411 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001412 return self.task is None
mbligh36768f02008-02-22 18:28:33 +00001413
1414
showardd3dc1992009-04-22 21:01:40 +00001415 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001416 if self.task:
1417 self.task.abort()
1418 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001419 # tasks can choose to ignore aborts
showard8cc058f2009-09-08 16:26:33 +00001420 self.task = None
showard20f9bdd2009-04-29 19:48:33 +00001421
showardd3dc1992009-04-22 21:01:40 +00001422
showard77182562009-06-10 00:16:05 +00001423class DelayedCallTask(object):
1424 """
1425 A task object like AgentTask for an Agent to run that waits for the
1426 specified amount of time to have elapsed before calling the supplied
1427 callback once and finishing. If the callback returns anything, it is
1428 assumed to be a new Agent instance and will be added to the dispatcher.
1429
1430 @attribute end_time: The absolute posix time after which this task will
1431 call its callback when it is polled and be finished.
1432
1433 Also has all attributes required by the Agent class.
1434 """
1435 def __init__(self, delay_seconds, callback, now_func=None):
1436 """
1437 @param delay_seconds: The delay in seconds from now that this task
1438 will call the supplied callback and be done.
1439 @param callback: A callable to be called by this task once after at
1440 least delay_seconds time has elapsed. It must return None
1441 or a new Agent instance.
1442 @param now_func: A time.time like function. Default: time.time.
1443 Used for testing.
1444 """
1445 assert delay_seconds > 0
1446 assert callable(callback)
1447 if not now_func:
1448 now_func = time.time
1449 self._now_func = now_func
1450 self._callback = callback
1451
1452 self.end_time = self._now_func() + delay_seconds
1453
1454 # These attributes are required by Agent.
1455 self.aborted = False
showard77182562009-06-10 00:16:05 +00001456 self.host_ids = ()
1457 self.success = False
1458 self.queue_entry_ids = ()
1459 # This is filled in by Agent.add_task().
1460 self.agent = None
1461
1462
1463 def poll(self):
showard8cc058f2009-09-08 16:26:33 +00001464 if not self.is_done() and self._now_func() >= self.end_time:
1465 self._callback()
showard77182562009-06-10 00:16:05 +00001466 self.success = True
1467
1468
1469 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001470 return self.success or self.aborted
showard77182562009-06-10 00:16:05 +00001471
1472
1473 def abort(self):
1474 self.aborted = True
showard77182562009-06-10 00:16:05 +00001475
1476
mbligh36768f02008-02-22 18:28:33 +00001477class AgentTask(object):
showard8cc058f2009-09-08 16:26:33 +00001478 def __init__(self, cmd=None, working_directory=None,
showarded2afea2009-07-07 20:54:07 +00001479 pidfile_name=None, paired_with_pidfile=None,
1480 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001481 self.done = False
jadmanski0afbb632008-06-06 21:10:57 +00001482 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001483 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001484 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001485 self.monitor = recover_run_monitor
1486 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001487 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001488 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001489 self.queue_entry_ids = []
1490 self.host_ids = []
1491 self.log_file = None
1492
1493
1494 def _set_ids(self, host=None, queue_entries=None):
1495 if queue_entries and queue_entries != [None]:
1496 self.host_ids = [entry.host.id for entry in queue_entries]
1497 self.queue_entry_ids = [entry.id for entry in queue_entries]
1498 else:
1499 assert host
1500 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001501
1502
jadmanski0afbb632008-06-06 21:10:57 +00001503 def poll(self):
showard08a36412009-05-05 01:01:13 +00001504 if not self.started:
1505 self.start()
1506 self.tick()
1507
1508
1509 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001510 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001511 exit_code = self.monitor.exit_code()
1512 if exit_code is None:
1513 return
1514 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001515 else:
1516 success = False
mbligh36768f02008-02-22 18:28:33 +00001517
jadmanski0afbb632008-06-06 21:10:57 +00001518 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001519
1520
jadmanski0afbb632008-06-06 21:10:57 +00001521 def is_done(self):
1522 return self.done
mbligh36768f02008-02-22 18:28:33 +00001523
1524
jadmanski0afbb632008-06-06 21:10:57 +00001525 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001526 if self.done:
1527 return
jadmanski0afbb632008-06-06 21:10:57 +00001528 self.done = True
1529 self.success = success
1530 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001531
1532
jadmanski0afbb632008-06-06 21:10:57 +00001533 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001534 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001535
mbligh36768f02008-02-22 18:28:33 +00001536
jadmanski0afbb632008-06-06 21:10:57 +00001537 def cleanup(self):
showardcdaeae82009-08-31 18:32:48 +00001538 if self.monitor and self.log_file:
1539 self.monitor.try_copy_to_results_repository(self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001540
1541
jadmanski0afbb632008-06-06 21:10:57 +00001542 def epilog(self):
1543 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001544
1545
jadmanski0afbb632008-06-06 21:10:57 +00001546 def start(self):
1547 assert self.agent
1548
1549 if not self.started:
1550 self.prolog()
1551 self.run()
1552
1553 self.started = True
1554
1555
1556 def abort(self):
1557 if self.monitor:
1558 self.monitor.kill()
1559 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001560 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001561 self.cleanup()
1562
1563
showarded2afea2009-07-07 20:54:07 +00001564 def _get_consistent_execution_path(self, execution_entries):
1565 first_execution_path = execution_entries[0].execution_path()
1566 for execution_entry in execution_entries[1:]:
1567 assert execution_entry.execution_path() == first_execution_path, (
1568 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1569 execution_entry,
1570 first_execution_path,
1571 execution_entries[0]))
1572 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001573
1574
showarded2afea2009-07-07 20:54:07 +00001575 def _copy_results(self, execution_entries, use_monitor=None):
1576 """
1577 @param execution_entries: list of objects with execution_path() method
1578 """
showard6d1c1432009-08-20 23:30:39 +00001579 if use_monitor is not None and not use_monitor.has_process():
1580 return
1581
showarded2afea2009-07-07 20:54:07 +00001582 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001583 if use_monitor is None:
1584 assert self.monitor
1585 use_monitor = self.monitor
1586 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001587 execution_path = self._get_consistent_execution_path(execution_entries)
1588 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001589 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001590
showarda1e74b32009-05-12 17:32:04 +00001591
1592 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001593 for queue_entry in queue_entries:
1594 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001595
1596
showarda1e74b32009-05-12 17:32:04 +00001597 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1598 self._copy_results(queue_entries, use_monitor)
1599 self._parse_results(queue_entries)
1600
1601
showardd3dc1992009-04-22 21:01:40 +00001602 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001603 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001604 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001605 self.monitor = PidfileRunMonitor()
1606 self.monitor.run(self.cmd, self._working_directory,
1607 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001608 log_file=self.log_file,
1609 pidfile_name=pidfile_name,
1610 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001611
1612
showardd9205182009-04-27 20:09:55 +00001613class TaskWithJobKeyvals(object):
1614 """AgentTask mixin providing functionality to help with job keyval files."""
1615 _KEYVAL_FILE = 'keyval'
1616 def _format_keyval(self, key, value):
1617 return '%s=%s' % (key, value)
1618
1619
1620 def _keyval_path(self):
1621 """Subclasses must override this"""
1622 raise NotImplemented
1623
1624
1625 def _write_keyval_after_job(self, field, value):
1626 assert self.monitor
1627 if not self.monitor.has_process():
1628 return
1629 _drone_manager.write_lines_to_file(
1630 self._keyval_path(), [self._format_keyval(field, value)],
1631 paired_with_process=self.monitor.get_process())
1632
1633
1634 def _job_queued_keyval(self, job):
1635 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1636
1637
1638 def _write_job_finished(self):
1639 self._write_keyval_after_job("job_finished", int(time.time()))
1640
1641
showarddb502762009-09-09 15:31:20 +00001642 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1643 keyval_contents = '\n'.join(self._format_keyval(key, value)
1644 for key, value in keyval_dict.iteritems())
1645 # always end with a newline to allow additional keyvals to be written
1646 keyval_contents += '\n'
1647 _drone_manager.attach_file_to_execution(self._working_directory,
1648 keyval_contents,
1649 file_path=keyval_path)
1650
1651
1652 def _write_keyvals_before_job(self, keyval_dict):
1653 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1654
1655
1656 def _write_host_keyvals(self, host):
1657 keyval_path = os.path.join(self._working_directory, 'host_keyvals',
1658 host.hostname)
1659 platform, all_labels = host.platform_and_labels()
1660 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1661 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1662
1663
showard8cc058f2009-09-08 16:26:33 +00001664class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001665 """
1666 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1667 """
1668
1669 TASK_TYPE = None
1670 host = None
1671 queue_entry = None
1672
1673 def __init__(self, task, extra_command_args, **kwargs):
showarded2afea2009-07-07 20:54:07 +00001674 assert (self.TASK_TYPE is not None,
1675 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001676
1677 self.host = Host(id=task.host.id)
1678 self.queue_entry = None
1679 if task.queue_entry:
1680 self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
1681
showarded2afea2009-07-07 20:54:07 +00001682 self.task = task
showarddb502762009-09-09 15:31:20 +00001683 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001684 self._extra_command_args = extra_command_args
1685 super(SpecialAgentTask, self).__init__(**kwargs)
1686
1687
showard8cc058f2009-09-08 16:26:33 +00001688 def _keyval_path(self):
1689 return os.path.join(self._working_directory, self._KEYVAL_FILE)
1690
1691
showarded2afea2009-07-07 20:54:07 +00001692 def prolog(self):
1693 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001694 self.cmd = _autoserv_command_line(self.host.hostname,
1695 self._extra_command_args,
1696 queue_entry=self.queue_entry)
1697 self._working_directory = self.task.execution_path()
1698 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001699 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001700
1701
showardde634ee2009-01-30 01:44:24 +00001702 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001703 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001704
showard2fe3f1d2009-07-06 20:19:11 +00001705 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001706 return # don't fail metahost entries, they'll be reassigned
1707
showard2fe3f1d2009-07-06 20:19:11 +00001708 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001709 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001710 return # entry has been aborted
1711
showard2fe3f1d2009-07-06 20:19:11 +00001712 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001713 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001714 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001715 self._write_keyval_after_job(queued_key, queued_time)
1716 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001717
showard8cc058f2009-09-08 16:26:33 +00001718 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001719 self.monitor.try_copy_results_on_drone(
1720 source_path=self._working_directory + '/',
1721 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001722
showard2fe3f1d2009-07-06 20:19:11 +00001723 self._copy_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001724 self.queue_entry.handle_host_failure()
showard2fe3f1d2009-07-06 20:19:11 +00001725 if self.queue_entry.job.parse_failed_repair:
1726 self._parse_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001727
1728 pidfile_id = _drone_manager.get_pidfile_id_from(
1729 self.queue_entry.execution_path(),
1730 pidfile_name=_AUTOSERV_PID_FILE)
1731 _drone_manager.register_pidfile(pidfile_id)
1732
1733
1734 def cleanup(self):
1735 super(SpecialAgentTask, self).cleanup()
1736 self.task.finish()
showardf85a0b72009-10-07 20:48:45 +00001737 if self.monitor:
1738 if self.monitor.has_process():
1739 self._copy_results([self.task])
1740 if self.monitor.pidfile_id is not None:
1741 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001742
1743
class RepairTask(SpecialAgentTask):
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task, recover_run_monitor=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # translate the host's protection level into the normalized
        # attribute name that autoserv expects
        protection_string = host_protections.Protection.get_string(
                task.host.protection)
        protection_name = host_protections.Protection.get_attr_name(
                protection_string)
        extra_args = ['-R', '--host-protection', protection_name]

        super(RepairTask, self).__init__(
                task, extra_args, recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status(models.Host.Status.READY)
            return

        self.host.set_status(models.Host.Status.REPAIR_FAILED)
        if self.queue_entry:
            self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001781
1782
showarded2afea2009-07-07 20:54:07 +00001783class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001784 def _copy_to_results_repository(self):
1785 if not self.queue_entry or self.queue_entry.meta_host:
1786 return
1787
1788 self.queue_entry.set_execution_subdir()
1789 log_name = os.path.basename(self.task.execution_path())
1790 source = os.path.join(self.task.execution_path(), 'debug',
1791 'autoserv.DEBUG')
1792 destination = os.path.join(
1793 self.queue_entry.execution_path(), log_name)
1794
1795 self.monitor.try_copy_to_results_repository(
1796 source, destination_path=destination)
1797
1798
showard170873e2009-01-07 00:22:26 +00001799 def epilog(self):
1800 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001801
showard775300b2009-09-09 15:30:50 +00001802 if self.success:
1803 return
showard8fe93b52008-11-18 17:53:22 +00001804
showard775300b2009-09-09 15:30:50 +00001805 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001806
showard775300b2009-09-09 15:30:50 +00001807 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
1808 return
1809
1810 if self.queue_entry:
1811 self.queue_entry.requeue()
1812
1813 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001814 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001815 queue_entry__id=self.queue_entry.id):
1816 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1817 self._fail_queue_entry()
1818 return
1819
1820 queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
1821 else:
1822 queue_entry = None
1823
1824 models.SpecialTask.objects.create(
1825 host=models.Host(id=self.host.id),
1826 task=models.SpecialTask.Task.REPAIR,
1827 queue_entry=queue_entry)
showard58721a82009-08-20 23:32:40 +00001828
showard8fe93b52008-11-18 17:53:22 +00001829
class VerifyTask(PreJobTask):
    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task, recover_run_monitor=None):
        super(VerifyTask, self).__init__(
                task, ['-v'], recover_run_monitor=recover_run_monitor)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any other queued verifies for this host.  One verify will do
        # and there's no need to keep records of other requests.
        models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(
                id=self.task.id).delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001866
1867
showardb5626452009-06-30 01:57:28 +00001868class CleanupHostsMixin(object):
1869 def _reboot_hosts(self, job, queue_entries, final_success,
1870 num_tests_failed):
1871 reboot_after = job.reboot_after
1872 do_reboot = (
1873 # always reboot after aborted jobs
1874 self._final_status == models.HostQueueEntry.Status.ABORTED
1875 or reboot_after == models.RebootAfter.ALWAYS
1876 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1877 and final_success and num_tests_failed == 0))
1878
1879 for queue_entry in queue_entries:
1880 if do_reboot:
1881 # don't pass the queue entry to the CleanupTask. if the cleanup
1882 # fails, the job doesn't care -- it's over.
showard8cc058f2009-09-08 16:26:33 +00001883 models.SpecialTask.objects.create(
1884 host=models.Host(id=queue_entry.host.id),
1885 task=models.SpecialTask.Task.CLEANUP)
showardb5626452009-06-30 01:57:28 +00001886 else:
showard8cc058f2009-09-08 16:26:33 +00001887 queue_entry.host.set_status(models.Host.Status.READY)
showardb5626452009-06-30 01:57:28 +00001888
1889
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """
    AgentTask that runs the main autoserv process for a job across one or
    more queue entries.
    """
    def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = queue_entries[0].get_group_name()
        super(QueueTask, self).__init__(
            cmd, self._execution_path(),
            recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        return os.path.join(self._execution_path(), self._KEYVAL_FILE)


    def _execution_path(self):
        # all entries share an execution path; use the first
        return self.queue_entries[0].execution_path()


    def prolog(self):
        """Validate entry/host statuses, write pre-job keyvals, and move
        entries and hosts into the Running state."""
        for entry in self.queue_entries:
            if entry.status not in (models.HostQueueEntry.Status.STARTING,
                                    models.HostQueueEntry.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))
            if entry.host.status not in (models.Host.Status.PENDING,
                                         models.Host.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start on queue '
                                     'entry with invalid host status %s: %s'
                                     % (entry.host.status, entry))

        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._execution_path(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Write the job-finished keyval and move entries to Gathering.
        Safe to call from both abort() and epilog()."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)


    def _write_status_comment(self, comment):
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_path(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00002005
2006
showardd3dc1992009-04-22 21:01:40 +00002007class PostJobTask(AgentTask):
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 recover_run_monitor=None):
        """
        @param queue_entries: entries whose shared execution path this
                post-job step operates on.
        @param pidfile_name: pidfile to use for this task's own process.
        @param logfile_name: name of this task's log file, created in the
                execution path.
        @param recover_run_monitor: existing monitor to adopt on recovery.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_path = self._get_consistent_execution_path(
            queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_path)
        # attach to the already-finished autoserv process for this execution
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_path)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(
            cmd=command, working_directory=self._execution_path,
            recover_run_monitor=recover_run_monitor)

        self.log_file = os.path.join(self._execution_path, logfile_name)
        self._final_status = self._determine_final_status()
2031
2032
2033 def _generate_command(self, results_dir):
2034 raise NotImplementedError('Subclasses must override this')
2035
2036
2037 def _job_was_aborted(self):
2038 was_aborted = None
2039 for queue_entry in self._queue_entries:
2040 queue_entry.update_from_database()
2041 if was_aborted is None: # first queue entry
2042 was_aborted = bool(queue_entry.aborted)
2043 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2044 email_manager.manager.enqueue_notify_email(
2045 'Inconsistent abort state',
2046 'Queue entries have inconsistent abort state: ' +
2047 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2048 # don't crash here, just assume true
2049 return True
2050 return was_aborted
2051
2052
2053 def _determine_final_status(self):
2054 if self._job_was_aborted():
2055 return models.HostQueueEntry.Status.ABORTED
2056
2057 # we'll use a PidfileRunMonitor to read the autoserv exit status
2058 if self._autoserv_monitor.exit_code() == 0:
2059 return models.HostQueueEntry.Status.COMPLETED
2060 return models.HostQueueEntry.Status.FAILED
2061
2062
2063 def run(self):
showard8cc058f2009-09-08 16:26:33 +00002064 # Make sure we actually have results to work with.
2065 # This should never happen in normal operation.
showard5add1c82009-05-26 19:27:46 +00002066 if not self._autoserv_monitor.has_process():
2067 email_manager.manager.enqueue_notify_email(
showard8cc058f2009-09-08 16:26:33 +00002068 'No results in post-job task',
2069 'No results in post-job task at %s' %
2070 self._autoserv_monitor.pidfile_id)
showard5add1c82009-05-26 19:27:46 +00002071 self.finished(False)
2072 return
2073
2074 super(PostJobTask, self).run(
2075 pidfile_name=self._pidfile_name,
2076 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002077
2078
2079 def _set_all_statuses(self, status):
2080 for queue_entry in self._queue_entries:
2081 queue_entry.set_status(status)
2082
2083
2084 def abort(self):
2085 # override AgentTask.abort() to avoid killing the process and ending
2086 # the task. post-job tasks continue when the job is aborted.
2087 pass
2088
2089
showardb5626452009-06-30 01:57:28 +00002090class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002091 """
2092 Task responsible for
2093 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2094 * copying logs to the results repository
2095 * spawning CleanupTasks for hosts, if necessary
2096 * spawning a FinalReparseTask for the job
2097 """
showarded2afea2009-07-07 20:54:07 +00002098 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002099 self._job = job
2100 super(GatherLogsTask, self).__init__(
2101 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002102 logfile_name='.collect_crashinfo.log',
2103 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002104 self._set_ids(queue_entries=queue_entries)
2105
2106
2107 def _generate_command(self, results_dir):
2108 host_list = ','.join(queue_entry.host.hostname
2109 for queue_entry in self._queue_entries)
2110 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2111 '-r', results_dir]
2112
2113
2114 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002115 for queue_entry in self._queue_entries:
2116 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2117 raise SchedulerError('Gather task attempting to start on '
2118 'non-gathering entry: %s' % queue_entry)
2119 if queue_entry.host.status != models.Host.Status.RUNNING:
2120 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002121 'entry with non-running host status %s: %s'
2122 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002123
showardd3dc1992009-04-22 21:01:40 +00002124 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002125
2126
showardd3dc1992009-04-22 21:01:40 +00002127 def epilog(self):
2128 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002129
showard6d1c1432009-08-20 23:30:39 +00002130 self._copy_and_parse_results(self._queue_entries,
2131 use_monitor=self._autoserv_monitor)
2132
2133 if self._autoserv_monitor.has_process():
2134 final_success = (self._final_status ==
2135 models.HostQueueEntry.Status.COMPLETED)
2136 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2137 else:
2138 final_success = False
2139 num_tests_failed = 0
2140
showardb5626452009-06-30 01:57:28 +00002141 self._reboot_hosts(self._job, self._queue_entries, final_success,
2142 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002143
2144
showard0bbfc212009-04-29 21:06:13 +00002145 def run(self):
showard597bfd32009-05-08 18:22:50 +00002146 autoserv_exit_code = self._autoserv_monitor.exit_code()
2147 # only run if Autoserv exited due to some signal. if we have no exit
2148 # code, assume something bad (and signal-like) happened.
2149 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002150 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002151 else:
2152 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002153
2154
showard8fe93b52008-11-18 17:53:22 +00002155class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002156 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2157
2158
showard8cc058f2009-09-08 16:26:33 +00002159 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00002160 super(CleanupTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00002161 task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002162
showard8cc058f2009-09-08 16:26:33 +00002163 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002164
mblighd5c95802008-03-05 00:33:46 +00002165
jadmanski0afbb632008-06-06 21:10:57 +00002166 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002167 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002168 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002169 self.host.set_status(models.Host.Status.CLEANING)
2170 if self.queue_entry:
2171 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2172
2173
showard775300b2009-09-09 15:30:50 +00002174 def _finish_epilog(self):
2175 if not self.queue_entry:
2176 return
2177
2178 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
2179 self.queue_entry.on_pending()
2180 elif self.success:
2181 if self.queue_entry.job.run_verify:
2182 entry = models.HostQueueEntry(id=self.queue_entry.id)
2183 models.SpecialTask.objects.create(
2184 host=models.Host(id=self.host.id),
2185 queue_entry=entry,
2186 task=models.SpecialTask.Task.VERIFY)
2187 else:
2188 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002189
mblighd5c95802008-03-05 00:33:46 +00002190
showard21baa452008-10-21 00:08:39 +00002191 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002192 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002193
showard21baa452008-10-21 00:08:39 +00002194 if self.success:
2195 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002196 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002197
showard775300b2009-09-09 15:30:50 +00002198 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002199
showard21baa452008-10-21 00:08:39 +00002200
showardd3dc1992009-04-22 21:01:40 +00002201class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002202 _num_running_parses = 0
2203
showarded2afea2009-07-07 20:54:07 +00002204 def __init__(self, queue_entries, recover_run_monitor=None):
2205 super(FinalReparseTask, self).__init__(
2206 queue_entries, pidfile_name=_PARSER_PID_FILE,
2207 logfile_name='.parse.log',
2208 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002209 # don't use _set_ids, since we don't want to set the host_ids
2210 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002211 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002212
showard97aed502008-11-04 02:01:24 +00002213
2214 @classmethod
2215 def _increment_running_parses(cls):
2216 cls._num_running_parses += 1
2217
2218
2219 @classmethod
2220 def _decrement_running_parses(cls):
2221 cls._num_running_parses -= 1
2222
2223
2224 @classmethod
2225 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002226 return (cls._num_running_parses <
2227 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002228
2229
2230 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002231 for queue_entry in self._queue_entries:
2232 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2233 raise SchedulerError('Parse task attempting to start on '
2234 'non-parsing entry: %s' % queue_entry)
2235
showard97aed502008-11-04 02:01:24 +00002236 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002237
2238
2239 def epilog(self):
2240 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002241 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002242
2243
showardd3dc1992009-04-22 21:01:40 +00002244 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002245 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002246 results_dir]
showard97aed502008-11-04 02:01:24 +00002247
2248
showard08a36412009-05-05 01:01:13 +00002249 def tick(self):
2250 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002251 # and we can, at which point we revert to default behavior
2252 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002253 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002254 else:
2255 self._try_starting_parse()
2256
2257
2258 def run(self):
2259 # override run() to not actually run unless we can
2260 self._try_starting_parse()
2261
2262
2263 def _try_starting_parse(self):
2264 if not self._can_run_new_parse():
2265 return
showard170873e2009-01-07 00:22:26 +00002266
showard97aed502008-11-04 02:01:24 +00002267 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002268 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002269
showard97aed502008-11-04 02:01:24 +00002270 self._increment_running_parses()
2271 self._parse_started = True
2272
2273
2274 def finished(self, success):
2275 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002276 if self._parse_started:
2277 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002278
2279
showarda3c58572009-03-12 20:36:59 +00002280class DBError(Exception):
2281 """Raised by the DBObject constructor when its select fails."""
2282
2283
mbligh36768f02008-02-22 18:28:33 +00002284class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002285 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002286
2287 # Subclasses MUST override these:
2288 _table_name = ''
2289 _fields = ()
2290
showarda3c58572009-03-12 20:36:59 +00002291 # A mapping from (type, id) to the instance of the object for that
2292 # particular id. This prevents us from creating new Job() and Host()
2293 # instances for every HostQueueEntry object that we instantiate as
2294 # multiple HQEs often share the same Job.
2295 _instances_by_type_and_id = weakref.WeakValueDictionary()
2296 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002297
showarda3c58572009-03-12 20:36:59 +00002298
2299 def __new__(cls, id=None, **kwargs):
2300 """
2301 Look to see if we already have an instance for this particular type
2302 and id. If so, use it instead of creating a duplicate instance.
2303 """
2304 if id is not None:
2305 instance = cls._instances_by_type_and_id.get((cls, id))
2306 if instance:
2307 return instance
2308 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2309
2310
2311 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002312 assert bool(id) or bool(row)
2313 if id is not None and row is not None:
2314 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002315 assert self._table_name, '_table_name must be defined in your class'
2316 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002317 if not new_record:
2318 if self._initialized and not always_query:
2319 return # We've already been initialized.
2320 if id is None:
2321 id = row[0]
2322 # Tell future constructors to use us instead of re-querying while
2323 # this instance is still around.
2324 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002325
showard6ae5ea92009-02-25 00:11:51 +00002326 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002327
jadmanski0afbb632008-06-06 21:10:57 +00002328 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002329
jadmanski0afbb632008-06-06 21:10:57 +00002330 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002331 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002332
showarda3c58572009-03-12 20:36:59 +00002333 if self._initialized:
2334 differences = self._compare_fields_in_row(row)
2335 if differences:
showard7629f142009-03-27 21:02:02 +00002336 logging.warn(
2337 'initialized %s %s instance requery is updating: %s',
2338 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002339 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002340 self._initialized = True
2341
2342
2343 @classmethod
2344 def _clear_instance_cache(cls):
2345 """Used for testing, clear the internal instance cache."""
2346 cls._instances_by_type_and_id.clear()
2347
2348
showardccbd6c52009-03-21 00:10:21 +00002349 def _fetch_row_from_db(self, row_id):
2350 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2351 rows = _db.execute(sql, (row_id,))
2352 if not rows:
showard76e29d12009-04-15 21:53:10 +00002353 raise DBError("row not found (table=%s, row id=%s)"
2354 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002355 return rows[0]
2356
2357
showarda3c58572009-03-12 20:36:59 +00002358 def _assert_row_length(self, row):
2359 assert len(row) == len(self._fields), (
2360 "table = %s, row = %s/%d, fields = %s/%d" % (
2361 self.__table, row, len(row), self._fields, len(self._fields)))
2362
2363
2364 def _compare_fields_in_row(self, row):
2365 """
showarddae680a2009-10-12 20:26:43 +00002366 Given a row as returned by a SELECT query, compare it to our existing in
2367 memory fields. Fractional seconds are stripped from datetime values
2368 before comparison.
showarda3c58572009-03-12 20:36:59 +00002369
2370 @param row - A sequence of values corresponding to fields named in
2371 The class attribute _fields.
2372
2373 @returns A dictionary listing the differences keyed by field name
2374 containing tuples of (current_value, row_value).
2375 """
2376 self._assert_row_length(row)
2377 differences = {}
showarddae680a2009-10-12 20:26:43 +00002378 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002379 for field, row_value in itertools.izip(self._fields, row):
2380 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002381 if (isinstance(current_value, datetime.datetime)
2382 and isinstance(row_value, datetime.datetime)):
2383 current_value = current_value.strftime(datetime_cmp_fmt)
2384 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002385 if current_value != row_value:
2386 differences[field] = (current_value, row_value)
2387 return differences
showard2bab8f42008-11-12 18:15:22 +00002388
2389
2390 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002391 """
2392 Update our field attributes using a single row returned by SELECT.
2393
2394 @param row - A sequence of values corresponding to fields named in
2395 the class fields list.
2396 """
2397 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002398
showard2bab8f42008-11-12 18:15:22 +00002399 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002400 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002401 setattr(self, field, value)
2402 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002403
showard2bab8f42008-11-12 18:15:22 +00002404 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002405
mblighe2586682008-02-29 22:45:46 +00002406
showardccbd6c52009-03-21 00:10:21 +00002407 def update_from_database(self):
2408 assert self.id is not None
2409 row = self._fetch_row_from_db(self.id)
2410 self._update_fields_from_row(row)
2411
2412
jadmanski0afbb632008-06-06 21:10:57 +00002413 def count(self, where, table = None):
2414 if not table:
2415 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002416
jadmanski0afbb632008-06-06 21:10:57 +00002417 rows = _db.execute("""
2418 SELECT count(*) FROM %s
2419 WHERE %s
2420 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002421
jadmanski0afbb632008-06-06 21:10:57 +00002422 assert len(rows) == 1
2423
2424 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002425
2426
showardd3dc1992009-04-22 21:01:40 +00002427 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002428 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002429
showard2bab8f42008-11-12 18:15:22 +00002430 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002431 return
mbligh36768f02008-02-22 18:28:33 +00002432
mblighf8c624d2008-07-03 16:58:45 +00002433 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002434 _db.execute(query, (value, self.id))
2435
showard2bab8f42008-11-12 18:15:22 +00002436 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002437
2438
jadmanski0afbb632008-06-06 21:10:57 +00002439 def save(self):
2440 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002441 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002442 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002443 values = []
2444 for key in keys:
2445 value = getattr(self, key)
2446 if value is None:
2447 values.append('NULL')
2448 else:
2449 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002450 values_str = ','.join(values)
2451 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2452 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002453 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002454 # Update our id to the one the database just assigned to us.
2455 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002456
2457
jadmanski0afbb632008-06-06 21:10:57 +00002458 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002459 self._instances_by_type_and_id.pop((type(self), id), None)
2460 self._initialized = False
2461 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002462 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2463 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002464
2465
showard63a34772008-08-18 19:32:50 +00002466 @staticmethod
2467 def _prefix_with(string, prefix):
2468 if string:
2469 string = prefix + string
2470 return string
2471
2472
jadmanski0afbb632008-06-06 21:10:57 +00002473 @classmethod
showard989f25d2008-10-01 11:38:11 +00002474 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002475 """
2476 Construct instances of our class based on the given database query.
2477
2478 @yields One class instance for each row fetched.
2479 """
showard63a34772008-08-18 19:32:50 +00002480 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2481 where = cls._prefix_with(where, 'WHERE ')
2482 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002483 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002484 'joins' : joins,
2485 'where' : where,
2486 'order_by' : order_by})
2487 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002488 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002489
mbligh36768f02008-02-22 18:28:33 +00002490
class IneligibleHostQueue(DBObject):
    """A job/host blocking row; created and removed by
    HostQueueEntry.block_host()/unblock_host()."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002494
2495
showard89f84db2009-03-12 20:39:13 +00002496class AtomicGroup(DBObject):
2497 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002498 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2499 'invalid')
showard89f84db2009-03-12 20:39:13 +00002500
2501
showard989f25d2008-10-01 11:38:11 +00002502class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002503 _table_name = 'labels'
2504 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002505 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002506
2507
showard6157c632009-07-06 20:19:31 +00002508 def __repr__(self):
2509 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2510 self.name, self.id, self.atomic_group_id)
2511
2512
mbligh36768f02008-02-22 18:28:33 +00002513class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002514 _table_name = 'hosts'
2515 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2516 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2517
2518
jadmanski0afbb632008-06-06 21:10:57 +00002519 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002520 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002521 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002522
2523
showard170873e2009-01-07 00:22:26 +00002524 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002525 """
showard170873e2009-01-07 00:22:26 +00002526 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002527 """
2528 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002529 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002530 FROM labels
2531 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002532 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002533 ORDER BY labels.name
2534 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002535 platform = None
2536 all_labels = []
2537 for label_name, is_platform in rows:
2538 if is_platform:
2539 platform = label_name
2540 all_labels.append(label_name)
2541 return platform, all_labels
2542
2543
showard54c1ea92009-05-20 00:32:58 +00002544 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2545
2546
2547 @classmethod
2548 def cmp_for_sort(cls, a, b):
2549 """
2550 A comparison function for sorting Host objects by hostname.
2551
2552 This strips any trailing numeric digits, ignores leading 0s and
2553 compares hostnames by the leading name and the trailing digits as a
2554 number. If both hostnames do not match this pattern, they are simply
2555 compared as lower case strings.
2556
2557 Example of how hostnames will be sorted:
2558
2559 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2560
2561 This hopefully satisfy most people's hostname sorting needs regardless
2562 of their exact naming schemes. Nobody sane should have both a host10
2563 and host010 (but the algorithm works regardless).
2564 """
2565 lower_a = a.hostname.lower()
2566 lower_b = b.hostname.lower()
2567 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2568 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2569 if match_a and match_b:
2570 name_a, number_a_str = match_a.groups()
2571 name_b, number_b_str = match_b.groups()
2572 number_a = int(number_a_str.lstrip('0'))
2573 number_b = int(number_b_str.lstrip('0'))
2574 result = cmp((name_a, number_a), (name_b, number_b))
2575 if result == 0 and lower_a != lower_b:
2576 # If they compared equal above but the lower case names are
2577 # indeed different, don't report equality. abc012 != abc12.
2578 return cmp(lower_a, lower_b)
2579 return result
2580 else:
2581 return cmp(lower_a, lower_b)
2582
2583
mbligh36768f02008-02-22 18:28:33 +00002584class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002585 _table_name = 'host_queue_entries'
2586 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002587 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002588 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002589
2590
showarda3c58572009-03-12 20:36:59 +00002591 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002592 assert id or row
showarda3c58572009-03-12 20:36:59 +00002593 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002594 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002595
jadmanski0afbb632008-06-06 21:10:57 +00002596 if self.host_id:
2597 self.host = Host(self.host_id)
2598 else:
2599 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002600
showard77182562009-06-10 00:16:05 +00002601 if self.atomic_group_id:
2602 self.atomic_group = AtomicGroup(self.atomic_group_id,
2603 always_query=False)
2604 else:
2605 self.atomic_group = None
2606
showard170873e2009-01-07 00:22:26 +00002607 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002608 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002609
2610
showard89f84db2009-03-12 20:39:13 +00002611 @classmethod
2612 def clone(cls, template):
2613 """
2614 Creates a new row using the values from a template instance.
2615
2616 The new instance will not exist in the database or have a valid
2617 id attribute until its save() method is called.
2618 """
2619 assert isinstance(template, cls)
2620 new_row = [getattr(template, field) for field in cls._fields]
2621 clone = cls(row=new_row, new_record=True)
2622 clone.id = None
2623 return clone
2624
2625
showardc85c21b2008-11-24 22:17:37 +00002626 def _view_job_url(self):
2627 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2628
2629
showardf1ae3542009-05-11 19:26:02 +00002630 def get_labels(self):
2631 """
2632 Get all labels associated with this host queue entry (either via the
2633 meta_host or as a job dependency label). The labels yielded are not
2634 guaranteed to be unique.
2635
2636 @yields Label instances associated with this host_queue_entry.
2637 """
2638 if self.meta_host:
2639 yield Label(id=self.meta_host, always_query=False)
2640 labels = Label.fetch(
2641 joins="JOIN jobs_dependency_labels AS deps "
2642 "ON (labels.id = deps.label_id)",
2643 where="deps.job_id = %d" % self.job.id)
2644 for label in labels:
2645 yield label
2646
2647
jadmanski0afbb632008-06-06 21:10:57 +00002648 def set_host(self, host):
2649 if host:
2650 self.queue_log_record('Assigning host ' + host.hostname)
2651 self.update_field('host_id', host.id)
2652 self.update_field('active', True)
2653 self.block_host(host.id)
2654 else:
2655 self.queue_log_record('Releasing host')
2656 self.unblock_host(self.host.id)
2657 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002658
jadmanski0afbb632008-06-06 21:10:57 +00002659 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002660
2661
jadmanski0afbb632008-06-06 21:10:57 +00002662 def get_host(self):
2663 return self.host
mbligh36768f02008-02-22 18:28:33 +00002664
2665
jadmanski0afbb632008-06-06 21:10:57 +00002666 def queue_log_record(self, log_line):
2667 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002668 _drone_manager.write_lines_to_file(self.queue_log_path,
2669 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002670
2671
jadmanski0afbb632008-06-06 21:10:57 +00002672 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002673 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002674 row = [0, self.job.id, host_id]
2675 block = IneligibleHostQueue(row=row, new_record=True)
2676 block.save()
mblighe2586682008-02-29 22:45:46 +00002677
2678
jadmanski0afbb632008-06-06 21:10:57 +00002679 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002680 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002681 blocks = IneligibleHostQueue.fetch(
2682 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2683 for block in blocks:
2684 block.delete()
mblighe2586682008-02-29 22:45:46 +00002685
2686
showard2bab8f42008-11-12 18:15:22 +00002687 def set_execution_subdir(self, subdir=None):
2688 if subdir is None:
2689 assert self.get_host()
2690 subdir = self.get_host().hostname
2691 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002692
2693
showard6355f6b2008-12-05 18:52:13 +00002694 def _get_hostname(self):
2695 if self.host:
2696 return self.host.hostname
2697 return 'no host'
2698
2699
showard170873e2009-01-07 00:22:26 +00002700 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002701 flags = []
2702 if self.active:
2703 flags.append('active')
2704 if self.complete:
2705 flags.append('complete')
2706 if self.deleted:
2707 flags.append('deleted')
2708 if self.aborted:
2709 flags.append('aborted')
2710 flags_str = ','.join(flags)
2711 if flags_str:
2712 flags_str = ' [%s]' % flags_str
2713 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2714 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002715
2716
jadmanski0afbb632008-06-06 21:10:57 +00002717 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002718 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002719
showardb18134f2009-03-20 20:52:18 +00002720 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002721
showard8cc058f2009-09-08 16:26:33 +00002722 if status in (models.HostQueueEntry.Status.QUEUED,
2723 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002724 self.update_field('complete', False)
2725 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002726
showard8cc058f2009-09-08 16:26:33 +00002727 if status in (models.HostQueueEntry.Status.PENDING,
2728 models.HostQueueEntry.Status.RUNNING,
2729 models.HostQueueEntry.Status.VERIFYING,
2730 models.HostQueueEntry.Status.STARTING,
2731 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002732 self.update_field('complete', False)
2733 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002734
showard8cc058f2009-09-08 16:26:33 +00002735 if status in (models.HostQueueEntry.Status.FAILED,
2736 models.HostQueueEntry.Status.COMPLETED,
2737 models.HostQueueEntry.Status.STOPPED,
2738 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002739 self.update_field('complete', True)
2740 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002741 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002742
2743 should_email_status = (status.lower() in _notify_email_statuses or
2744 'all' in _notify_email_statuses)
2745 if should_email_status:
2746 self._email_on_status(status)
2747
2748 self._email_on_job_complete()
2749
2750
showardf85a0b72009-10-07 20:48:45 +00002751 def _on_complete(self):
2752 if not self.execution_subdir:
2753 return
2754 # unregister any possible pidfiles associated with this queue entry
2755 for pidfile_name in _ALL_PIDFILE_NAMES:
2756 pidfile_id = _drone_manager.get_pidfile_id_from(
2757 self.execution_path(), pidfile_name=pidfile_name)
2758 _drone_manager.unregister_pidfile(pidfile_id)
2759
2760
showardc85c21b2008-11-24 22:17:37 +00002761 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002762 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002763
2764 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2765 self.job.id, self.job.name, hostname, status)
2766 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2767 self.job.id, self.job.name, hostname, status,
2768 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002769 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002770
2771
2772 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002773 if not self.job.is_finished():
2774 return
showard542e8402008-09-19 20:16:18 +00002775
showardc85c21b2008-11-24 22:17:37 +00002776 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002777 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002778 for queue_entry in hosts_queue:
2779 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002780 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002781 queue_entry.status))
2782
2783 summary_text = "\n".join(summary_text)
2784 status_counts = models.Job.objects.get_status_counts(
2785 [self.job.id])[self.job.id]
2786 status = ', '.join('%d %s' % (count, status) for status, count
2787 in status_counts.iteritems())
2788
2789 subject = 'Autotest: Job ID: %s "%s" %s' % (
2790 self.job.id, self.job.name, status)
2791 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2792 self.job.id, self.job.name, status, self._view_job_url(),
2793 summary_text)
showard170873e2009-01-07 00:22:26 +00002794 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002795
2796
showard8cc058f2009-09-08 16:26:33 +00002797 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002798 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002799 assert assigned_host
2800 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002801 if self.host_id is None:
2802 self.set_host(assigned_host)
2803 else:
2804 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002805
showardcfd4a7e2009-07-11 01:47:33 +00002806 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002807 self.job.name, self.meta_host, self.atomic_group_id,
2808 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002809
showard8cc058f2009-09-08 16:26:33 +00002810 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002811
2812
showard8cc058f2009-09-08 16:26:33 +00002813 def _do_schedule_pre_job_tasks(self):
showard77182562009-06-10 00:16:05 +00002814 # Every host goes thru the Verifying stage (which may or may not
2815 # actually do anything as determined by get_pre_job_tasks).
2816 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00002817 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002818
showard6ae5ea92009-02-25 00:11:51 +00002819
jadmanski0afbb632008-06-06 21:10:57 +00002820 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002821 assert self.host
showard8cc058f2009-09-08 16:26:33 +00002822 self.set_status(models.HostQueueEntry.Status.QUEUED)
showard12f3e322009-05-13 21:27:42 +00002823 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002824 # verify/cleanup failure sets the execution subdir, so reset it here
2825 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002826 if self.meta_host:
2827 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002828
2829
jadmanski0afbb632008-06-06 21:10:57 +00002830 def handle_host_failure(self):
2831 """\
2832 Called when this queue entry's host has failed verification and
2833 repair.
2834 """
2835 assert not self.meta_host
showard8cc058f2009-09-08 16:26:33 +00002836 self.set_status(models.HostQueueEntry.Status.FAILED)
showard2bab8f42008-11-12 18:15:22 +00002837 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002838
2839
jadmanskif7fa2cc2008-10-01 14:13:23 +00002840 @property
2841 def aborted_by(self):
2842 self._load_abort_info()
2843 return self._aborted_by
2844
2845
    @property
    def aborted_on(self):
        """Timestamp of when this entry was aborted (None if not aborted)."""
        self._load_abort_info()
        return self._aborted_on
2850
2851
    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Memoized: _aborted_by and _aborted_on are always set together, so
        # the presence of one attribute means both are already loaded.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            # Entry was never aborted (or the abort row is missing).
            self._aborted_by = self._aborted_on = None
2867
2868
showardb2e2c322008-10-14 17:33:55 +00002869 def on_pending(self):
2870 """
2871 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00002872 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
2873 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00002874 """
showard8cc058f2009-09-08 16:26:33 +00002875 self.set_status(models.HostQueueEntry.Status.PENDING)
2876 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00002877
2878 # Some debug code here: sends an email if an asynchronous job does not
2879 # immediately enter Starting.
2880 # TODO: Remove this once we figure out why asynchronous jobs are getting
2881 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00002882 self.job.run_if_ready(queue_entry=self)
2883 if (self.job.synch_count == 1 and
2884 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00002885 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2886 message = 'Asynchronous job stuck in Pending'
2887 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002888
2889
showardd3dc1992009-04-22 21:01:40 +00002890 def abort(self, dispatcher):
2891 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002892
showardd3dc1992009-04-22 21:01:40 +00002893 Status = models.HostQueueEntry.Status
showard8cc058f2009-09-08 16:26:33 +00002894 if self.status in (Status.GATHERING, Status.PARSING):
showardd3dc1992009-04-22 21:01:40 +00002895 # do nothing; post-job tasks will finish and then mark this entry
2896 # with status "Aborted" and take care of the host
2897 return
2898
showard8cc058f2009-09-08 16:26:33 +00002899 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
2900 assert not dispatcher.get_agents_for_entry(self)
showardd3dc1992009-04-22 21:01:40 +00002901 self.host.set_status(models.Host.Status.READY)
2902 elif self.status == Status.VERIFYING:
showard8cc058f2009-09-08 16:26:33 +00002903 models.SpecialTask.objects.create(
2904 task=models.SpecialTask.Task.CLEANUP,
2905 host=models.Host(id=self.host.id))
showardd3dc1992009-04-22 21:01:40 +00002906
2907 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002908
showard8cc058f2009-09-08 16:26:33 +00002909
2910 def get_group_name(self):
2911 atomic_group = self.atomic_group
2912 if not atomic_group:
2913 return ''
2914
2915 # Look at any meta_host and dependency labels and pick the first
2916 # one that also specifies this atomic group. Use that label name
2917 # as the group name if possible (it is more specific).
2918 for label in self.get_labels():
2919 if label.atomic_group_id:
2920 assert label.atomic_group_id == atomic_group.id
2921 return label.name
2922 return atomic_group.name
2923
2924
showard170873e2009-01-07 00:22:26 +00002925 def execution_tag(self):
2926 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002927 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002928
2929
showarded2afea2009-07-07 20:54:07 +00002930 def execution_path(self):
2931 return self.execution_tag()
2932
2933
mbligh36768f02008-02-22 18:28:33 +00002934class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002935 _table_name = 'jobs'
2936 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2937 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002938 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002939 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002940
showard77182562009-06-10 00:16:05 +00002941 # This does not need to be a column in the DB. The delays are likely to
2942 # be configured short. If the scheduler is stopped and restarted in
2943 # the middle of a job's delay cycle, the delay cycle will either be
2944 # repeated or skipped depending on the number of Pending machines found
2945 # when the restarted scheduler recovers to track it. Not a problem.
2946 #
2947 # A reference to the DelayedCallTask that will wake up the job should
2948 # no other HQEs change state in time. Its end_time attribute is used
2949 # by our run_with_ready_delay() method to determine if the wait is over.
2950 _delay_ready_task = None
2951
2952 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2953 # all status='Pending' atomic group HQEs incase a delay was running when the
2954 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002955
showarda3c58572009-03-12 20:36:59 +00002956 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002957 assert id or row
showarda3c58572009-03-12 20:36:59 +00002958 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002959
mblighe2586682008-02-29 22:45:46 +00002960
jadmanski0afbb632008-06-06 21:10:57 +00002961 def is_server_job(self):
2962 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002963
2964
showard170873e2009-01-07 00:22:26 +00002965 def tag(self):
2966 return "%s-%s" % (self.id, self.owner)
2967
2968
jadmanski0afbb632008-06-06 21:10:57 +00002969 def get_host_queue_entries(self):
2970 rows = _db.execute("""
2971 SELECT * FROM host_queue_entries
2972 WHERE job_id= %s
2973 """, (self.id,))
2974 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002975
jadmanski0afbb632008-06-06 21:10:57 +00002976 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002977
jadmanski0afbb632008-06-06 21:10:57 +00002978 return entries
mbligh36768f02008-02-22 18:28:33 +00002979
2980
jadmanski0afbb632008-06-06 21:10:57 +00002981 def set_status(self, status, update_queues=False):
2982 self.update_field('status',status)
2983
2984 if update_queues:
2985 for queue_entry in self.get_host_queue_entries():
2986 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002987
2988
showard77182562009-06-10 00:16:05 +00002989 def _atomic_and_has_started(self):
2990 """
2991 @returns True if any of the HostQueueEntries associated with this job
2992 have entered the Status.STARTING state or beyond.
2993 """
2994 atomic_entries = models.HostQueueEntry.objects.filter(
2995 job=self.id, atomic_group__isnull=False)
2996 if atomic_entries.count() <= 0:
2997 return False
2998
showardaf8b4ca2009-06-16 18:47:26 +00002999 # These states may *only* be reached if Job.run() has been called.
3000 started_statuses = (models.HostQueueEntry.Status.STARTING,
3001 models.HostQueueEntry.Status.RUNNING,
3002 models.HostQueueEntry.Status.COMPLETED)
3003
3004 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003005 return started_entries.count() > 0
3006
3007
showard708b3522009-08-20 23:26:15 +00003008 def _hosts_assigned_count(self):
3009 """The number of HostQueueEntries assigned a Host for this job."""
3010 entries = models.HostQueueEntry.objects.filter(job=self.id,
3011 host__isnull=False)
3012 return entries.count()
3013
3014
showard77182562009-06-10 00:16:05 +00003015 def _pending_count(self):
3016 """The number of HostQueueEntries for this job in the Pending state."""
3017 pending_entries = models.HostQueueEntry.objects.filter(
3018 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3019 return pending_entries.count()
3020
3021
jadmanski0afbb632008-06-06 21:10:57 +00003022 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003023 # NOTE: Atomic group jobs stop reporting ready after they have been
3024 # started to avoid launching multiple copies of one atomic job.
3025 # Only possible if synch_count is less than than half the number of
3026 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003027 pending_count = self._pending_count()
3028 atomic_and_has_started = self._atomic_and_has_started()
3029 ready = (pending_count >= self.synch_count
3030 and not self._atomic_and_has_started())
3031
3032 if not ready:
3033 logging.info(
3034 'Job %s not ready: %s pending, %s required '
3035 '(Atomic and started: %s)',
3036 self, pending_count, self.synch_count,
3037 atomic_and_has_started)
3038
3039 return ready
mbligh36768f02008-02-22 18:28:33 +00003040
3041
jadmanski0afbb632008-06-06 21:10:57 +00003042 def num_machines(self, clause = None):
3043 sql = "job_id=%s" % self.id
3044 if clause:
3045 sql += " AND (%s)" % clause
3046 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003047
3048
jadmanski0afbb632008-06-06 21:10:57 +00003049 def num_queued(self):
3050 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003051
3052
jadmanski0afbb632008-06-06 21:10:57 +00003053 def num_active(self):
3054 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003055
3056
jadmanski0afbb632008-06-06 21:10:57 +00003057 def num_complete(self):
3058 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003059
3060
jadmanski0afbb632008-06-06 21:10:57 +00003061 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003062 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003063
mbligh36768f02008-02-22 18:28:33 +00003064
showard6bb7c292009-01-30 01:44:51 +00003065 def _not_yet_run_entries(self, include_verifying=True):
3066 statuses = [models.HostQueueEntry.Status.QUEUED,
3067 models.HostQueueEntry.Status.PENDING]
3068 if include_verifying:
3069 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3070 return models.HostQueueEntry.objects.filter(job=self.id,
3071 status__in=statuses)
3072
3073
3074 def _stop_all_entries(self):
3075 entries_to_stop = self._not_yet_run_entries(
3076 include_verifying=False)
3077 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003078 assert not child_entry.complete, (
3079 '%s status=%s, active=%s, complete=%s' %
3080 (child_entry.id, child_entry.status, child_entry.active,
3081 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003082 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3083 child_entry.host.status = models.Host.Status.READY
3084 child_entry.host.save()
3085 child_entry.status = models.HostQueueEntry.Status.STOPPED
3086 child_entry.save()
3087
showard2bab8f42008-11-12 18:15:22 +00003088 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003089 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003090 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003091 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003092
3093
jadmanski0afbb632008-06-06 21:10:57 +00003094 def write_to_machines_file(self, queue_entry):
3095 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00003096 file_path = os.path.join(self.tag(), '.machines')
3097 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003098
3099
showardf1ae3542009-05-11 19:26:02 +00003100 def _next_group_name(self, group_name=''):
3101 """@returns a directory name to use for the next host group results."""
3102 if group_name:
3103 # Sanitize for use as a pathname.
3104 group_name = group_name.replace(os.path.sep, '_')
3105 if group_name.startswith('.'):
3106 group_name = '_' + group_name[1:]
3107 # Add a separator between the group name and 'group%d'.
3108 group_name += '.'
3109 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003110 query = models.HostQueueEntry.objects.filter(
3111 job=self.id).values('execution_subdir').distinct()
3112 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003113 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3114 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003115 if ids:
3116 next_id = max(ids) + 1
3117 else:
3118 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003119 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003120
3121
showarddb502762009-09-09 15:31:20 +00003122 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003123 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003124 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003125 return control_path
mbligh36768f02008-02-22 18:28:33 +00003126
showardb2e2c322008-10-14 17:33:55 +00003127
showard2bab8f42008-11-12 18:15:22 +00003128 def get_group_entries(self, queue_entry_from_group):
3129 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003130 return list(HostQueueEntry.fetch(
3131 where='job_id=%s AND execution_subdir=%s',
3132 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003133
3134
showard8cc058f2009-09-08 16:26:33 +00003135 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003136 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003137 execution_path = queue_entries[0].execution_path()
3138 control_path = self._write_control_file(execution_path)
jadmanski0afbb632008-06-06 21:10:57 +00003139 hostnames = ','.join([entry.get_host().hostname
3140 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003141
showarddb502762009-09-09 15:31:20 +00003142 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003143 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003144 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003145 ['-P', execution_tag, '-n',
3146 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003147 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003148
jadmanski0afbb632008-06-06 21:10:57 +00003149 if not self.is_server_job():
3150 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003151
showardb2e2c322008-10-14 17:33:55 +00003152 return params
mblighe2586682008-02-29 22:45:46 +00003153
mbligh36768f02008-02-22 18:28:33 +00003154
showardc9ae1782009-01-30 01:42:37 +00003155 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003156 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003157 return True
showard0fc38302008-10-23 00:44:07 +00003158 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003159 return queue_entry.get_host().dirty
3160 return False
showard21baa452008-10-21 00:08:39 +00003161
showardc9ae1782009-01-30 01:42:37 +00003162
showard8cc058f2009-09-08 16:26:33 +00003163 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003164 do_not_verify = (queue_entry.host.protection ==
3165 host_protections.Protection.DO_NOT_VERIFY)
3166 if do_not_verify:
3167 return False
3168 return self.run_verify
3169
3170
showard8cc058f2009-09-08 16:26:33 +00003171 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003172 """
3173 Get a list of tasks to perform before the host_queue_entry
3174 may be used to run this Job (such as Cleanup & Verify).
3175
3176 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003177 it should be considered be ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003178 task in the list calls HostQueueEntry.on_pending(), which
3179 continues the flow of the job.
3180 """
showardc9ae1782009-01-30 01:42:37 +00003181 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003182 task = models.SpecialTask.Task.CLEANUP
3183 elif self._should_run_verify(queue_entry):
3184 task = models.SpecialTask.Task.VERIFY
3185 else:
3186 queue_entry.on_pending()
3187 return
3188
3189 models.SpecialTask.objects.create(
3190 host=models.Host(id=queue_entry.host_id),
3191 queue_entry=models.HostQueueEntry(id=queue_entry.id),
3192 task=task)
showard21baa452008-10-21 00:08:39 +00003193
3194
showardf1ae3542009-05-11 19:26:02 +00003195 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003196 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003197 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003198 else:
showardf1ae3542009-05-11 19:26:02 +00003199 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003200 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003201 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003202 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003203
3204 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003205 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003206
3207
3208 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003209 """
3210 @returns A tuple containing a list of HostQueueEntry instances to be
3211 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00003212 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00003213 """
showard77182562009-06-10 00:16:05 +00003214 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003215 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003216 if atomic_group:
3217 num_entries_wanted = atomic_group.max_number_of_machines
3218 else:
3219 num_entries_wanted = self.synch_count
3220 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003221
showardf1ae3542009-05-11 19:26:02 +00003222 if num_entries_wanted > 0:
3223 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003224 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003225 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003226 params=(self.id, include_queue_entry.id)))
3227
3228 # Sort the chosen hosts by hostname before slicing.
3229 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3230 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3231 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3232 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003233
showardf1ae3542009-05-11 19:26:02 +00003234 # Sanity check. We'll only ever be called if this can be met.
showard828fc4c2009-09-14 20:31:00 +00003235 if len(chosen_entries) < self.synch_count:
3236 message = ('job %s got less than %s chosen entries: %s' % (
3237 self.id, self.synch_count, chosen_entries))
3238 logging.error(message)
3239 email_manager.manager.enqueue_notify_email(
3240 'Job not started, too few chosen entries', message)
3241 return []
showardf1ae3542009-05-11 19:26:02 +00003242
showard8cc058f2009-09-08 16:26:33 +00003243 group_name = include_queue_entry.get_group_name()
showardf1ae3542009-05-11 19:26:02 +00003244
3245 self._assign_new_group(chosen_entries, group_name=group_name)
showard8cc058f2009-09-08 16:26:33 +00003246 return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003247
3248
showard77182562009-06-10 00:16:05 +00003249 def run_if_ready(self, queue_entry):
3250 """
3251 @returns An Agent instance to ultimately run this job if enough hosts
3252 are ready for it to run.
3253 @returns None and potentially cleans up excess hosts if this Job
3254 is not ready to run.
3255 """
showardb2e2c322008-10-14 17:33:55 +00003256 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003257 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003258 elif queue_entry.atomic_group:
3259 self.run_with_ready_delay(queue_entry)
3260 else:
3261 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003262
3263
3264 def run_with_ready_delay(self, queue_entry):
3265 """
3266 Start a delay to wait for more hosts to enter Pending state before
3267 launching an atomic group job. Once set, the a delay cannot be reset.
3268
3269 @param queue_entry: The HostQueueEntry object to get atomic group
3270 info from and pass to run_if_ready when the delay is up.
3271
3272 @returns An Agent to run the job as appropriate or None if a delay
3273 has already been set.
3274 """
3275 assert queue_entry.job_id == self.id
3276 assert queue_entry.atomic_group
3277 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showard708b3522009-08-20 23:26:15 +00003278 pending_threshold = min(self._hosts_assigned_count(),
3279 queue_entry.atomic_group.max_number_of_machines)
showard77182562009-06-10 00:16:05 +00003280 over_max_threshold = (self._pending_count() >= pending_threshold)
3281 delay_expired = (self._delay_ready_task and
3282 time.time() >= self._delay_ready_task.end_time)
3283
3284 # Delay is disabled or we already have enough? Do not wait to run.
3285 if not delay or over_max_threshold or delay_expired:
showard8cc058f2009-09-08 16:26:33 +00003286 self.run(queue_entry)
3287 else:
3288 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003289
showard8cc058f2009-09-08 16:26:33 +00003290
3291 def schedule_delayed_callback_task(self, queue_entry):
3292 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3293
showard77182562009-06-10 00:16:05 +00003294 if self._delay_ready_task:
3295 return None
3296
showard8cc058f2009-09-08 16:26:33 +00003297 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3298
showard77182562009-06-10 00:16:05 +00003299 def run_job_after_delay():
3300 logging.info('Job %s done waiting for extra hosts.', self.id)
3301 return self.run(queue_entry)
3302
showard708b3522009-08-20 23:26:15 +00003303 logging.info('Job %s waiting up to %s seconds for more hosts.',
3304 self.id, delay)
showard77182562009-06-10 00:16:05 +00003305 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3306 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003307 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003308
3309
3310 def run(self, queue_entry):
3311 """
3312 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003313 """
3314 if queue_entry.atomic_group and self._atomic_and_has_started():
3315 logging.error('Job.run() called on running atomic Job %d '
3316 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003317 return
3318 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003319 if queue_entries:
3320 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003321
3322
showard8cc058f2009-09-08 16:26:33 +00003323 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003324 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003325 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showard77182562009-06-10 00:16:05 +00003326 if self._delay_ready_task:
3327 # Cancel any pending callback that would try to run again
3328 # as we are already running.
3329 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003330
showardb000a8d2009-07-28 20:02:07 +00003331 def __str__(self):
3332 return '%s-%s' % (self.id, self.owner)
3333
3334
mbligh36768f02008-02-22 18:28:33 +00003335if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003336 main()