blob: 1c62491d365e7069e1f37edb355ec27f84160d34 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
showard549afad2009-08-20 23:33:36 +000023BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
24PID_FILE_PREFIX = 'monitor_db'
mblighb090f142008-02-27 21:33:46 +000025
mbligh36768f02008-02-22 18:28:33 +000026RESULTS_DIR = '.'
27AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000028DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000029AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
30
31if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000032 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000033AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
34AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
35
36if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000037 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000038
showardd3dc1992009-04-22 21:01:40 +000039_AUTOSERV_PID_FILE = '.autoserv_execute'
40_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
41_PARSER_PID_FILE = '.parser_execute'
42
43_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
44 _PARSER_PID_FILE)
45
showard35162b02009-03-03 02:17:30 +000046# error message to leave in results dir when an autoserv process disappears
47# mysteriously
48_LOST_PROCESS_ERROR = """\
49Autoserv failed abnormally during execution for this job, probably due to a
50system error on the Autotest server. Full results may not be available. Sorry.
51"""
52
mbligh6f8bab42008-02-29 22:45:14 +000053_db = None
mbligh36768f02008-02-22 18:28:33 +000054_shutdown = False
showard170873e2009-01-07 00:22:26 +000055_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
56_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000057_testing_mode = False
showard542e8402008-09-19 20:16:18 +000058_base_url = None
showardc85c21b2008-11-24 22:17:37 +000059_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000060_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
def _get_pidfile_timeout_secs():
    """@returns Number of seconds to wait for autoserv to write its pidfile."""
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    """Entry point: run the scheduler, log escaping exceptions, and always
    remove our pidfile on the way out."""
    try:
        main_without_exception_handling()
    except SystemExit:
        # Deliberate exits (e.g. config errors) propagate untouched.
        raise
    except:
        # Record anything else before letting it terminate the process.
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000085
86
def main_without_exception_handling():
    """Parse arguments, load configuration, and run the dispatcher loop.

    Expects exactly one positional argument: the results directory. Exits
    with status 1 if the scheduler is disabled in the global config or the
    [SERVER] hostname is missing when no base_url override is configured.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # Fixed typo in the user-facing message ("to enabled it").
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give the site-specific hook (if any) a chance to run; falls back to the
    # dummy no-op implementation.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Accept whitespace, comma, semicolon or colon separated status names.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main scheduling loop; handle_sigint flips _shutdown to stop it.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Best-effort orderly teardown.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
def setup_logging():
    """Configure scheduler logging, honoring optional environment overrides
    for the log directory and log file name."""
    env = os.environ
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR'),
            logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME'))
182
183
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after the current tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000188
189
def init():
    """One-time scheduler startup.

    Acquires the pidfile (aborting if another instance is alive), connects
    the database, configures Django/readonly-connection behavior, installs
    the SIGINT handler and initializes the drone manager.  Order matters:
    the pidfile check must come first, and the drone manager last.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start when a prior scheduler instance still owns the pidfile.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Under --test, point at the stress-test database instead.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Comma-separated drone hostnames and the results host come from the
    # scheduler config section; both default to localhost.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv invocation as an argv list.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - If true (the default), pass --verbose to autoserv.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        # Fall back to the queue entry's job when no Job was given directly.
        effective_job = job if job else queue_entry.job
        command += ['-u', effective_job.owner, '-l', effective_job.name]
    if verbose:
        command.append('--verbose')
    return command + extra_args
248
249
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
252
253
class HostScheduler(object):
    """Matches pending host queue entries against ready hosts.

    refresh() snapshots scheduling-relevant database state (ready hosts,
    job ACLs and dependencies, host labels) into in-memory caches; the
    find_eligible_* methods then consume those caches, popping each host
    from self._hosts_available as it is handed out so a host is assigned
    at most once per scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for every currently schedulable host:
        unlocked, no active host queue entry, and status NULL or 'Ready'."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render ids as a comma-separated string for a SQL IN (...) clause."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Execute query (a template with one %s slot for an id list) and
        fold the two-column result into {left_id: set(right_ids)}.

        @param flip: If True, build the reverse mapping instead.
        """
        if not id_list:
            # nothing to substitute into IN (...); avoid a malformed query
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_ids)} (columns
        swapped when flip is True)."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job owner's ACLs."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} of hosts each job must not use."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of each job's dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)} for the given hosts."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return both directions of the host<->label relation:
        ({label_id: set(host_ids)}, {host_id: set(label_ids)})."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all scheduling caches from the database.

        Must be called before the find_eligible_* methods each cycle.

        @param pending_queue_entries: The HostQueueEntries about to be
                scheduled; job-related caches are restricted to their jobs.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Logs an error if more than one
        atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # misconfiguration: a host should be in at most one atomic group
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if host_id passes ACL, dependency, only_if_needed and atomic
        group checks for queue_entry (invalid hosts bypass all checks)."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """Truthy if the host is known and marked invalid in the database."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's explicitly requested host, or None if it is
        unavailable or ineligible.  Pops the host from the available cache."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Find and claim one eligible host carrying the entry's metahost
        label, or return None.  Prunes unusable hosts from the label cache."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a Host for this (non-atomic-group) entry, or None.

        Dispatches to the metahost or non-metahost path; the returned host
        is removed from the available-host cache.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
617
618
showard170873e2009-01-07 00:22:26 +0000619class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000620 def __init__(self):
621 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000622 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000623 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000624 user_cleanup_time = scheduler_config.config.clean_interval
625 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
626 _db, user_cleanup_time)
627 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000628 self._host_agents = {}
629 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
showard915958d2009-04-22 21:00:58 +0000632 def initialize(self, recover_hosts=True):
633 self._periodic_cleanup.initialize()
634 self._24hr_upkeep.initialize()
635
jadmanski0afbb632008-06-06 21:10:57 +0000636 # always recover processes
637 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000638
jadmanski0afbb632008-06-06 21:10:57 +0000639 if recover_hosts:
640 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000641
642
    def tick(self):
        """Run one scheduler iteration.

        Refreshes drone state, performs cleanup/abort processing, schedules
        new and running work, drives agents, then flushes queued drone
        actions and notification emails.  The call order is significant.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_new_jobs()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000655
showard97aed502008-11-04 02:01:24 +0000656
mblighf3294cc2009-04-08 21:17:38 +0000657 def _run_cleanup(self):
658 self._periodic_cleanup.run_cleanup_maybe()
659 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard170873e2009-01-07 00:22:26 +0000662 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
663 for object_id in object_ids:
664 agent_dict.setdefault(object_id, set()).add(agent)
665
666
667 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
668 for object_id in object_ids:
669 assert object_id in agent_dict
670 agent_dict[object_id].remove(agent)
671
672
jadmanski0afbb632008-06-06 21:10:57 +0000673 def add_agent(self, agent):
674 self._agents.append(agent)
675 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000676 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
677 self._register_agent_for_ids(self._queue_entry_agents,
678 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000679
showard170873e2009-01-07 00:22:26 +0000680
681 def get_agents_for_entry(self, queue_entry):
682 """
683 Find agents corresponding to the specified queue_entry.
684 """
showardd3dc1992009-04-22 21:01:40 +0000685 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000686
687
688 def host_has_agent(self, host):
689 """
690 Determine if there is currently an Agent present using this host.
691 """
692 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000693
694
jadmanski0afbb632008-06-06 21:10:57 +0000695 def remove_agent(self, agent):
696 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000697 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
698 agent)
699 self._unregister_agent_for_ids(self._queue_entry_agents,
700 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000701
702
showard8cc058f2009-09-08 16:26:33 +0000703 def _host_has_scheduled_special_task(self, host):
704 return bool(models.SpecialTask.objects.filter(host__id=host.id,
705 is_active=False,
706 is_complete=False))
707
708
jadmanski0afbb632008-06-06 21:10:57 +0000709 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000710 self._register_pidfiles()
711 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000712 self._recover_all_recoverable_entries()
showard8cc058f2009-09-08 16:26:33 +0000713 self._recover_pending_entries()
showard6878e8b2009-07-20 22:37:45 +0000714 self._check_for_remaining_active_entries()
showard170873e2009-01-07 00:22:26 +0000715 self._reverify_remaining_hosts()
716 # reinitialize drones after killing orphaned processes, since they can
717 # leave around files when they die
718 _drone_manager.execute_actions()
719 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000720
showard170873e2009-01-07 00:22:26 +0000721
722 def _register_pidfiles(self):
723 # during recovery we may need to read pidfiles for both running and
724 # parsing entries
725 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000726 where="status IN ('Running', 'Gathering', 'Parsing')")
showarded2afea2009-07-07 20:54:07 +0000727 special_tasks = models.SpecialTask.objects.filter(is_active=True)
728 for execution_entry in itertools.chain(queue_entries, special_tasks):
showardd3dc1992009-04-22 21:01:40 +0000729 for pidfile_name in _ALL_PIDFILE_NAMES:
730 pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +0000731 execution_entry.execution_path(), pidfile_name=pidfile_name)
showardd3dc1992009-04-22 21:01:40 +0000732 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000733
734
showarded2afea2009-07-07 20:54:07 +0000735 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
736 run_monitor = PidfileRunMonitor()
737 run_monitor.attach_to_existing_process(execution_path,
738 pidfile_name=pidfile_name)
739 if run_monitor.has_process():
740 orphans.discard(run_monitor.get_process())
741 return run_monitor, '(process %s)' % run_monitor.get_process()
742 return None, 'without process'
743
744
showard8cc058f2009-09-08 16:26:33 +0000745 def _get_unassigned_entries(self, status):
746 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
747 if not self.get_agents_for_entry(entry):
748 yield entry
749
750
showardd3dc1992009-04-22 21:01:40 +0000751 def _recover_entries_with_status(self, status, orphans, pidfile_name,
752 recover_entries_fn):
showard8cc058f2009-09-08 16:26:33 +0000753 for queue_entry in self._get_unassigned_entries(status):
showardd3dc1992009-04-22 21:01:40 +0000754 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000755 run_monitor, process_string = self._get_recovery_run_monitor(
756 queue_entry.execution_path(), pidfile_name, orphans)
showard8cc058f2009-09-08 16:26:33 +0000757 if not run_monitor:
758 # _schedule_running_host_queue_entries should schedule and
759 # recover these entries
760 continue
showard597bfd32009-05-08 18:22:50 +0000761
showarded2afea2009-07-07 20:54:07 +0000762 logging.info('Recovering %s entry %s %s',status.lower(),
763 ', '.join(str(entry) for entry in queue_entries),
764 process_string)
showardd3dc1992009-04-22 21:01:40 +0000765 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000766
767
showard6878e8b2009-07-20 22:37:45 +0000768 def _check_for_remaining_orphan_processes(self, orphans):
769 if not orphans:
770 return
771 subject = 'Unrecovered orphan autoserv processes remain'
772 message = '\n'.join(str(process) for process in orphans)
773 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000774
775 die_on_orphans = global_config.global_config.get_config_value(
776 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
777
778 if die_on_orphans:
779 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000780
showard170873e2009-01-07 00:22:26 +0000781
showardd3dc1992009-04-22 21:01:40 +0000782 def _recover_running_entries(self, orphans):
783 def recover_entries(job, queue_entries, run_monitor):
showard8cc058f2009-09-08 16:26:33 +0000784 queue_task = QueueTask(job=job, queue_entries=queue_entries,
785 recover_run_monitor=run_monitor)
786 self.add_agent(Agent(task=queue_task,
787 num_processes=len(queue_entries)))
showardd3dc1992009-04-22 21:01:40 +0000788
789 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
showarded2afea2009-07-07 20:54:07 +0000790 orphans, _AUTOSERV_PID_FILE,
showardd3dc1992009-04-22 21:01:40 +0000791 recover_entries)
792
793
794 def _recover_gathering_entries(self, orphans):
795 def recover_entries(job, queue_entries, run_monitor):
796 gather_task = GatherLogsTask(job, queue_entries,
showarded2afea2009-07-07 20:54:07 +0000797 recover_run_monitor=run_monitor)
showard8cc058f2009-09-08 16:26:33 +0000798 self.add_agent(Agent(gather_task))
showardd3dc1992009-04-22 21:01:40 +0000799
800 self._recover_entries_with_status(
801 models.HostQueueEntry.Status.GATHERING,
802 orphans, _CRASHINFO_PID_FILE, recover_entries)
803
804
805 def _recover_parsing_entries(self, orphans):
806 def recover_entries(job, queue_entries, run_monitor):
807 reparse_task = FinalReparseTask(queue_entries,
showarded2afea2009-07-07 20:54:07 +0000808 recover_run_monitor=run_monitor)
showard8cc058f2009-09-08 16:26:33 +0000809 self.add_agent(Agent(reparse_task, num_processes=0))
showardd3dc1992009-04-22 21:01:40 +0000810
811 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
812 orphans, _PARSER_PID_FILE,
813 recover_entries)
814
815
showard8cc058f2009-09-08 16:26:33 +0000816 def _recover_pending_entries(self):
817 for entry in self._get_unassigned_entries(
818 models.HostQueueEntry.Status.PENDING):
819 entry.on_pending()
820
821
showardd3dc1992009-04-22 21:01:40 +0000822 def _recover_all_recoverable_entries(self):
823 orphans = _drone_manager.get_orphaned_autoserv_processes()
824 self._recover_running_entries(orphans)
825 self._recover_gathering_entries(orphans)
826 self._recover_parsing_entries(orphans)
showarded2afea2009-07-07 20:54:07 +0000827 self._recover_special_tasks(orphans)
showard6878e8b2009-07-20 22:37:45 +0000828 self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000829
showard97aed502008-11-04 02:01:24 +0000830
showarded2afea2009-07-07 20:54:07 +0000831 def _recover_special_tasks(self, orphans):
showard2fe3f1d2009-07-06 20:19:11 +0000832 """\
833 Recovers all special tasks that have started running but have not
834 completed.
835 """
836
837 tasks = models.SpecialTask.objects.filter(is_active=True,
838 is_complete=False)
839 # Use ordering to force NULL queue_entry_id's to the end of the list
showarda5288b42009-07-28 20:06:08 +0000840 for task in tasks.order_by('-queue_entry__id'):
showard9b6ec502009-08-20 23:25:17 +0000841 if self.host_has_agent(task.host):
842 raise SchedulerError(
843 "%s already has a host agent %s." % (
showard8cc058f2009-09-08 16:26:33 +0000844 task, self._host_agents.get(task.host.id)))
showard2fe3f1d2009-07-06 20:19:11 +0000845
showarded2afea2009-07-07 20:54:07 +0000846 run_monitor, process_string = self._get_recovery_run_monitor(
847 task.execution_path(), _AUTOSERV_PID_FILE, orphans)
848
849 logging.info('Recovering %s %s', task, process_string)
showard8cc058f2009-09-08 16:26:33 +0000850 self._recover_special_task(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000851
852
showard8cc058f2009-09-08 16:26:33 +0000853 def _recover_special_task(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000854 """\
855 Recovers a single special task.
856 """
857 if task.task == models.SpecialTask.Task.VERIFY:
showard8cc058f2009-09-08 16:26:33 +0000858 agent_task = self._recover_verify(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000859 elif task.task == models.SpecialTask.Task.REPAIR:
showard8cc058f2009-09-08 16:26:33 +0000860 agent_task = self._recover_repair(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000861 elif task.task == models.SpecialTask.Task.CLEANUP:
showard8cc058f2009-09-08 16:26:33 +0000862 agent_task = self._recover_cleanup(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000863 else:
864 # Should never happen
865 logging.error(
866 "Special task id %d had invalid task %s", (task.id, task.task))
867
showard8cc058f2009-09-08 16:26:33 +0000868 self.add_agent(Agent(agent_task))
showard2fe3f1d2009-07-06 20:19:11 +0000869
870
showard8cc058f2009-09-08 16:26:33 +0000871 def _recover_verify(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000872 """\
873 Recovers a verify task.
874 No associated queue entry: Verify host
875 With associated queue entry: Verify host, and run associated queue
876 entry
877 """
showard8cc058f2009-09-08 16:26:33 +0000878 return VerifyTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000879
880
showard8cc058f2009-09-08 16:26:33 +0000881 def _recover_repair(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000882 """\
883 Recovers a repair task.
884 Always repair host
885 """
showard8cc058f2009-09-08 16:26:33 +0000886 return RepairTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000887
888
showard8cc058f2009-09-08 16:26:33 +0000889 def _recover_cleanup(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000890 """\
891 Recovers a cleanup task.
892 No associated queue entry: Clean host
893 With associated queue entry: Clean host, verify host if needed, and
894 run associated queue entry
895 """
showard8cc058f2009-09-08 16:26:33 +0000896 return CleanupTask(task=task, recover_run_monitor=run_monitor)
showard6af73ad2009-07-28 20:00:58 +0000897
898
showard6878e8b2009-07-20 22:37:45 +0000899 def _check_for_remaining_active_entries(self):
showard170873e2009-01-07 00:22:26 +0000900 queue_entries = HostQueueEntry.fetch(
showard8cc058f2009-09-08 16:26:33 +0000901 where='active AND NOT complete AND status NOT IN '
902 '("Starting", "Gathering", "Pending")')
showardd3dc1992009-04-22 21:01:40 +0000903
showarde8e37072009-08-20 23:31:30 +0000904 unrecovered_active_hqes = [entry for entry in queue_entries
showard8cc058f2009-09-08 16:26:33 +0000905 if not self.get_agents_for_entry(entry) and
906 not self._host_has_scheduled_special_task(
907 entry.host)]
showarde8e37072009-08-20 23:31:30 +0000908 if unrecovered_active_hqes:
909 message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes)
910 raise SchedulerError(
911 '%d unrecovered active host queue entries:\n%s' %
912 (len(unrecovered_active_hqes), message))
showard170873e2009-01-07 00:22:26 +0000913
914
showard8cc058f2009-09-08 16:26:33 +0000915 def _schedule_special_tasks(self):
916 tasks = models.SpecialTask.objects.filter(is_active=False,
917 is_complete=False,
918 host__locked=False)
919 # We want lower ids to come first, but the NULL queue_entry_ids need to
920 # come last
921 tasks = tasks.extra(select={'isnull' : 'queue_entry_id IS NULL'})
922 tasks = tasks.extra(order_by=['isnull', 'id'])
showard6d7b2ff2009-06-10 00:16:47 +0000923
showard2fe3f1d2009-07-06 20:19:11 +0000924 for task in tasks:
showard8cc058f2009-09-08 16:26:33 +0000925 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000926 continue
showard6d7b2ff2009-06-10 00:16:47 +0000927
showard8cc058f2009-09-08 16:26:33 +0000928 if task.task == models.SpecialTask.Task.CLEANUP:
929 agent_task = CleanupTask(task=task)
930 elif task.task == models.SpecialTask.Task.VERIFY:
931 agent_task = VerifyTask(task=task)
932 elif task.task == models.SpecialTask.Task.REPAIR:
933 agent_task = RepairTask(task=task)
934 else:
935 email_manager.manager.enqueue_notify_email(
936 'Special task with invalid task', task)
937 continue
938
939 self.add_agent(Agent(agent_task))
showard1ff7b2e2009-05-15 23:17:18 +0000940
941
showard170873e2009-01-07 00:22:26 +0000942 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000943 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000944 # should never happen
showarded2afea2009-07-07 20:54:07 +0000945 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000946 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000947 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000948 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000949 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000950
951
jadmanski0afbb632008-06-06 21:10:57 +0000952 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000953 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000954 full_where='locked = 0 AND invalid = 0 AND ' + where
955 for host in Host.fetch(where=full_where):
956 if self.host_has_agent(host):
957 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000958 continue
showard8cc058f2009-09-08 16:26:33 +0000959 if self._host_has_scheduled_special_task(host):
960 # host will have a special task scheduled on the next cycle
961 continue
showard170873e2009-01-07 00:22:26 +0000962 if print_message:
showardb18134f2009-03-20 20:52:18 +0000963 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000964 models.SpecialTask.objects.create(
965 task=models.SpecialTask.Task.CLEANUP,
966 host=models.Host(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000967
968
jadmanski0afbb632008-06-06 21:10:57 +0000969 def _recover_hosts(self):
970 # recover "Repair Failed" hosts
971 message = 'Reverifying dead host %s'
972 self._reverify_hosts_where("status = 'Repair Failed'",
973 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000974
975
showard04c82c52008-05-29 19:38:12 +0000976
showardb95b1bd2008-08-15 18:11:04 +0000977 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000978 # prioritize by job priority, then non-metahost over metahost, then FIFO
979 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000980 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000981 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000982 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000983
984
showard89f84db2009-03-12 20:39:13 +0000985 def _refresh_pending_queue_entries(self):
986 """
987 Lookup the pending HostQueueEntries and call our HostScheduler
988 refresh() method given that list. Return the list.
989
990 @returns A list of pending HostQueueEntries sorted in priority order.
991 """
showard63a34772008-08-18 19:32:50 +0000992 queue_entries = self._get_pending_queue_entries()
993 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000994 return []
showardb95b1bd2008-08-15 18:11:04 +0000995
showard63a34772008-08-18 19:32:50 +0000996 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000997
showard89f84db2009-03-12 20:39:13 +0000998 return queue_entries
999
1000
1001 def _schedule_atomic_group(self, queue_entry):
1002 """
1003 Schedule the given queue_entry on an atomic group of hosts.
1004
1005 Returns immediately if there are insufficient available hosts.
1006
1007 Creates new HostQueueEntries based off of queue_entry for the
1008 scheduled hosts and starts them all running.
1009 """
1010 # This is a virtual host queue entry representing an entire
1011 # atomic group, find a group and schedule their hosts.
1012 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1013 queue_entry)
1014 if not group_hosts:
1015 return
showardcbe6f942009-06-17 19:33:49 +00001016
1017 logging.info('Expanding atomic group entry %s with hosts %s',
1018 queue_entry,
1019 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001020 # The first assigned host uses the original HostQueueEntry
1021 group_queue_entries = [queue_entry]
1022 for assigned_host in group_hosts[1:]:
1023 # Create a new HQE for every additional assigned_host.
1024 new_hqe = HostQueueEntry.clone(queue_entry)
1025 new_hqe.save()
1026 group_queue_entries.append(new_hqe)
1027 assert len(group_queue_entries) == len(group_hosts)
1028 for queue_entry, host in itertools.izip(group_queue_entries,
1029 group_hosts):
1030 self._run_queue_entry(queue_entry, host)
1031
1032
1033 def _schedule_new_jobs(self):
1034 queue_entries = self._refresh_pending_queue_entries()
1035 if not queue_entries:
1036 return
1037
showard63a34772008-08-18 19:32:50 +00001038 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +00001039 if (queue_entry.atomic_group_id is None or
1040 queue_entry.host_id is not None):
1041 assigned_host = self._host_scheduler.find_eligible_host(
1042 queue_entry)
1043 if assigned_host:
1044 self._run_queue_entry(queue_entry, assigned_host)
1045 else:
1046 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001047
1048
showard8cc058f2009-09-08 16:26:33 +00001049 def _schedule_running_host_queue_entries(self):
1050 entries = HostQueueEntry.fetch(
1051 where="status IN "
1052 "('Starting', 'Running', 'Gathering', 'Parsing')")
1053 for entry in entries:
1054 if self.get_agents_for_entry(entry):
1055 continue
1056
1057 task_entries = entry.job.get_group_entries(entry)
1058 for task_entry in task_entries:
1059 if (task_entry.status != models.HostQueueEntry.Status.PARSING
1060 and self.host_has_agent(task_entry.host)):
1061 agent = self._host_agents.get(task_entry.host.id)[0]
1062 raise SchedulerError('Attempted to schedule on host that '
1063 'already has agent: %s (previous '
1064 'agent task: %s)'
1065 % (task_entry, agent.task))
1066
1067 if entry.status in (models.HostQueueEntry.Status.STARTING,
1068 models.HostQueueEntry.Status.RUNNING):
1069 params = entry.job.get_autoserv_params(task_entries)
1070 agent_task = QueueTask(job=entry.job,
1071 queue_entries=task_entries, cmd=params)
1072 elif entry.status == models.HostQueueEntry.Status.GATHERING:
1073 agent_task = GatherLogsTask(
1074 job=entry.job, queue_entries=task_entries)
1075 elif entry.status == models.HostQueueEntry.Status.PARSING:
1076 agent_task = FinalReparseTask(queue_entries=task_entries)
1077 else:
1078 raise SchedulerError('_schedule_running_host_queue_entries got '
1079 'entry with invalid status %s: %s'
1080 % (entry.status, entry))
1081
1082 self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
1083
1084
1085 def _schedule_delay_tasks(self):
1086 for entry in HostQueueEntry.fetch(where="status = 'Waiting'"):
1087 task = entry.job.schedule_delayed_callback_task(entry)
1088 if task:
1089 self.add_agent(Agent(task, num_processes=0))
1090
1091
showardb95b1bd2008-08-15 18:11:04 +00001092 def _run_queue_entry(self, queue_entry, host):
showard8cc058f2009-09-08 16:26:33 +00001093 queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001094
1095
jadmanski0afbb632008-06-06 21:10:57 +00001096 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001097 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001098 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001099 for agent in self.get_agents_for_entry(entry):
1100 agent.abort()
1101 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001102
1103
showard324bf812009-01-20 23:23:38 +00001104 def _can_start_agent(self, agent, num_started_this_cycle,
1105 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001106 # always allow zero-process agents to run
1107 if agent.num_processes == 0:
1108 return True
1109 # don't allow any nonzero-process agents to run after we've reached a
1110 # limit (this avoids starvation of many-process agents)
1111 if have_reached_limit:
1112 return False
1113 # total process throttling
showard324bf812009-01-20 23:23:38 +00001114 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001115 return False
1116 # if a single agent exceeds the per-cycle throttling, still allow it to
1117 # run when it's the first agent in the cycle
1118 if num_started_this_cycle == 0:
1119 return True
1120 # per-cycle throttling
1121 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001122 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001123 return False
1124 return True
1125
1126
jadmanski0afbb632008-06-06 21:10:57 +00001127 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001128 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001129 have_reached_limit = False
1130 # iterate over copy, so we can remove agents during iteration
1131 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001132 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001133 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001134 have_reached_limit):
1135 have_reached_limit = True
1136 continue
showard4c5374f2008-09-04 17:02:56 +00001137 num_started_this_cycle += agent.num_processes
1138 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001139 if agent.is_done():
1140 logging.info("agent finished")
1141 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001142 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001143 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001144
1145
showard29f7cd22009-04-29 21:16:24 +00001146 def _process_recurring_runs(self):
1147 recurring_runs = models.RecurringRun.objects.filter(
1148 start_date__lte=datetime.datetime.now())
1149 for rrun in recurring_runs:
1150 # Create job from template
1151 job = rrun.job
1152 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001153 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001154
1155 host_objects = info['hosts']
1156 one_time_hosts = info['one_time_hosts']
1157 metahost_objects = info['meta_hosts']
1158 dependencies = info['dependencies']
1159 atomic_group = info['atomic_group']
1160
1161 for host in one_time_hosts or []:
1162 this_host = models.Host.create_one_time_host(host.hostname)
1163 host_objects.append(this_host)
1164
1165 try:
1166 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001167 options=options,
showard29f7cd22009-04-29 21:16:24 +00001168 host_objects=host_objects,
1169 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001170 atomic_group=atomic_group)
1171
1172 except Exception, ex:
1173 logging.exception(ex)
1174 #TODO send email
1175
1176 if rrun.loop_count == 1:
1177 rrun.delete()
1178 else:
1179 if rrun.loop_count != 0: # if not infinite loop
1180 # calculate new start_date
1181 difference = datetime.timedelta(seconds=rrun.loop_period)
1182 rrun.start_date = rrun.start_date + difference
1183 rrun.loop_count -= 1
1184 rrun.save()
1185
1186
showard170873e2009-01-07 00:22:26 +00001187class PidfileRunMonitor(object):
1188 """
1189 Client must call either run() to start a new process or
1190 attach_to_existing_process().
1191 """
mbligh36768f02008-02-22 18:28:33 +00001192
showard170873e2009-01-07 00:22:26 +00001193 class _PidfileException(Exception):
1194 """
1195 Raised when there's some unexpected behavior with the pid file, but only
1196 used internally (never allowed to escape this class).
1197 """
mbligh36768f02008-02-22 18:28:33 +00001198
1199
showard170873e2009-01-07 00:22:26 +00001200 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001201 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001202 self._start_time = None
1203 self.pidfile_id = None
1204 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001205
1206
showard170873e2009-01-07 00:22:26 +00001207 def _add_nice_command(self, command, nice_level):
1208 if not nice_level:
1209 return command
1210 return ['nice', '-n', str(nice_level)] + command
1211
1212
1213 def _set_start_time(self):
1214 self._start_time = time.time()
1215
1216
1217 def run(self, command, working_directory, nice_level=None, log_file=None,
1218 pidfile_name=None, paired_with_pidfile=None):
1219 assert command is not None
1220 if nice_level is not None:
1221 command = ['nice', '-n', str(nice_level)] + command
1222 self._set_start_time()
1223 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001224 command, working_directory, pidfile_name=pidfile_name,
1225 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001226
1227
showarded2afea2009-07-07 20:54:07 +00001228 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001229 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001230 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001231 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001232 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001233 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001234
1235
jadmanski0afbb632008-06-06 21:10:57 +00001236 def kill(self):
showard170873e2009-01-07 00:22:26 +00001237 if self.has_process():
1238 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001239
mbligh36768f02008-02-22 18:28:33 +00001240
showard170873e2009-01-07 00:22:26 +00001241 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001242 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001243 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001244
1245
showard170873e2009-01-07 00:22:26 +00001246 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001247 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001248 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001249 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001250
1251
showard170873e2009-01-07 00:22:26 +00001252 def _read_pidfile(self, use_second_read=False):
1253 assert self.pidfile_id is not None, (
1254 'You must call run() or attach_to_existing_process()')
1255 contents = _drone_manager.get_pidfile_contents(
1256 self.pidfile_id, use_second_read=use_second_read)
1257 if contents.is_invalid():
1258 self._state = drone_manager.PidfileContents()
1259 raise self._PidfileException(contents)
1260 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001261
1262
showard21baa452008-10-21 00:08:39 +00001263 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001264 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1265 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001266 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001267 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001268
1269
1270 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001271 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001272 return
mblighbb421852008-03-11 22:36:16 +00001273
showard21baa452008-10-21 00:08:39 +00001274 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001275
showard170873e2009-01-07 00:22:26 +00001276 if self._state.process is None:
1277 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001278 return
mbligh90a549d2008-03-25 23:52:34 +00001279
showard21baa452008-10-21 00:08:39 +00001280 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001281 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001282 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001283 return
mbligh90a549d2008-03-25 23:52:34 +00001284
showard170873e2009-01-07 00:22:26 +00001285 # pid but no running process - maybe process *just* exited
1286 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001287 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001288 # autoserv exited without writing an exit code
1289 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001290 self._handle_pidfile_error(
1291 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001292
showard21baa452008-10-21 00:08:39 +00001293
1294 def _get_pidfile_info(self):
1295 """\
1296 After completion, self._state will contain:
1297 pid=None, exit_status=None if autoserv has not yet run
1298 pid!=None, exit_status=None if autoserv is running
1299 pid!=None, exit_status!=None if autoserv has completed
1300 """
1301 try:
1302 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001303 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001304 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001305
1306
showard170873e2009-01-07 00:22:26 +00001307 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001308 """\
1309 Called when no pidfile is found or no pid is in the pidfile.
1310 """
showard170873e2009-01-07 00:22:26 +00001311 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001312 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001313 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001314 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001315 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001316
1317
showard35162b02009-03-03 02:17:30 +00001318 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001319 """\
1320 Called when autoserv has exited without writing an exit status,
1321 or we've timed out waiting for autoserv to write a pid to the
1322 pidfile. In either case, we just return failure and the caller
1323 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001324
showard170873e2009-01-07 00:22:26 +00001325 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001326 """
1327 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001328 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001329 self._state.exit_status = 1
1330 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001331
1332
jadmanski0afbb632008-06-06 21:10:57 +00001333 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001334 self._get_pidfile_info()
1335 return self._state.exit_status
1336
1337
1338 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001339 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001340 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001341 if self._state.num_tests_failed is None:
1342 return -1
showard21baa452008-10-21 00:08:39 +00001343 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001344
1345
showardcdaeae82009-08-31 18:32:48 +00001346 def try_copy_results_on_drone(self, **kwargs):
1347 if self.has_process():
1348 # copy results logs into the normal place for job results
1349 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1350
1351
1352 def try_copy_to_results_repository(self, source, **kwargs):
1353 if self.has_process():
1354 _drone_manager.copy_to_results_repository(self.get_process(),
1355 source, **kwargs)
1356
1357
mbligh36768f02008-02-22 18:28:33 +00001358class Agent(object):
showard77182562009-06-10 00:16:05 +00001359 """
showard8cc058f2009-09-08 16:26:33 +00001360 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001361
1362 The following methods are required on all task objects:
1363 poll() - Called periodically to let the task check its status and
1364 update its internal state. If the task succeeded.
1365 is_done() - Returns True if the task is finished.
1366 abort() - Called when an abort has been requested. The task must
1367 set its aborted attribute to True if it actually aborted.
1368
1369 The following attributes are required on all task objects:
1370 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001371 success - bool, True if this task succeeded.
1372 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1373 host_ids - A sequence of Host ids this task represents.
1374
1375 The following attribute is written to all task objects:
1376 agent - A reference to the Agent instance that the task has been
1377 added to.
1378 """
1379
1380
showard8cc058f2009-09-08 16:26:33 +00001381 def __init__(self, task, num_processes=1):
showard77182562009-06-10 00:16:05 +00001382 """
showard8cc058f2009-09-08 16:26:33 +00001383 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001384 @param num_processes: The number of subprocesses the Agent represents.
1385 This is used by the Dispatcher for managing the load on the
1386 system. Defaults to 1.
1387 """
showard8cc058f2009-09-08 16:26:33 +00001388 self.task = task
1389 task.agent = self
1390
showard77182562009-06-10 00:16:05 +00001391 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001392 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001393 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001394
showard8cc058f2009-09-08 16:26:33 +00001395 self.queue_entry_ids = task.queue_entry_ids
1396 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001397
showard8cc058f2009-09-08 16:26:33 +00001398 self.started = False
mbligh36768f02008-02-22 18:28:33 +00001399
1400
jadmanski0afbb632008-06-06 21:10:57 +00001401 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001402 self.started = True
1403 if self.task:
1404 self.task.poll()
1405 if self.task.is_done():
1406 self.task = None
showardec113162008-05-08 00:52:49 +00001407
1408
jadmanski0afbb632008-06-06 21:10:57 +00001409 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001410 return self.task is None
mbligh36768f02008-02-22 18:28:33 +00001411
1412
showardd3dc1992009-04-22 21:01:40 +00001413 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001414 if self.task:
1415 self.task.abort()
1416 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001417 # tasks can choose to ignore aborts
showard8cc058f2009-09-08 16:26:33 +00001418 self.task = None
showard20f9bdd2009-04-29 19:48:33 +00001419
showardd3dc1992009-04-22 21:01:40 +00001420
showard77182562009-06-10 00:16:05 +00001421class DelayedCallTask(object):
1422 """
1423 A task object like AgentTask for an Agent to run that waits for the
1424 specified amount of time to have elapsed before calling the supplied
1425 callback once and finishing. If the callback returns anything, it is
1426 assumed to be a new Agent instance and will be added to the dispatcher.
1427
1428 @attribute end_time: The absolute posix time after which this task will
1429 call its callback when it is polled and be finished.
1430
1431 Also has all attributes required by the Agent class.
1432 """
1433 def __init__(self, delay_seconds, callback, now_func=None):
1434 """
1435 @param delay_seconds: The delay in seconds from now that this task
1436 will call the supplied callback and be done.
1437 @param callback: A callable to be called by this task once after at
1438 least delay_seconds time has elapsed. It must return None
1439 or a new Agent instance.
1440 @param now_func: A time.time like function. Default: time.time.
1441 Used for testing.
1442 """
1443 assert delay_seconds > 0
1444 assert callable(callback)
1445 if not now_func:
1446 now_func = time.time
1447 self._now_func = now_func
1448 self._callback = callback
1449
1450 self.end_time = self._now_func() + delay_seconds
1451
1452 # These attributes are required by Agent.
1453 self.aborted = False
showard77182562009-06-10 00:16:05 +00001454 self.host_ids = ()
1455 self.success = False
1456 self.queue_entry_ids = ()
1457 # This is filled in by Agent.add_task().
1458 self.agent = None
1459
1460
1461 def poll(self):
showard8cc058f2009-09-08 16:26:33 +00001462 if not self.is_done() and self._now_func() >= self.end_time:
1463 self._callback()
showard77182562009-06-10 00:16:05 +00001464 self.success = True
1465
1466
1467 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001468 return self.success or self.aborted
showard77182562009-06-10 00:16:05 +00001469
1470
1471 def abort(self):
1472 self.aborted = True
showard77182562009-06-10 00:16:05 +00001473
1474
mbligh36768f02008-02-22 18:28:33 +00001475class AgentTask(object):
showard8cc058f2009-09-08 16:26:33 +00001476 def __init__(self, cmd=None, working_directory=None,
showarded2afea2009-07-07 20:54:07 +00001477 pidfile_name=None, paired_with_pidfile=None,
1478 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001479 self.done = False
jadmanski0afbb632008-06-06 21:10:57 +00001480 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001481 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001482 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001483 self.monitor = recover_run_monitor
1484 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001485 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001486 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001487 self.queue_entry_ids = []
1488 self.host_ids = []
1489 self.log_file = None
1490
1491
1492 def _set_ids(self, host=None, queue_entries=None):
1493 if queue_entries and queue_entries != [None]:
1494 self.host_ids = [entry.host.id for entry in queue_entries]
1495 self.queue_entry_ids = [entry.id for entry in queue_entries]
1496 else:
1497 assert host
1498 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001499
1500
jadmanski0afbb632008-06-06 21:10:57 +00001501 def poll(self):
showard08a36412009-05-05 01:01:13 +00001502 if not self.started:
1503 self.start()
1504 self.tick()
1505
1506
1507 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001508 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001509 exit_code = self.monitor.exit_code()
1510 if exit_code is None:
1511 return
1512 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001513 else:
1514 success = False
mbligh36768f02008-02-22 18:28:33 +00001515
jadmanski0afbb632008-06-06 21:10:57 +00001516 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001517
1518
jadmanski0afbb632008-06-06 21:10:57 +00001519 def is_done(self):
1520 return self.done
mbligh36768f02008-02-22 18:28:33 +00001521
1522
jadmanski0afbb632008-06-06 21:10:57 +00001523 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001524 if self.done:
1525 return
jadmanski0afbb632008-06-06 21:10:57 +00001526 self.done = True
1527 self.success = success
1528 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001529
1530
jadmanski0afbb632008-06-06 21:10:57 +00001531 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001532 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001533
mbligh36768f02008-02-22 18:28:33 +00001534
jadmanski0afbb632008-06-06 21:10:57 +00001535 def cleanup(self):
showardcdaeae82009-08-31 18:32:48 +00001536 if self.monitor and self.log_file:
1537 self.monitor.try_copy_to_results_repository(self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001538
1539
jadmanski0afbb632008-06-06 21:10:57 +00001540 def epilog(self):
1541 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001542
1543
jadmanski0afbb632008-06-06 21:10:57 +00001544 def start(self):
1545 assert self.agent
1546
1547 if not self.started:
1548 self.prolog()
1549 self.run()
1550
1551 self.started = True
1552
1553
1554 def abort(self):
1555 if self.monitor:
1556 self.monitor.kill()
1557 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001558 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001559 self.cleanup()
1560
1561
showarded2afea2009-07-07 20:54:07 +00001562 def _get_consistent_execution_path(self, execution_entries):
1563 first_execution_path = execution_entries[0].execution_path()
1564 for execution_entry in execution_entries[1:]:
1565 assert execution_entry.execution_path() == first_execution_path, (
1566 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1567 execution_entry,
1568 first_execution_path,
1569 execution_entries[0]))
1570 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001571
1572
showarded2afea2009-07-07 20:54:07 +00001573 def _copy_results(self, execution_entries, use_monitor=None):
1574 """
1575 @param execution_entries: list of objects with execution_path() method
1576 """
showard6d1c1432009-08-20 23:30:39 +00001577 if use_monitor is not None and not use_monitor.has_process():
1578 return
1579
showarded2afea2009-07-07 20:54:07 +00001580 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001581 if use_monitor is None:
1582 assert self.monitor
1583 use_monitor = self.monitor
1584 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001585 execution_path = self._get_consistent_execution_path(execution_entries)
1586 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001587 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001588
showarda1e74b32009-05-12 17:32:04 +00001589
1590 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001591 for queue_entry in queue_entries:
1592 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001593
1594
showarda1e74b32009-05-12 17:32:04 +00001595 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1596 self._copy_results(queue_entries, use_monitor)
1597 self._parse_results(queue_entries)
1598
1599
showardd3dc1992009-04-22 21:01:40 +00001600 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001601 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001602 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001603 self.monitor = PidfileRunMonitor()
1604 self.monitor.run(self.cmd, self._working_directory,
1605 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001606 log_file=self.log_file,
1607 pidfile_name=pidfile_name,
1608 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001609
1610
showardd9205182009-04-27 20:09:55 +00001611class TaskWithJobKeyvals(object):
1612 """AgentTask mixin providing functionality to help with job keyval files."""
1613 _KEYVAL_FILE = 'keyval'
1614 def _format_keyval(self, key, value):
1615 return '%s=%s' % (key, value)
1616
1617
1618 def _keyval_path(self):
1619 """Subclasses must override this"""
1620 raise NotImplemented
1621
1622
1623 def _write_keyval_after_job(self, field, value):
1624 assert self.monitor
1625 if not self.monitor.has_process():
1626 return
1627 _drone_manager.write_lines_to_file(
1628 self._keyval_path(), [self._format_keyval(field, value)],
1629 paired_with_process=self.monitor.get_process())
1630
1631
1632 def _job_queued_keyval(self, job):
1633 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1634
1635
1636 def _write_job_finished(self):
1637 self._write_keyval_after_job("job_finished", int(time.time()))
1638
1639
showarddb502762009-09-09 15:31:20 +00001640 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1641 keyval_contents = '\n'.join(self._format_keyval(key, value)
1642 for key, value in keyval_dict.iteritems())
1643 # always end with a newline to allow additional keyvals to be written
1644 keyval_contents += '\n'
1645 _drone_manager.attach_file_to_execution(self._working_directory,
1646 keyval_contents,
1647 file_path=keyval_path)
1648
1649
1650 def _write_keyvals_before_job(self, keyval_dict):
1651 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1652
1653
1654 def _write_host_keyvals(self, host):
1655 keyval_path = os.path.join(self._working_directory, 'host_keyvals',
1656 host.hostname)
1657 platform, all_labels = host.platform_and_labels()
1658 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1659 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1660
1661
showard8cc058f2009-09-08 16:26:33 +00001662class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001663 """
1664 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1665 """
1666
1667 TASK_TYPE = None
1668 host = None
1669 queue_entry = None
1670
1671 def __init__(self, task, extra_command_args, **kwargs):
showarded2afea2009-07-07 20:54:07 +00001672 assert (self.TASK_TYPE is not None,
1673 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001674
1675 self.host = Host(id=task.host.id)
1676 self.queue_entry = None
1677 if task.queue_entry:
1678 self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
1679
showarded2afea2009-07-07 20:54:07 +00001680 self.task = task
showarddb502762009-09-09 15:31:20 +00001681 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001682 self._extra_command_args = extra_command_args
1683 super(SpecialAgentTask, self).__init__(**kwargs)
1684
1685
showard8cc058f2009-09-08 16:26:33 +00001686 def _keyval_path(self):
1687 return os.path.join(self._working_directory, self._KEYVAL_FILE)
1688
1689
showarded2afea2009-07-07 20:54:07 +00001690 def prolog(self):
1691 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001692 self.cmd = _autoserv_command_line(self.host.hostname,
1693 self._extra_command_args,
1694 queue_entry=self.queue_entry)
1695 self._working_directory = self.task.execution_path()
1696 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001697 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001698
1699
showardde634ee2009-01-30 01:44:24 +00001700 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001701 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001702
showard2fe3f1d2009-07-06 20:19:11 +00001703 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001704 return # don't fail metahost entries, they'll be reassigned
1705
showard2fe3f1d2009-07-06 20:19:11 +00001706 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001707 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001708 return # entry has been aborted
1709
showard2fe3f1d2009-07-06 20:19:11 +00001710 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001711 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001712 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001713 self._write_keyval_after_job(queued_key, queued_time)
1714 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001715
showard8cc058f2009-09-08 16:26:33 +00001716 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001717 self.monitor.try_copy_results_on_drone(
1718 source_path=self._working_directory + '/',
1719 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001720
showard2fe3f1d2009-07-06 20:19:11 +00001721 self._copy_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001722 self.queue_entry.handle_host_failure()
showard2fe3f1d2009-07-06 20:19:11 +00001723 if self.queue_entry.job.parse_failed_repair:
1724 self._parse_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001725
1726 pidfile_id = _drone_manager.get_pidfile_id_from(
1727 self.queue_entry.execution_path(),
1728 pidfile_name=_AUTOSERV_PID_FILE)
1729 _drone_manager.register_pidfile(pidfile_id)
1730
1731
1732 def cleanup(self):
1733 super(SpecialAgentTask, self).cleanup()
1734 self.task.finish()
showardf85a0b72009-10-07 20:48:45 +00001735 if self.monitor:
1736 if self.monitor.has_process():
1737 self._copy_results([self.task])
1738 if self.monitor.pidfile_id is not None:
1739 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001740
1741
class RepairTask(SpecialAgentTask):
    """Runs autoserv -R to repair a host; marks the associated queue entry
    (if any) failed when the repair itself fails."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task, recover_run_monitor=None):
        """
        @param task: the models.SpecialTask being executed.  Its queue
                entry, if present, is marked failed should repair fail.
        """
        raw_protection = host_protections.Protection.get_string(
                task.host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(raw_protection)
        repair_args = ['-R', '--host-protection', protection]

        super(RepairTask, self).__init__(
                task, repair_args, recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001779
1780
showarded2afea2009-07-07 20:54:07 +00001781class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001782 def _copy_to_results_repository(self):
1783 if not self.queue_entry or self.queue_entry.meta_host:
1784 return
1785
1786 self.queue_entry.set_execution_subdir()
1787 log_name = os.path.basename(self.task.execution_path())
1788 source = os.path.join(self.task.execution_path(), 'debug',
1789 'autoserv.DEBUG')
1790 destination = os.path.join(
1791 self.queue_entry.execution_path(), log_name)
1792
1793 self.monitor.try_copy_to_results_repository(
1794 source, destination_path=destination)
1795
1796
showard170873e2009-01-07 00:22:26 +00001797 def epilog(self):
1798 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001799
showard775300b2009-09-09 15:30:50 +00001800 if self.success:
1801 return
showard8fe93b52008-11-18 17:53:22 +00001802
showard775300b2009-09-09 15:30:50 +00001803 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001804
showard775300b2009-09-09 15:30:50 +00001805 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
1806 return
1807
1808 if self.queue_entry:
1809 self.queue_entry.requeue()
1810
1811 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001812 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001813 queue_entry__id=self.queue_entry.id):
1814 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1815 self._fail_queue_entry()
1816 return
1817
1818 queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
1819 else:
1820 queue_entry = None
1821
1822 models.SpecialTask.objects.create(
1823 host=models.Host(id=self.host.id),
1824 task=models.SpecialTask.Task.REPAIR,
1825 queue_entry=queue_entry)
showard58721a82009-08-20 23:32:40 +00001826
showard8fe93b52008-11-18 17:53:22 +00001827
class VerifyTask(PreJobTask):
    """Runs autoserv -v to verify a host before its queue entry starts."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task, recover_run_monitor=None):
        super(VerifyTask, self).__init__(
                task, ['-v'], recover_run_monitor=recover_run_monitor)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any other queued verifies for this host. One verify will do
        # and there's no need to keep records of other requests.
        pending_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        pending_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001864
1865
showardb5626452009-06-30 01:57:28 +00001866class CleanupHostsMixin(object):
1867 def _reboot_hosts(self, job, queue_entries, final_success,
1868 num_tests_failed):
1869 reboot_after = job.reboot_after
1870 do_reboot = (
1871 # always reboot after aborted jobs
1872 self._final_status == models.HostQueueEntry.Status.ABORTED
1873 or reboot_after == models.RebootAfter.ALWAYS
1874 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1875 and final_success and num_tests_failed == 0))
1876
1877 for queue_entry in queue_entries:
1878 if do_reboot:
1879 # don't pass the queue entry to the CleanupTask. if the cleanup
1880 # fails, the job doesn't care -- it's over.
showard8cc058f2009-09-08 16:26:33 +00001881 models.SpecialTask.objects.create(
1882 host=models.Host(id=queue_entry.host.id),
1883 task=models.SpecialTask.Task.CLEANUP)
showardb5626452009-06-30 01:57:28 +00001884 else:
showard8cc058f2009-09-08 16:26:33 +00001885 queue_entry.host.set_status(models.Host.Status.READY)
showardb5626452009-06-30 01:57:28 +00001886
1887
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """
    Runs the main autoserv job process for a group of queue entries and
    manages the entries' and their hosts' status transitions around it.
    """
    def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
        """
        @param job: the Job the entries belong to.
        @param queue_entries: the HostQueueEntries to run; they must all
                share a single execution path.
        @param cmd: the autoserv command line, if launching a new process.
        @param recover_run_monitor: existing monitor to attach to when
                recovering an already-running process.
        """
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = queue_entries[0].get_group_name()
        super(QueueTask, self).__init__(
            cmd, self._execution_path(),
            recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # job-level keyvals live at the top of the execution directory
        return os.path.join(self._execution_path(), self._KEYVAL_FILE)


    def _execution_path(self):
        # all entries share one execution path; take it from the first
        return self.queue_entries[0].execution_path()


    def prolog(self):
        # refuse to start unless every entry/host is in an expected state
        for entry in self.queue_entries:
            if entry.status not in (models.HostQueueEntry.Status.STARTING,
                                    models.HostQueueEntry.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))
            if entry.host.status not in (models.Host.Status.PENDING,
                                         models.Host.Status.RUNNING):
                raise SchedulerError('Queue task attempting to start on queue '
                                     'entry with invalid host status %s: %s'
                                     % (entry.host.status, entry))

        # record job keyvals, then move entries and hosts to Running
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        # leave a marker file so the failure is visible in the results
        error_file_path = os.path.join(self._execution_path(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Record completion keyvals and advance entries to Gathering."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_path(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00002003
2004
showardd3dc1992009-04-22 21:01:40 +00002005class PostJobTask(AgentTask):
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 recover_run_monitor=None):
        """
        @param queue_entries: entries this post-job step operates on; they
                must all share one execution path.
        @param pidfile_name: name of the pidfile this task's own process
                writes.
        @param logfile_name: basename for this task's log file within the
                execution directory.
        @param recover_run_monitor: existing monitor to attach to when
                recovering an already-running process.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_path = self._get_consistent_execution_path(
            queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_path)
        # attach to the completed autoserv run so its pidfile state (exit
        # status, abort info) is available to this post-job step
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_path)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            # in testing mode, substitute a no-op command
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(
            cmd=command, working_directory=self._execution_path,
            recover_run_monitor=recover_run_monitor)

        self.log_file = os.path.join(self._execution_path, logfile_name)
        self._final_status = self._determine_final_status()
2029
2030
2031 def _generate_command(self, results_dir):
2032 raise NotImplementedError('Subclasses must override this')
2033
2034
2035 def _job_was_aborted(self):
2036 was_aborted = None
2037 for queue_entry in self._queue_entries:
2038 queue_entry.update_from_database()
2039 if was_aborted is None: # first queue entry
2040 was_aborted = bool(queue_entry.aborted)
2041 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2042 email_manager.manager.enqueue_notify_email(
2043 'Inconsistent abort state',
2044 'Queue entries have inconsistent abort state: ' +
2045 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2046 # don't crash here, just assume true
2047 return True
2048 return was_aborted
2049
2050
2051 def _determine_final_status(self):
2052 if self._job_was_aborted():
2053 return models.HostQueueEntry.Status.ABORTED
2054
2055 # we'll use a PidfileRunMonitor to read the autoserv exit status
2056 if self._autoserv_monitor.exit_code() == 0:
2057 return models.HostQueueEntry.Status.COMPLETED
2058 return models.HostQueueEntry.Status.FAILED
2059
2060
2061 def run(self):
showard8cc058f2009-09-08 16:26:33 +00002062 # Make sure we actually have results to work with.
2063 # This should never happen in normal operation.
showard5add1c82009-05-26 19:27:46 +00002064 if not self._autoserv_monitor.has_process():
2065 email_manager.manager.enqueue_notify_email(
showard8cc058f2009-09-08 16:26:33 +00002066 'No results in post-job task',
2067 'No results in post-job task at %s' %
2068 self._autoserv_monitor.pidfile_id)
showard5add1c82009-05-26 19:27:46 +00002069 self.finished(False)
2070 return
2071
2072 super(PostJobTask, self).run(
2073 pidfile_name=self._pidfile_name,
2074 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002075
2076
2077 def _set_all_statuses(self, status):
2078 for queue_entry in self._queue_entries:
2079 queue_entry.set_status(status)
2080
2081
2082 def abort(self):
2083 # override AgentTask.abort() to avoid killing the process and ending
2084 # the task. post-job tasks continue when the job is aborted.
2085 pass
2086
2087
showardb5626452009-06-30 01:57:28 +00002088class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002089 """
2090 Task responsible for
2091 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2092 * copying logs to the results repository
2093 * spawning CleanupTasks for hosts, if necessary
2094 * spawning a FinalReparseTask for the job
2095 """
showarded2afea2009-07-07 20:54:07 +00002096 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002097 self._job = job
2098 super(GatherLogsTask, self).__init__(
2099 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002100 logfile_name='.collect_crashinfo.log',
2101 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002102 self._set_ids(queue_entries=queue_entries)
2103
2104
2105 def _generate_command(self, results_dir):
2106 host_list = ','.join(queue_entry.host.hostname
2107 for queue_entry in self._queue_entries)
2108 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2109 '-r', results_dir]
2110
2111
2112 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002113 for queue_entry in self._queue_entries:
2114 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2115 raise SchedulerError('Gather task attempting to start on '
2116 'non-gathering entry: %s' % queue_entry)
2117 if queue_entry.host.status != models.Host.Status.RUNNING:
2118 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002119 'entry with non-running host status %s: %s'
2120 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002121
showardd3dc1992009-04-22 21:01:40 +00002122 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002123
2124
showardd3dc1992009-04-22 21:01:40 +00002125 def epilog(self):
2126 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002127
showard6d1c1432009-08-20 23:30:39 +00002128 self._copy_and_parse_results(self._queue_entries,
2129 use_monitor=self._autoserv_monitor)
2130
2131 if self._autoserv_monitor.has_process():
2132 final_success = (self._final_status ==
2133 models.HostQueueEntry.Status.COMPLETED)
2134 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2135 else:
2136 final_success = False
2137 num_tests_failed = 0
2138
showardb5626452009-06-30 01:57:28 +00002139 self._reboot_hosts(self._job, self._queue_entries, final_success,
2140 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002141
2142
showard0bbfc212009-04-29 21:06:13 +00002143 def run(self):
showard597bfd32009-05-08 18:22:50 +00002144 autoserv_exit_code = self._autoserv_monitor.exit_code()
2145 # only run if Autoserv exited due to some signal. if we have no exit
2146 # code, assume something bad (and signal-like) happened.
2147 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002148 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002149 else:
2150 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002151
2152
showard8fe93b52008-11-18 17:53:22 +00002153class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002154 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2155
2156
showard8cc058f2009-09-08 16:26:33 +00002157 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00002158 super(CleanupTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00002159 task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002160
showard8cc058f2009-09-08 16:26:33 +00002161 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002162
mblighd5c95802008-03-05 00:33:46 +00002163
jadmanski0afbb632008-06-06 21:10:57 +00002164 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002165 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002166 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002167 self.host.set_status(models.Host.Status.CLEANING)
2168 if self.queue_entry:
2169 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2170
2171
showard775300b2009-09-09 15:30:50 +00002172 def _finish_epilog(self):
2173 if not self.queue_entry:
2174 return
2175
2176 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
2177 self.queue_entry.on_pending()
2178 elif self.success:
2179 if self.queue_entry.job.run_verify:
2180 entry = models.HostQueueEntry(id=self.queue_entry.id)
2181 models.SpecialTask.objects.create(
2182 host=models.Host(id=self.host.id),
2183 queue_entry=entry,
2184 task=models.SpecialTask.Task.VERIFY)
2185 else:
2186 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002187
mblighd5c95802008-03-05 00:33:46 +00002188
showard21baa452008-10-21 00:08:39 +00002189 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002190 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002191
showard21baa452008-10-21 00:08:39 +00002192 if self.success:
2193 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002194 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002195
showard775300b2009-09-09 15:30:50 +00002196 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002197
showard21baa452008-10-21 00:08:39 +00002198
showardd3dc1992009-04-22 21:01:40 +00002199class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002200 _num_running_parses = 0
2201
showarded2afea2009-07-07 20:54:07 +00002202 def __init__(self, queue_entries, recover_run_monitor=None):
2203 super(FinalReparseTask, self).__init__(
2204 queue_entries, pidfile_name=_PARSER_PID_FILE,
2205 logfile_name='.parse.log',
2206 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002207 # don't use _set_ids, since we don't want to set the host_ids
2208 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002209 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002210
showard97aed502008-11-04 02:01:24 +00002211
2212 @classmethod
2213 def _increment_running_parses(cls):
2214 cls._num_running_parses += 1
2215
2216
2217 @classmethod
2218 def _decrement_running_parses(cls):
2219 cls._num_running_parses -= 1
2220
2221
2222 @classmethod
2223 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002224 return (cls._num_running_parses <
2225 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002226
2227
2228 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002229 for queue_entry in self._queue_entries:
2230 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2231 raise SchedulerError('Parse task attempting to start on '
2232 'non-parsing entry: %s' % queue_entry)
2233
showard97aed502008-11-04 02:01:24 +00002234 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002235
2236
2237 def epilog(self):
2238 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002239 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002240
2241
showardd3dc1992009-04-22 21:01:40 +00002242 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002243 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002244 results_dir]
showard97aed502008-11-04 02:01:24 +00002245
2246
showard08a36412009-05-05 01:01:13 +00002247 def tick(self):
2248 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002249 # and we can, at which point we revert to default behavior
2250 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002251 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002252 else:
2253 self._try_starting_parse()
2254
2255
2256 def run(self):
2257 # override run() to not actually run unless we can
2258 self._try_starting_parse()
2259
2260
2261 def _try_starting_parse(self):
2262 if not self._can_run_new_parse():
2263 return
showard170873e2009-01-07 00:22:26 +00002264
showard97aed502008-11-04 02:01:24 +00002265 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002266 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002267
showard97aed502008-11-04 02:01:24 +00002268 self._increment_running_parses()
2269 self._parse_started = True
2270
2271
2272 def finished(self, success):
2273 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002274 if self._parse_started:
2275 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002276
2277
showarda3c58572009-03-12 20:36:59 +00002278class DBError(Exception):
2279 """Raised by the DBObject constructor when its select fails."""
2280
2281
mbligh36768f02008-02-22 18:28:33 +00002282class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002283 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002284
2285 # Subclasses MUST override these:
2286 _table_name = ''
2287 _fields = ()
2288
showarda3c58572009-03-12 20:36:59 +00002289 # A mapping from (type, id) to the instance of the object for that
2290 # particular id. This prevents us from creating new Job() and Host()
2291 # instances for every HostQueueEntry object that we instantiate as
2292 # multiple HQEs often share the same Job.
2293 _instances_by_type_and_id = weakref.WeakValueDictionary()
2294 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002295
showarda3c58572009-03-12 20:36:59 +00002296
2297 def __new__(cls, id=None, **kwargs):
2298 """
2299 Look to see if we already have an instance for this particular type
2300 and id. If so, use it instead of creating a duplicate instance.
2301 """
2302 if id is not None:
2303 instance = cls._instances_by_type_and_id.get((cls, id))
2304 if instance:
2305 return instance
2306 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2307
2308
2309 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002310 assert bool(id) or bool(row)
2311 if id is not None and row is not None:
2312 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002313 assert self._table_name, '_table_name must be defined in your class'
2314 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002315 if not new_record:
2316 if self._initialized and not always_query:
2317 return # We've already been initialized.
2318 if id is None:
2319 id = row[0]
2320 # Tell future constructors to use us instead of re-querying while
2321 # this instance is still around.
2322 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002323
showard6ae5ea92009-02-25 00:11:51 +00002324 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002325
jadmanski0afbb632008-06-06 21:10:57 +00002326 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002327
jadmanski0afbb632008-06-06 21:10:57 +00002328 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002329 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002330
showarda3c58572009-03-12 20:36:59 +00002331 if self._initialized:
2332 differences = self._compare_fields_in_row(row)
2333 if differences:
showard7629f142009-03-27 21:02:02 +00002334 logging.warn(
2335 'initialized %s %s instance requery is updating: %s',
2336 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002337 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002338 self._initialized = True
2339
2340
2341 @classmethod
2342 def _clear_instance_cache(cls):
2343 """Used for testing, clear the internal instance cache."""
2344 cls._instances_by_type_and_id.clear()
2345
2346
showardccbd6c52009-03-21 00:10:21 +00002347 def _fetch_row_from_db(self, row_id):
2348 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2349 rows = _db.execute(sql, (row_id,))
2350 if not rows:
showard76e29d12009-04-15 21:53:10 +00002351 raise DBError("row not found (table=%s, row id=%s)"
2352 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002353 return rows[0]
2354
2355
showarda3c58572009-03-12 20:36:59 +00002356 def _assert_row_length(self, row):
2357 assert len(row) == len(self._fields), (
2358 "table = %s, row = %s/%d, fields = %s/%d" % (
2359 self.__table, row, len(row), self._fields, len(self._fields)))
2360
2361
2362 def _compare_fields_in_row(self, row):
2363 """
2364 Given a row as returned by a SELECT query, compare it to our existing
2365 in memory fields.
2366
2367 @param row - A sequence of values corresponding to fields named in
2368 The class attribute _fields.
2369
2370 @returns A dictionary listing the differences keyed by field name
2371 containing tuples of (current_value, row_value).
2372 """
2373 self._assert_row_length(row)
2374 differences = {}
2375 for field, row_value in itertools.izip(self._fields, row):
2376 current_value = getattr(self, field)
2377 if current_value != row_value:
2378 differences[field] = (current_value, row_value)
2379 return differences
showard2bab8f42008-11-12 18:15:22 +00002380
2381
2382 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002383 """
2384 Update our field attributes using a single row returned by SELECT.
2385
2386 @param row - A sequence of values corresponding to fields named in
2387 the class fields list.
2388 """
2389 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002390
showard2bab8f42008-11-12 18:15:22 +00002391 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002392 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002393 setattr(self, field, value)
2394 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002395
showard2bab8f42008-11-12 18:15:22 +00002396 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002397
mblighe2586682008-02-29 22:45:46 +00002398
showardccbd6c52009-03-21 00:10:21 +00002399 def update_from_database(self):
2400 assert self.id is not None
2401 row = self._fetch_row_from_db(self.id)
2402 self._update_fields_from_row(row)
2403
2404
jadmanski0afbb632008-06-06 21:10:57 +00002405 def count(self, where, table = None):
2406 if not table:
2407 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002408
jadmanski0afbb632008-06-06 21:10:57 +00002409 rows = _db.execute("""
2410 SELECT count(*) FROM %s
2411 WHERE %s
2412 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002413
jadmanski0afbb632008-06-06 21:10:57 +00002414 assert len(rows) == 1
2415
2416 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002417
2418
showardd3dc1992009-04-22 21:01:40 +00002419 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002420 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002421
showard2bab8f42008-11-12 18:15:22 +00002422 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002423 return
mbligh36768f02008-02-22 18:28:33 +00002424
mblighf8c624d2008-07-03 16:58:45 +00002425 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002426 _db.execute(query, (value, self.id))
2427
showard2bab8f42008-11-12 18:15:22 +00002428 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002429
2430
jadmanski0afbb632008-06-06 21:10:57 +00002431 def save(self):
2432 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002433 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002434 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002435 values = []
2436 for key in keys:
2437 value = getattr(self, key)
2438 if value is None:
2439 values.append('NULL')
2440 else:
2441 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002442 values_str = ','.join(values)
2443 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2444 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002445 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002446 # Update our id to the one the database just assigned to us.
2447 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002448
2449
jadmanski0afbb632008-06-06 21:10:57 +00002450 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002451 self._instances_by_type_and_id.pop((type(self), id), None)
2452 self._initialized = False
2453 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002454 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2455 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002456
2457
showard63a34772008-08-18 19:32:50 +00002458 @staticmethod
2459 def _prefix_with(string, prefix):
2460 if string:
2461 string = prefix + string
2462 return string
2463
2464
jadmanski0afbb632008-06-06 21:10:57 +00002465 @classmethod
showard989f25d2008-10-01 11:38:11 +00002466 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002467 """
2468 Construct instances of our class based on the given database query.
2469
2470 @yields One class instance for each row fetched.
2471 """
showard63a34772008-08-18 19:32:50 +00002472 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2473 where = cls._prefix_with(where, 'WHERE ')
2474 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002475 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002476 'joins' : joins,
2477 'where' : where,
2478 'order_by' : order_by})
2479 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002480 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002481
mbligh36768f02008-02-22 18:28:33 +00002482
class IneligibleHostQueue(DBObject):
    """
    ORM wrapper for the ineligible_host_queues table.  Rows are created by
    HostQueueEntry.block_host() when a host is assigned to a job and removed
    by HostQueueEntry.unblock_host() when the host is released.
    """
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002486
2487
showard89f84db2009-03-12 20:39:13 +00002488class AtomicGroup(DBObject):
2489 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002490 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2491 'invalid')
showard89f84db2009-03-12 20:39:13 +00002492
2493
showard989f25d2008-10-01 11:38:11 +00002494class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002495 _table_name = 'labels'
2496 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002497 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002498
2499
showard6157c632009-07-06 20:19:31 +00002500 def __repr__(self):
2501 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2502 self.name, self.id, self.atomic_group_id)
2503
2504
mbligh36768f02008-02-22 18:28:33 +00002505class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002506 _table_name = 'hosts'
2507 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2508 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2509
2510
jadmanski0afbb632008-06-06 21:10:57 +00002511 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002512 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002513 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002514
2515
showard170873e2009-01-07 00:22:26 +00002516 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002517 """
showard170873e2009-01-07 00:22:26 +00002518 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002519 """
2520 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002521 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002522 FROM labels
2523 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002524 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002525 ORDER BY labels.name
2526 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002527 platform = None
2528 all_labels = []
2529 for label_name, is_platform in rows:
2530 if is_platform:
2531 platform = label_name
2532 all_labels.append(label_name)
2533 return platform, all_labels
2534
2535
showard54c1ea92009-05-20 00:32:58 +00002536 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2537
2538
2539 @classmethod
2540 def cmp_for_sort(cls, a, b):
2541 """
2542 A comparison function for sorting Host objects by hostname.
2543
2544 This strips any trailing numeric digits, ignores leading 0s and
2545 compares hostnames by the leading name and the trailing digits as a
2546 number. If both hostnames do not match this pattern, they are simply
2547 compared as lower case strings.
2548
2549 Example of how hostnames will be sorted:
2550
2551 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2552
2553 This hopefully satisfy most people's hostname sorting needs regardless
2554 of their exact naming schemes. Nobody sane should have both a host10
2555 and host010 (but the algorithm works regardless).
2556 """
2557 lower_a = a.hostname.lower()
2558 lower_b = b.hostname.lower()
2559 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2560 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2561 if match_a and match_b:
2562 name_a, number_a_str = match_a.groups()
2563 name_b, number_b_str = match_b.groups()
2564 number_a = int(number_a_str.lstrip('0'))
2565 number_b = int(number_b_str.lstrip('0'))
2566 result = cmp((name_a, number_a), (name_b, number_b))
2567 if result == 0 and lower_a != lower_b:
2568 # If they compared equal above but the lower case names are
2569 # indeed different, don't report equality. abc012 != abc12.
2570 return cmp(lower_a, lower_b)
2571 return result
2572 else:
2573 return cmp(lower_a, lower_b)
2574
2575
mbligh36768f02008-02-22 18:28:33 +00002576class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002577 _table_name = 'host_queue_entries'
2578 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002579 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002580 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002581
2582
showarda3c58572009-03-12 20:36:59 +00002583 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002584 assert id or row
showarda3c58572009-03-12 20:36:59 +00002585 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002586 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002587
jadmanski0afbb632008-06-06 21:10:57 +00002588 if self.host_id:
2589 self.host = Host(self.host_id)
2590 else:
2591 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002592
showard77182562009-06-10 00:16:05 +00002593 if self.atomic_group_id:
2594 self.atomic_group = AtomicGroup(self.atomic_group_id,
2595 always_query=False)
2596 else:
2597 self.atomic_group = None
2598
showard170873e2009-01-07 00:22:26 +00002599 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002600 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002601
2602
showard89f84db2009-03-12 20:39:13 +00002603 @classmethod
2604 def clone(cls, template):
2605 """
2606 Creates a new row using the values from a template instance.
2607
2608 The new instance will not exist in the database or have a valid
2609 id attribute until its save() method is called.
2610 """
2611 assert isinstance(template, cls)
2612 new_row = [getattr(template, field) for field in cls._fields]
2613 clone = cls(row=new_row, new_record=True)
2614 clone.id = None
2615 return clone
2616
2617
showardc85c21b2008-11-24 22:17:37 +00002618 def _view_job_url(self):
2619 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2620
2621
showardf1ae3542009-05-11 19:26:02 +00002622 def get_labels(self):
2623 """
2624 Get all labels associated with this host queue entry (either via the
2625 meta_host or as a job dependency label). The labels yielded are not
2626 guaranteed to be unique.
2627
2628 @yields Label instances associated with this host_queue_entry.
2629 """
2630 if self.meta_host:
2631 yield Label(id=self.meta_host, always_query=False)
2632 labels = Label.fetch(
2633 joins="JOIN jobs_dependency_labels AS deps "
2634 "ON (labels.id = deps.label_id)",
2635 where="deps.job_id = %d" % self.job.id)
2636 for label in labels:
2637 yield label
2638
2639
jadmanski0afbb632008-06-06 21:10:57 +00002640 def set_host(self, host):
2641 if host:
2642 self.queue_log_record('Assigning host ' + host.hostname)
2643 self.update_field('host_id', host.id)
2644 self.update_field('active', True)
2645 self.block_host(host.id)
2646 else:
2647 self.queue_log_record('Releasing host')
2648 self.unblock_host(self.host.id)
2649 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002650
jadmanski0afbb632008-06-06 21:10:57 +00002651 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002652
2653
jadmanski0afbb632008-06-06 21:10:57 +00002654 def get_host(self):
2655 return self.host
mbligh36768f02008-02-22 18:28:33 +00002656
2657
jadmanski0afbb632008-06-06 21:10:57 +00002658 def queue_log_record(self, log_line):
2659 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002660 _drone_manager.write_lines_to_file(self.queue_log_path,
2661 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002662
2663
jadmanski0afbb632008-06-06 21:10:57 +00002664 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002665 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002666 row = [0, self.job.id, host_id]
2667 block = IneligibleHostQueue(row=row, new_record=True)
2668 block.save()
mblighe2586682008-02-29 22:45:46 +00002669
2670
jadmanski0afbb632008-06-06 21:10:57 +00002671 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002672 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002673 blocks = IneligibleHostQueue.fetch(
2674 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2675 for block in blocks:
2676 block.delete()
mblighe2586682008-02-29 22:45:46 +00002677
2678
showard2bab8f42008-11-12 18:15:22 +00002679 def set_execution_subdir(self, subdir=None):
2680 if subdir is None:
2681 assert self.get_host()
2682 subdir = self.get_host().hostname
2683 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002684
2685
showard6355f6b2008-12-05 18:52:13 +00002686 def _get_hostname(self):
2687 if self.host:
2688 return self.host.hostname
2689 return 'no host'
2690
2691
showard170873e2009-01-07 00:22:26 +00002692 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002693 flags = []
2694 if self.active:
2695 flags.append('active')
2696 if self.complete:
2697 flags.append('complete')
2698 if self.deleted:
2699 flags.append('deleted')
2700 if self.aborted:
2701 flags.append('aborted')
2702 flags_str = ','.join(flags)
2703 if flags_str:
2704 flags_str = ' [%s]' % flags_str
2705 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2706 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002707
2708
jadmanski0afbb632008-06-06 21:10:57 +00002709 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002710 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002711
showardb18134f2009-03-20 20:52:18 +00002712 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002713
showard8cc058f2009-09-08 16:26:33 +00002714 if status in (models.HostQueueEntry.Status.QUEUED,
2715 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002716 self.update_field('complete', False)
2717 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002718
showard8cc058f2009-09-08 16:26:33 +00002719 if status in (models.HostQueueEntry.Status.PENDING,
2720 models.HostQueueEntry.Status.RUNNING,
2721 models.HostQueueEntry.Status.VERIFYING,
2722 models.HostQueueEntry.Status.STARTING,
2723 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002724 self.update_field('complete', False)
2725 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002726
showard8cc058f2009-09-08 16:26:33 +00002727 if status in (models.HostQueueEntry.Status.FAILED,
2728 models.HostQueueEntry.Status.COMPLETED,
2729 models.HostQueueEntry.Status.STOPPED,
2730 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002731 self.update_field('complete', True)
2732 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002733 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002734
2735 should_email_status = (status.lower() in _notify_email_statuses or
2736 'all' in _notify_email_statuses)
2737 if should_email_status:
2738 self._email_on_status(status)
2739
2740 self._email_on_job_complete()
2741
2742
showardf85a0b72009-10-07 20:48:45 +00002743 def _on_complete(self):
2744 if not self.execution_subdir:
2745 return
2746 # unregister any possible pidfiles associated with this queue entry
2747 for pidfile_name in _ALL_PIDFILE_NAMES:
2748 pidfile_id = _drone_manager.get_pidfile_id_from(
2749 self.execution_path(), pidfile_name=pidfile_name)
2750 _drone_manager.unregister_pidfile(pidfile_id)
2751
2752
showardc85c21b2008-11-24 22:17:37 +00002753 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002754 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002755
2756 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2757 self.job.id, self.job.name, hostname, status)
2758 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2759 self.job.id, self.job.name, hostname, status,
2760 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002761 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002762
2763
2764 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002765 if not self.job.is_finished():
2766 return
showard542e8402008-09-19 20:16:18 +00002767
showardc85c21b2008-11-24 22:17:37 +00002768 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002769 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002770 for queue_entry in hosts_queue:
2771 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002772 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002773 queue_entry.status))
2774
2775 summary_text = "\n".join(summary_text)
2776 status_counts = models.Job.objects.get_status_counts(
2777 [self.job.id])[self.job.id]
2778 status = ', '.join('%d %s' % (count, status) for status, count
2779 in status_counts.iteritems())
2780
2781 subject = 'Autotest: Job ID: %s "%s" %s' % (
2782 self.job.id, self.job.name, status)
2783 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2784 self.job.id, self.job.name, status, self._view_job_url(),
2785 summary_text)
showard170873e2009-01-07 00:22:26 +00002786 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002787
2788
showard8cc058f2009-09-08 16:26:33 +00002789 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002790 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002791 assert assigned_host
2792 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002793 if self.host_id is None:
2794 self.set_host(assigned_host)
2795 else:
2796 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002797
showardcfd4a7e2009-07-11 01:47:33 +00002798 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002799 self.job.name, self.meta_host, self.atomic_group_id,
2800 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002801
showard8cc058f2009-09-08 16:26:33 +00002802 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002803
2804
showard8cc058f2009-09-08 16:26:33 +00002805 def _do_schedule_pre_job_tasks(self):
showard77182562009-06-10 00:16:05 +00002806 # Every host goes thru the Verifying stage (which may or may not
2807 # actually do anything as determined by get_pre_job_tasks).
2808 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00002809 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002810
showard6ae5ea92009-02-25 00:11:51 +00002811
jadmanski0afbb632008-06-06 21:10:57 +00002812 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002813 assert self.host
showard8cc058f2009-09-08 16:26:33 +00002814 self.set_status(models.HostQueueEntry.Status.QUEUED)
showard12f3e322009-05-13 21:27:42 +00002815 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002816 # verify/cleanup failure sets the execution subdir, so reset it here
2817 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002818 if self.meta_host:
2819 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002820
2821
jadmanski0afbb632008-06-06 21:10:57 +00002822 def handle_host_failure(self):
2823 """\
2824 Called when this queue entry's host has failed verification and
2825 repair.
2826 """
2827 assert not self.meta_host
showard8cc058f2009-09-08 16:26:33 +00002828 self.set_status(models.HostQueueEntry.Status.FAILED)
showard2bab8f42008-11-12 18:15:22 +00002829 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002830
2831
jadmanskif7fa2cc2008-10-01 14:13:23 +00002832 @property
2833 def aborted_by(self):
2834 self._load_abort_info()
2835 return self._aborted_by
2836
2837
    @property
    def aborted_on(self):
        """Timestamp when this entry was aborted, or None (lazy-loaded)."""
        self._load_abort_info()
        return self._aborted_on
2842
2843
2844 def _load_abort_info(self):
2845 """ Fetch info about who aborted the job. """
2846 if hasattr(self, "_aborted_by"):
2847 return
2848 rows = _db.execute("""
2849 SELECT users.login, aborted_host_queue_entries.aborted_on
2850 FROM aborted_host_queue_entries
2851 INNER JOIN users
2852 ON users.id = aborted_host_queue_entries.aborted_by_id
2853 WHERE aborted_host_queue_entries.queue_entry_id = %s
2854 """, (self.id,))
2855 if rows:
2856 self._aborted_by, self._aborted_on = rows[0]
2857 else:
2858 self._aborted_by = self._aborted_on = None
2859
2860
showardb2e2c322008-10-14 17:33:55 +00002861 def on_pending(self):
2862 """
2863 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00002864 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
2865 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00002866 """
showard8cc058f2009-09-08 16:26:33 +00002867 self.set_status(models.HostQueueEntry.Status.PENDING)
2868 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00002869
2870 # Some debug code here: sends an email if an asynchronous job does not
2871 # immediately enter Starting.
2872 # TODO: Remove this once we figure out why asynchronous jobs are getting
2873 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00002874 self.job.run_if_ready(queue_entry=self)
2875 if (self.job.synch_count == 1 and
2876 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00002877 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2878 message = 'Asynchronous job stuck in Pending'
2879 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002880
2881
showardd3dc1992009-04-22 21:01:40 +00002882 def abort(self, dispatcher):
2883 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002884
showardd3dc1992009-04-22 21:01:40 +00002885 Status = models.HostQueueEntry.Status
showard8cc058f2009-09-08 16:26:33 +00002886 if self.status in (Status.GATHERING, Status.PARSING):
showardd3dc1992009-04-22 21:01:40 +00002887 # do nothing; post-job tasks will finish and then mark this entry
2888 # with status "Aborted" and take care of the host
2889 return
2890
showard8cc058f2009-09-08 16:26:33 +00002891 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
2892 assert not dispatcher.get_agents_for_entry(self)
showardd3dc1992009-04-22 21:01:40 +00002893 self.host.set_status(models.Host.Status.READY)
2894 elif self.status == Status.VERIFYING:
showard8cc058f2009-09-08 16:26:33 +00002895 models.SpecialTask.objects.create(
2896 task=models.SpecialTask.Task.CLEANUP,
2897 host=models.Host(id=self.host.id))
showardd3dc1992009-04-22 21:01:40 +00002898
2899 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002900
showard8cc058f2009-09-08 16:26:33 +00002901
2902 def get_group_name(self):
2903 atomic_group = self.atomic_group
2904 if not atomic_group:
2905 return ''
2906
2907 # Look at any meta_host and dependency labels and pick the first
2908 # one that also specifies this atomic group. Use that label name
2909 # as the group name if possible (it is more specific).
2910 for label in self.get_labels():
2911 if label.atomic_group_id:
2912 assert label.atomic_group_id == atomic_group.id
2913 return label.name
2914 return atomic_group.name
2915
2916
showard170873e2009-01-07 00:22:26 +00002917 def execution_tag(self):
2918 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002919 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002920
2921
showarded2afea2009-07-07 20:54:07 +00002922 def execution_path(self):
2923 return self.execution_tag()
2924
2925
mbligh36768f02008-02-22 18:28:33 +00002926class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002927 _table_name = 'jobs'
2928 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2929 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002930 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002931 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002932
showard77182562009-06-10 00:16:05 +00002933 # This does not need to be a column in the DB. The delays are likely to
2934 # be configured short. If the scheduler is stopped and restarted in
2935 # the middle of a job's delay cycle, the delay cycle will either be
2936 # repeated or skipped depending on the number of Pending machines found
2937 # when the restarted scheduler recovers to track it. Not a problem.
2938 #
2939 # A reference to the DelayedCallTask that will wake up the job should
2940 # no other HQEs change state in time. Its end_time attribute is used
2941 # by our run_with_ready_delay() method to determine if the wait is over.
2942 _delay_ready_task = None
2943
2944 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2945 # all status='Pending' atomic group HQEs incase a delay was running when the
2946 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002947
showarda3c58572009-03-12 20:36:59 +00002948 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002949 assert id or row
showarda3c58572009-03-12 20:36:59 +00002950 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002951
mblighe2586682008-02-29 22:45:46 +00002952
jadmanski0afbb632008-06-06 21:10:57 +00002953 def is_server_job(self):
2954 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002955
2956
showard170873e2009-01-07 00:22:26 +00002957 def tag(self):
2958 return "%s-%s" % (self.id, self.owner)
2959
2960
jadmanski0afbb632008-06-06 21:10:57 +00002961 def get_host_queue_entries(self):
2962 rows = _db.execute("""
2963 SELECT * FROM host_queue_entries
2964 WHERE job_id= %s
2965 """, (self.id,))
2966 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002967
jadmanski0afbb632008-06-06 21:10:57 +00002968 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002969
jadmanski0afbb632008-06-06 21:10:57 +00002970 return entries
mbligh36768f02008-02-22 18:28:33 +00002971
2972
jadmanski0afbb632008-06-06 21:10:57 +00002973 def set_status(self, status, update_queues=False):
2974 self.update_field('status',status)
2975
2976 if update_queues:
2977 for queue_entry in self.get_host_queue_entries():
2978 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002979
2980
showard77182562009-06-10 00:16:05 +00002981 def _atomic_and_has_started(self):
2982 """
2983 @returns True if any of the HostQueueEntries associated with this job
2984 have entered the Status.STARTING state or beyond.
2985 """
2986 atomic_entries = models.HostQueueEntry.objects.filter(
2987 job=self.id, atomic_group__isnull=False)
2988 if atomic_entries.count() <= 0:
2989 return False
2990
showardaf8b4ca2009-06-16 18:47:26 +00002991 # These states may *only* be reached if Job.run() has been called.
2992 started_statuses = (models.HostQueueEntry.Status.STARTING,
2993 models.HostQueueEntry.Status.RUNNING,
2994 models.HostQueueEntry.Status.COMPLETED)
2995
2996 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00002997 return started_entries.count() > 0
2998
2999
showard708b3522009-08-20 23:26:15 +00003000 def _hosts_assigned_count(self):
3001 """The number of HostQueueEntries assigned a Host for this job."""
3002 entries = models.HostQueueEntry.objects.filter(job=self.id,
3003 host__isnull=False)
3004 return entries.count()
3005
3006
showard77182562009-06-10 00:16:05 +00003007 def _pending_count(self):
3008 """The number of HostQueueEntries for this job in the Pending state."""
3009 pending_entries = models.HostQueueEntry.objects.filter(
3010 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3011 return pending_entries.count()
3012
3013
jadmanski0afbb632008-06-06 21:10:57 +00003014 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003015 # NOTE: Atomic group jobs stop reporting ready after they have been
3016 # started to avoid launching multiple copies of one atomic job.
3017 # Only possible if synch_count is less than than half the number of
3018 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003019 pending_count = self._pending_count()
3020 atomic_and_has_started = self._atomic_and_has_started()
3021 ready = (pending_count >= self.synch_count
3022 and not self._atomic_and_has_started())
3023
3024 if not ready:
3025 logging.info(
3026 'Job %s not ready: %s pending, %s required '
3027 '(Atomic and started: %s)',
3028 self, pending_count, self.synch_count,
3029 atomic_and_has_started)
3030
3031 return ready
mbligh36768f02008-02-22 18:28:33 +00003032
3033
jadmanski0afbb632008-06-06 21:10:57 +00003034 def num_machines(self, clause = None):
3035 sql = "job_id=%s" % self.id
3036 if clause:
3037 sql += " AND (%s)" % clause
3038 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003039
3040
jadmanski0afbb632008-06-06 21:10:57 +00003041 def num_queued(self):
3042 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003043
3044
jadmanski0afbb632008-06-06 21:10:57 +00003045 def num_active(self):
3046 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003047
3048
jadmanski0afbb632008-06-06 21:10:57 +00003049 def num_complete(self):
3050 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003051
3052
jadmanski0afbb632008-06-06 21:10:57 +00003053 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003054 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003055
mbligh36768f02008-02-22 18:28:33 +00003056
showard6bb7c292009-01-30 01:44:51 +00003057 def _not_yet_run_entries(self, include_verifying=True):
3058 statuses = [models.HostQueueEntry.Status.QUEUED,
3059 models.HostQueueEntry.Status.PENDING]
3060 if include_verifying:
3061 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3062 return models.HostQueueEntry.objects.filter(job=self.id,
3063 status__in=statuses)
3064
3065
3066 def _stop_all_entries(self):
3067 entries_to_stop = self._not_yet_run_entries(
3068 include_verifying=False)
3069 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003070 assert not child_entry.complete, (
3071 '%s status=%s, active=%s, complete=%s' %
3072 (child_entry.id, child_entry.status, child_entry.active,
3073 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003074 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3075 child_entry.host.status = models.Host.Status.READY
3076 child_entry.host.save()
3077 child_entry.status = models.HostQueueEntry.Status.STOPPED
3078 child_entry.save()
3079
showard2bab8f42008-11-12 18:15:22 +00003080 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003081 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003082 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003083 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003084
3085
jadmanski0afbb632008-06-06 21:10:57 +00003086 def write_to_machines_file(self, queue_entry):
3087 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00003088 file_path = os.path.join(self.tag(), '.machines')
3089 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003090
3091
showardf1ae3542009-05-11 19:26:02 +00003092 def _next_group_name(self, group_name=''):
3093 """@returns a directory name to use for the next host group results."""
3094 if group_name:
3095 # Sanitize for use as a pathname.
3096 group_name = group_name.replace(os.path.sep, '_')
3097 if group_name.startswith('.'):
3098 group_name = '_' + group_name[1:]
3099 # Add a separator between the group name and 'group%d'.
3100 group_name += '.'
3101 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003102 query = models.HostQueueEntry.objects.filter(
3103 job=self.id).values('execution_subdir').distinct()
3104 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003105 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3106 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003107 if ids:
3108 next_id = max(ids) + 1
3109 else:
3110 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003111 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003112
3113
showarddb502762009-09-09 15:31:20 +00003114 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003115 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003116 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003117 return control_path
mbligh36768f02008-02-22 18:28:33 +00003118
showardb2e2c322008-10-14 17:33:55 +00003119
showard2bab8f42008-11-12 18:15:22 +00003120 def get_group_entries(self, queue_entry_from_group):
3121 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003122 return list(HostQueueEntry.fetch(
3123 where='job_id=%s AND execution_subdir=%s',
3124 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003125
3126
showard8cc058f2009-09-08 16:26:33 +00003127 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003128 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003129 execution_path = queue_entries[0].execution_path()
3130 control_path = self._write_control_file(execution_path)
jadmanski0afbb632008-06-06 21:10:57 +00003131 hostnames = ','.join([entry.get_host().hostname
3132 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003133
showarddb502762009-09-09 15:31:20 +00003134 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003135 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003136 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003137 ['-P', execution_tag, '-n',
3138 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003139 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003140
jadmanski0afbb632008-06-06 21:10:57 +00003141 if not self.is_server_job():
3142 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003143
showardb2e2c322008-10-14 17:33:55 +00003144 return params
mblighe2586682008-02-29 22:45:46 +00003145
mbligh36768f02008-02-22 18:28:33 +00003146
showardc9ae1782009-01-30 01:42:37 +00003147 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003148 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003149 return True
showard0fc38302008-10-23 00:44:07 +00003150 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003151 return queue_entry.get_host().dirty
3152 return False
showard21baa452008-10-21 00:08:39 +00003153
showardc9ae1782009-01-30 01:42:37 +00003154
showard8cc058f2009-09-08 16:26:33 +00003155 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003156 do_not_verify = (queue_entry.host.protection ==
3157 host_protections.Protection.DO_NOT_VERIFY)
3158 if do_not_verify:
3159 return False
3160 return self.run_verify
3161
3162
showard8cc058f2009-09-08 16:26:33 +00003163 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003164 """
3165 Get a list of tasks to perform before the host_queue_entry
3166 may be used to run this Job (such as Cleanup & Verify).
3167
3168 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003169 it should be considered be ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003170 task in the list calls HostQueueEntry.on_pending(), which
3171 continues the flow of the job.
3172 """
showardc9ae1782009-01-30 01:42:37 +00003173 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003174 task = models.SpecialTask.Task.CLEANUP
3175 elif self._should_run_verify(queue_entry):
3176 task = models.SpecialTask.Task.VERIFY
3177 else:
3178 queue_entry.on_pending()
3179 return
3180
3181 models.SpecialTask.objects.create(
3182 host=models.Host(id=queue_entry.host_id),
3183 queue_entry=models.HostQueueEntry(id=queue_entry.id),
3184 task=task)
showard21baa452008-10-21 00:08:39 +00003185
3186
showardf1ae3542009-05-11 19:26:02 +00003187 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003188 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003189 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003190 else:
showardf1ae3542009-05-11 19:26:02 +00003191 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003192 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003193 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003194 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003195
3196 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003197 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003198
3199
3200 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003201 """
3202 @returns A tuple containing a list of HostQueueEntry instances to be
3203 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00003204 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00003205 """
showard77182562009-06-10 00:16:05 +00003206 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003207 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003208 if atomic_group:
3209 num_entries_wanted = atomic_group.max_number_of_machines
3210 else:
3211 num_entries_wanted = self.synch_count
3212 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003213
showardf1ae3542009-05-11 19:26:02 +00003214 if num_entries_wanted > 0:
3215 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003216 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003217 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003218 params=(self.id, include_queue_entry.id)))
3219
3220 # Sort the chosen hosts by hostname before slicing.
3221 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3222 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3223 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3224 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003225
showardf1ae3542009-05-11 19:26:02 +00003226 # Sanity check. We'll only ever be called if this can be met.
showard828fc4c2009-09-14 20:31:00 +00003227 if len(chosen_entries) < self.synch_count:
3228 message = ('job %s got less than %s chosen entries: %s' % (
3229 self.id, self.synch_count, chosen_entries))
3230 logging.error(message)
3231 email_manager.manager.enqueue_notify_email(
3232 'Job not started, too few chosen entries', message)
3233 return []
showardf1ae3542009-05-11 19:26:02 +00003234
showard8cc058f2009-09-08 16:26:33 +00003235 group_name = include_queue_entry.get_group_name()
showardf1ae3542009-05-11 19:26:02 +00003236
3237 self._assign_new_group(chosen_entries, group_name=group_name)
showard8cc058f2009-09-08 16:26:33 +00003238 return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003239
3240
showard77182562009-06-10 00:16:05 +00003241 def run_if_ready(self, queue_entry):
3242 """
3243 @returns An Agent instance to ultimately run this job if enough hosts
3244 are ready for it to run.
3245 @returns None and potentially cleans up excess hosts if this Job
3246 is not ready to run.
3247 """
showardb2e2c322008-10-14 17:33:55 +00003248 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003249 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003250 elif queue_entry.atomic_group:
3251 self.run_with_ready_delay(queue_entry)
3252 else:
3253 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003254
3255
3256 def run_with_ready_delay(self, queue_entry):
3257 """
3258 Start a delay to wait for more hosts to enter Pending state before
3259 launching an atomic group job. Once set, the a delay cannot be reset.
3260
3261 @param queue_entry: The HostQueueEntry object to get atomic group
3262 info from and pass to run_if_ready when the delay is up.
3263
3264 @returns An Agent to run the job as appropriate or None if a delay
3265 has already been set.
3266 """
3267 assert queue_entry.job_id == self.id
3268 assert queue_entry.atomic_group
3269 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showard708b3522009-08-20 23:26:15 +00003270 pending_threshold = min(self._hosts_assigned_count(),
3271 queue_entry.atomic_group.max_number_of_machines)
showard77182562009-06-10 00:16:05 +00003272 over_max_threshold = (self._pending_count() >= pending_threshold)
3273 delay_expired = (self._delay_ready_task and
3274 time.time() >= self._delay_ready_task.end_time)
3275
3276 # Delay is disabled or we already have enough? Do not wait to run.
3277 if not delay or over_max_threshold or delay_expired:
showard8cc058f2009-09-08 16:26:33 +00003278 self.run(queue_entry)
3279 else:
3280 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003281
showard8cc058f2009-09-08 16:26:33 +00003282
3283 def schedule_delayed_callback_task(self, queue_entry):
3284 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3285
showard77182562009-06-10 00:16:05 +00003286 if self._delay_ready_task:
3287 return None
3288
showard8cc058f2009-09-08 16:26:33 +00003289 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3290
showard77182562009-06-10 00:16:05 +00003291 def run_job_after_delay():
3292 logging.info('Job %s done waiting for extra hosts.', self.id)
3293 return self.run(queue_entry)
3294
showard708b3522009-08-20 23:26:15 +00003295 logging.info('Job %s waiting up to %s seconds for more hosts.',
3296 self.id, delay)
showard77182562009-06-10 00:16:05 +00003297 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3298 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003299 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003300
3301
3302 def run(self, queue_entry):
3303 """
3304 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003305 """
3306 if queue_entry.atomic_group and self._atomic_and_has_started():
3307 logging.error('Job.run() called on running atomic Job %d '
3308 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003309 return
3310 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003311 if queue_entries:
3312 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003313
3314
showard8cc058f2009-09-08 16:26:33 +00003315 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003316 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003317 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showard77182562009-06-10 00:16:05 +00003318 if self._delay_ready_task:
3319 # Cancel any pending callback that would try to run again
3320 # as we are already running.
3321 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003322
showardb000a8d2009-07-28 20:02:07 +00003323 def __str__(self):
3324 return '%s-%s' % (self.id, self.owner)
3325
3326
mbligh36768f02008-02-22 18:28:33 +00003327if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003328 main()