#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref
import common
import MySQLdb
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()


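# The tunables consumed below come from the Autotest global configuration.
# As a rough sketch (option names are the ones read in this module; the
# values are hypothetical, not shipped defaults), the SCHEDULER section
# might look like:
#
#   [SCHEDULER]
#   enable_scheduler: True
#   drones: localhost
#   results_host: localhost
#   notify_email_statuses: Failed,Aborted
#   pidfile_timeout_mins: 5
#   die_on_orphans: False
#
# _get_pidfile_timeout_secs() below would then report 5 * 60 = 300 seconds.
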
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled; set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]
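    # For illustration: a notify_email_statuses value such as
    # "Failed, Aborted;Completed" is lower-cased and split on whitespace,
    # commas, semicolons and colons, yielding
    # ['failed', 'aborted', 'completed']; an empty value leaves the list empty.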

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def init():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p', '-m', machines,
                     '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args


class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result
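
    # Sketch of the mapping built above (hypothetical ids): rows of
    # (job_id, host_id) pairs such as [(1, 10), (1, 11), (2, 10)] become
    # {1: set([10, 11]), 2: set([10])}; with flip=True the right-hand ids
    # become the keys instead: {10: set([1, 2]), 11: set([1])}.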


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels, if any, or None otherwise. Logs an error if more than one
        atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()
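
    # For example (hypothetical labels): a host whose labels include one label
    # with atomic_group_id=4 yields 4, a host with only plain labels yields
    # None, and labels pointing at two different atomic groups log the error
    # above and return whichever id set.pop() produces.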


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling. Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to. We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
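
    # Worked example (hypothetical numbers): a job with synch_count=3 against
    # an atomic group whose max_number_of_machines=5 needs at least 3 eligible
    # hosts carrying one of the group's labels; if 8 are eligible, only the
    # first 5 after sorting are claimed and the rest remain available.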


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    execution_entry.execution_path(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)


    def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
        run_monitor = PidfileRunMonitor()
        run_monitor.attach_to_existing_process(execution_path,
                                               pidfile_name=pidfile_name)
        if run_monitor.has_process():
            orphans.discard(run_monitor.get_process())
            return run_monitor, '(process %s)' % run_monitor.get_process()
        return None, 'without process'


    def _get_unassigned_entries(self, status):
        for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _recover_entries_with_status(self, status, orphans, pidfile_name,
                                     recover_entries_fn):
        for queue_entry in self._get_unassigned_entries(status):
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            run_monitor, process_string = self._get_recovery_run_monitor(
                queue_entry.execution_path(), pidfile_name, orphans)
            if not run_monitor:
                # _schedule_running_host_queue_entries should schedule and
                # recover these entries
                continue

            logging.info('Recovering %s entry %s %s', status.lower(),
                         ', '.join(str(entry) for entry in queue_entries),
                         process_string)
            recover_entries_fn(queue_entry.job, queue_entries, run_monitor)


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_running_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            queue_task = QueueTask(job=job, queue_entries=queue_entries,
                                   recover_run_monitor=run_monitor)
            self.add_agent(Agent(task=queue_task))

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, _AUTOSERV_PID_FILE,
                                          recover_entries)


    def _recover_gathering_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent(gather_task))

        self._recover_entries_with_status(
            models.HostQueueEntry.Status.GATHERING,
            orphans, _CRASHINFO_PID_FILE, recover_entries)


    def _recover_parsing_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            self.add_agent(Agent(reparse_task))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _recover_all_recoverable_entries(self):
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)


    def _recover_special_tasks(self, orphans):
        """\
        Recovers all special tasks that have started running but have not
        completed.
        """
        tasks = models.SpecialTask.objects.filter(is_active=True,
                                                  is_complete=False)
        for task in tasks:
            if self.host_has_agent(task.host):
                raise SchedulerError(
                    "%s already has a host agent %s." % (
                        task, self._host_agents.get(task.host.id)))

            run_monitor, process_string = self._get_recovery_run_monitor(
                task.execution_path(), _AUTOSERV_PID_FILE, orphans)

            logging.info('Recovering %s %s', task, process_string)
            self._run_or_recover_special_task(task, run_monitor=run_monitor)


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                task__in=(models.SpecialTask.Task.CLEANUP,
                          models.SpecialTask.Task.VERIFY),
                queue_entry__id=queue_entry.id,
                is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                '%d unrecovered verifying host queue entries:\n%s' %
                (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.Host.objects.add_join(
            queued_tasks, 'host_queue_entries', 'host_id',
            join_condition='host_queue_entries.active',
            force_left_join=True)
        queued_tasks = queued_tasks.extra(
            where=['(host_queue_entries.id IS NULL OR '
                   'host_queue_entries.id = special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
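
    # Example of the resulting order (hypothetical tasks): queued tasks
    # [verify(host1), cleanup(host2), repair(host3)] sort to
    # [repair(host3), cleanup(host2), verify(host1)], since REPAIR sorts
    # before CLEANUP, which sorts before VERIFY.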


    def _run_or_recover_special_task(self, special_task, run_monitor=None):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @param run_monitor: if given, a running SpecialTask will be recovered
                with this monitor.
        """
        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                agent_task = agent_task_class(task=special_task,
                                              recover_run_monitor=run_monitor)
                self.add_agent(Agent(agent_task))
                return

        email_manager.manager.enqueue_notify_email(
            'No AgentTask class for task', str(special_task))


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self._run_or_recover_special_task(task)


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning')",
            print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                task=models.SpecialTask.Task.CLEANUP,
                host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000971
972
showard89f84db2009-03-12 20:39:13 +0000973 def _refresh_pending_queue_entries(self):
974 """
975 Lookup the pending HostQueueEntries and call our HostScheduler
976 refresh() method given that list. Return the list.
977
978 @returns A list of pending HostQueueEntries sorted in priority order.
979 """
showard63a34772008-08-18 19:32:50 +0000980 queue_entries = self._get_pending_queue_entries()
981 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000982 return []
showardb95b1bd2008-08-15 18:11:04 +0000983
showard63a34772008-08-18 19:32:50 +0000984 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000985
showard89f84db2009-03-12 20:39:13 +0000986 return queue_entries
987
988
989 def _schedule_atomic_group(self, queue_entry):
990 """
991 Schedule the given queue_entry on an atomic group of hosts.
992
993 Returns immediately if there are insufficient available hosts.
994
995 Creates new HostQueueEntries based off of queue_entry for the
996 scheduled hosts and starts them all running.
997 """
998 # This is a virtual host queue entry representing an entire
999 # atomic group, find a group and schedule their hosts.
1000 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1001 queue_entry)
1002 if not group_hosts:
1003 return
showardcbe6f942009-06-17 19:33:49 +00001004
1005 logging.info('Expanding atomic group entry %s with hosts %s',
1006 queue_entry,
1007 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001008 # The first assigned host uses the original HostQueueEntry
1009 group_queue_entries = [queue_entry]
1010 for assigned_host in group_hosts[1:]:
1011 # Create a new HQE for every additional assigned_host.
1012 new_hqe = HostQueueEntry.clone(queue_entry)
1013 new_hqe.save()
1014 group_queue_entries.append(new_hqe)
1015 assert len(group_queue_entries) == len(group_hosts)
1016 for queue_entry, host in itertools.izip(group_queue_entries,
1017 group_hosts):
1018 self._run_queue_entry(queue_entry, host)
1019
1020
1021 def _schedule_new_jobs(self):
1022 queue_entries = self._refresh_pending_queue_entries()
1023 if not queue_entries:
1024 return
1025
showard63a34772008-08-18 19:32:50 +00001026 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001027 is_unassigned_atomic_group = (
1028 queue_entry.atomic_group_id is not None
1029 and queue_entry.host_id is None)
1030 if is_unassigned_atomic_group:
1031 self._schedule_atomic_group(queue_entry)
1032 else:
showard89f84db2009-03-12 20:39:13 +00001033 assigned_host = self._host_scheduler.find_eligible_host(
1034 queue_entry)
showard65db3932009-10-28 19:54:35 +00001035 if assigned_host and not self.host_has_agent(assigned_host):
showard89f84db2009-03-12 20:39:13 +00001036 self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001037
1038
    def _schedule_running_host_queue_entries(self):
        status_enum = models.HostQueueEntry.Status
        running_statuses = (status_enum.STARTING, status_enum.RUNNING,
                            status_enum.GATHERING, status_enum.PARSING)
        sql_statuses = ', '.join(('"%s"' % s for s in running_statuses))
        entries = HostQueueEntry.fetch(where="status IN (%s)" % sql_statuses)
        for entry in entries:
            if self.get_agents_for_entry(entry):
                continue

            task_entries = entry.job.get_group_entries(entry)
            for task_entry in task_entries:
                if (task_entry.status != models.HostQueueEntry.Status.PARSING
                        and self.host_has_agent(task_entry.host)):
                    agent = tuple(self._host_agents.get(task_entry.host.id))[0]
                    raise SchedulerError('Attempted to schedule on host that '
                                         'already has agent: %s (previous '
                                         'agent task: %s)'
                                         % (task_entry, agent.task))

            if entry.status in (models.HostQueueEntry.Status.STARTING,
                                models.HostQueueEntry.Status.RUNNING):
                params = entry.job.get_autoserv_params(task_entries)
                agent_task = QueueTask(job=entry.job,
                                       queue_entries=task_entries, cmd=params)
            elif entry.status == models.HostQueueEntry.Status.GATHERING:
                agent_task = GatherLogsTask(
                    job=entry.job, queue_entries=task_entries)
            elif entry.status == models.HostQueueEntry.Status.PARSING:
                agent_task = FinalReparseTask(queue_entries=task_entries)
            else:
                raise SchedulerError('_schedule_running_host_queue_entries got '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))

            self.add_agent(Agent(agent_task))


    def _schedule_delay_tasks(self):
        for entry in HostQueueEntry.fetch(where='status = "%s"' %
                                          models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent(Agent(task))


    def _run_queue_entry(self, queue_entry, host):
        queue_entry.schedule_pre_job_tasks(assigned_host=host)


    def _find_aborting(self):
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
            agent.task.username)
        if agent.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
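
    # Throttling sketch (hypothetical limit): with
    # max_processes_started_per_cycle = 50, an agent needing 10 processes is
    # admitted while num_started_this_cycle + 10 <= 50; once an agent is
    # refused, have_reached_limit blocks further nonzero-process agents for
    # the rest of the cycle, though zero-process agents always run.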


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                # TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                rrun.save()


showard170873e2009-01-07 00:22:26 +00001182class PidfileRunMonitor(object):
1183 """
1184 Client must call either run() to start a new process or
1185 attach_to_existing_process().
1186 """
mbligh36768f02008-02-22 18:28:33 +00001187
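    # Typical call sequence (an illustrative sketch only; the surrounding
    # variable names are assumptions, not part of this class):
    #
    #     monitor = PidfileRunMonitor()
    #     monitor.run(command, working_directory, num_processes=1,
    #                 nice_level=AUTOSERV_NICE_LEVEL, username=owner)
    #     ... then, on each scheduler cycle ...
    #     exit_code = monitor.exit_code()
    #     if exit_code is not None:
    #         success = (exit_code == 0)
    #
    # Or, to adopt a process left over from a previous scheduler run:
    #
    #     monitor = PidfileRunMonitor()
    #     monitor.attach_to_existing_process(execution_path)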
showard170873e2009-01-07 00:22:26 +00001188 class _PidfileException(Exception):
1189 """
1190 Raised when there's some unexpected behavior with the pid file, but only
1191 used internally (never allowed to escape this class).
1192 """
mbligh36768f02008-02-22 18:28:33 +00001193
1194
showard170873e2009-01-07 00:22:26 +00001195 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001196 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001197 self._start_time = None
1198 self.pidfile_id = None
1199 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001200
1201
showard170873e2009-01-07 00:22:26 +00001202 def _add_nice_command(self, command, nice_level):
1203 if not nice_level:
1204 return command
1205 return ['nice', '-n', str(nice_level)] + command
1206
1207
1208 def _set_start_time(self):
1209 self._start_time = time.time()
1210
1211
showard418785b2009-11-23 20:19:59 +00001212 def run(self, command, working_directory, num_processes, nice_level=None,
1213 log_file=None, pidfile_name=None, paired_with_pidfile=None,
1214 username=None):
showard170873e2009-01-07 00:22:26 +00001215 assert command is not None
1216 if nice_level is not None:
1217 command = ['nice', '-n', str(nice_level)] + command
1218 self._set_start_time()
1219 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001220 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001221 num_processes=num_processes, log_file=log_file,
1222 paired_with_pidfile=paired_with_pidfile, username=username)
showard170873e2009-01-07 00:22:26 +00001223
1224
showarded2afea2009-07-07 20:54:07 +00001225 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001226 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001227 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001228 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001229 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001230 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001231
1232
jadmanski0afbb632008-06-06 21:10:57 +00001233 def kill(self):
showard170873e2009-01-07 00:22:26 +00001234 if self.has_process():
1235 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001236
mbligh36768f02008-02-22 18:28:33 +00001237
showard170873e2009-01-07 00:22:26 +00001238 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001239 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001240 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001241
1242
showard170873e2009-01-07 00:22:26 +00001243 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001244 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001245 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001246 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001247
1248
showard170873e2009-01-07 00:22:26 +00001249 def _read_pidfile(self, use_second_read=False):
1250 assert self.pidfile_id is not None, (
1251 'You must call run() or attach_to_existing_process()')
1252 contents = _drone_manager.get_pidfile_contents(
1253 self.pidfile_id, use_second_read=use_second_read)
1254 if contents.is_invalid():
1255 self._state = drone_manager.PidfileContents()
1256 raise self._PidfileException(contents)
1257 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001258
1259
showard21baa452008-10-21 00:08:39 +00001260 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001261 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1262 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001263 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001264 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001265
1266
1267 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001268 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001269 return
mblighbb421852008-03-11 22:36:16 +00001270
showard21baa452008-10-21 00:08:39 +00001271 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001272
showard170873e2009-01-07 00:22:26 +00001273 if self._state.process is None:
1274 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001275 return
mbligh90a549d2008-03-25 23:52:34 +00001276
showard21baa452008-10-21 00:08:39 +00001277 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001278 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001279 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001280 return
mbligh90a549d2008-03-25 23:52:34 +00001281
showard170873e2009-01-07 00:22:26 +00001282 # pid but no running process - maybe process *just* exited
1283 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001284 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001285 # autoserv exited without writing an exit code
1286 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001287 self._handle_pidfile_error(
1288 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001289
showard21baa452008-10-21 00:08:39 +00001290
1291 def _get_pidfile_info(self):
1292 """\
1293 After completion, self._state will contain:
1294 process=None, exit_status=None if autoserv has not yet run
1295 process!=None, exit_status=None if autoserv is running
1296 process!=None, exit_status!=None if autoserv has completed
1297 """
1298 try:
1299 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001300 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001301 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001302
1303
showard170873e2009-01-07 00:22:26 +00001304 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001305 """\
1306 Called when no pidfile is found or no pid is in the pidfile.
1307 """
showard170873e2009-01-07 00:22:26 +00001308 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001309 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001310 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001311 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001312 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001313
1314
showard35162b02009-03-03 02:17:30 +00001315 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001316 """\
1317 Called when autoserv has exited without writing an exit status,
1318 or we've timed out waiting for autoserv to write a pid to the
1319 pidfile. In either case, we just return failure and the caller
1320 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001321
showard170873e2009-01-07 00:22:26 +00001322 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001323 """
1324 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001325 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001326 self._state.exit_status = 1
1327 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001328
1329
jadmanski0afbb632008-06-06 21:10:57 +00001330 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001331 self._get_pidfile_info()
1332 return self._state.exit_status
1333
1334
1335 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001336 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001337 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001338 if self._state.num_tests_failed is None:
1339 return -1
showard21baa452008-10-21 00:08:39 +00001340 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001341
1342
showardcdaeae82009-08-31 18:32:48 +00001343 def try_copy_results_on_drone(self, **kwargs):
1344 if self.has_process():
1345 # copy results logs into the normal place for job results
1346 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1347
1348
1349 def try_copy_to_results_repository(self, source, **kwargs):
1350 if self.has_process():
1351 _drone_manager.copy_to_results_repository(self.get_process(),
1352 source, **kwargs)
1353
1354
mbligh36768f02008-02-22 18:28:33 +00001355class Agent(object):
showard77182562009-06-10 00:16:05 +00001356 """
showard8cc058f2009-09-08 16:26:33 +00001357 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001358
1359 The following methods are required on all task objects:
1360 poll() - Called periodically to let the task check its status and
1361 update its internal state. Once finished, it must set its success attribute.
1362 is_done() - Returns True if the task is finished.
1363 abort() - Called when an abort has been requested. The task must
1364 set its aborted attribute to True if it actually aborted.
1365
1366 The following attributes are required on all task objects:
1367 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001368 success - bool, True if this task succeeded.
1369 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1370 host_ids - A sequence of Host ids this task represents.
num_processes - The number of autoserv processes this task will launch (used by the Dispatcher for throttling).
1371
1372 The following attribute is written to all task objects:
1373 agent - A reference to the Agent instance that the task has been
1374 added to.
1375 """
1376
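    # A minimal sketch of a conforming task object (hypothetical class, shown
    # only for illustration; DelayedCallTask below is a real in-tree example,
    # and "dispatcher" stands for the running Dispatcher instance):
    #
    #     class NoopTask(object):
    #         def __init__(self):
    #             self.success = False
    #             self.aborted = False
    #             self.queue_entry_ids = ()
    #             self.host_ids = ()
    #             self.num_processes = 0
    #             self.agent = None  # filled in by Agent.__init__()
    #
    #         def poll(self):
    #             self.success = True
    #
    #         def is_done(self):
    #             return self.success or self.aborted
    #
    #         def abort(self):
    #             self.aborted = True
    #
    #     dispatcher.add_agent(Agent(NoopTask()))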
1377
showard418785b2009-11-23 20:19:59 +00001378 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001379 """
showard8cc058f2009-09-08 16:26:33 +00001380 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001381 The task supplies num_processes, the number of subprocesses this
1382 Agent represents; the Dispatcher uses that value when managing the
1383 load on the system.
1384 """
showard8cc058f2009-09-08 16:26:33 +00001385 self.task = task
1386 task.agent = self
1387
showard77182562009-06-10 00:16:05 +00001388 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001389 self.dispatcher = None
showard418785b2009-11-23 20:19:59 +00001390 self.num_processes = task.num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001391
showard8cc058f2009-09-08 16:26:33 +00001392 self.queue_entry_ids = task.queue_entry_ids
1393 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001394
showard8cc058f2009-09-08 16:26:33 +00001395 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001396 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001397
1398
jadmanski0afbb632008-06-06 21:10:57 +00001399 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001400 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001401 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001402 self.task.poll()
1403 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001404 self.finished = True
showardec113162008-05-08 00:52:49 +00001405
1406
jadmanski0afbb632008-06-06 21:10:57 +00001407 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001408 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001409
1410
showardd3dc1992009-04-22 21:01:40 +00001411 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001412 if self.task:
1413 self.task.abort()
1414 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001415 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001416 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001417
showardd3dc1992009-04-22 21:01:40 +00001418
showard77182562009-06-10 00:16:05 +00001419class DelayedCallTask(object):
1420 """
1421 A task object like AgentTask for an Agent to run that waits for the
1422 specified amount of time to have elapsed before calling the supplied
1423 callback once and finishing. If the callback returns anything, it is
1424 assumed to be a new Agent instance and will be added to the dispatcher.
1425
1426 @attribute end_time: The absolute posix time after which this task will
1427 call its callback when it is polled and be finished.
1428
1429 Also has all attributes required by the Agent class.
1430 """
1431 def __init__(self, delay_seconds, callback, now_func=None):
1432 """
1433 @param delay_seconds: The delay in seconds from now that this task
1434 will call the supplied callback and be done.
1435 @param callback: A callable to be called by this task once after at
1436 least delay_seconds time has elapsed. It must return None
1437 or a new Agent instance.
1438 @param now_func: A time.time like function. Default: time.time.
1439 Used for testing.
1440 """
1441 assert delay_seconds > 0
1442 assert callable(callback)
1443 if not now_func:
1444 now_func = time.time
1445 self._now_func = now_func
1446 self._callback = callback
1447
1448 self.end_time = self._now_func() + delay_seconds
1449
1450 # These attributes are required by Agent.
1451 self.aborted = False
showard77182562009-06-10 00:16:05 +00001452 self.host_ids = ()
1453 self.success = False
1454 self.queue_entry_ids = ()
showard418785b2009-11-23 20:19:59 +00001455 self.num_processes = 0
showard77182562009-06-10 00:16:05 +00001456 # This is filled in by Agent.__init__().
1457 self.agent = None
1458
1459
1460 def poll(self):
showard8cc058f2009-09-08 16:26:33 +00001461 if not self.is_done() and self._now_func() >= self.end_time:
1462 self._callback()
showard77182562009-06-10 00:16:05 +00001463 self.success = True
1464
1465
1466 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001467 return self.success or self.aborted
showard77182562009-06-10 00:16:05 +00001468
1469
1470 def abort(self):
1471 self.aborted = True
showard77182562009-06-10 00:16:05 +00001472
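# Illustrative use of DelayedCallTask (the callback and dispatcher names
# here are hypothetical):
#
#     def _on_delay_expired():
#         logging.info('delay expired, doing follow-up work')
#
#     delay_task = DelayedCallTask(delay_seconds=30,
#                                  callback=_on_delay_expired)
#     dispatcher.add_agent(Agent(delay_task))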
1473
mbligh36768f02008-02-22 18:28:33 +00001474class AgentTask(object):
showard8cc058f2009-09-08 16:26:33 +00001475 def __init__(self, cmd=None, working_directory=None,
showard418785b2009-11-23 20:19:59 +00001476 recover_run_monitor=None, username=None, num_processes=1):
showard9bb960b2009-11-19 01:02:11 +00001477 """
showard418785b2009-11-23 20:19:59 +00001478 @param username: login of user responsible for this task. may be None.
1479 @param num_processes: number of autoserv processes launched by this
1480 AgentTask. This includes any forking the autoserv process may do,
1481 so the count may be only approximate.
showard9bb960b2009-11-19 01:02:11 +00001482 """
jadmanski0afbb632008-06-06 21:10:57 +00001483 self.done = False
jadmanski0afbb632008-06-06 21:10:57 +00001484 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001485 self._working_directory = working_directory
showard9bb960b2009-11-19 01:02:11 +00001486 self.username = username
showard418785b2009-11-23 20:19:59 +00001487 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001488 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001489 self.monitor = recover_run_monitor
1490 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001491 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001492 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001493 self.queue_entry_ids = []
1494 self.host_ids = []
1495 self.log_file = None
1496
1497
1498 def _set_ids(self, host=None, queue_entries=None):
1499 if queue_entries and queue_entries != [None]:
1500 self.host_ids = [entry.host.id for entry in queue_entries]
1501 self.queue_entry_ids = [entry.id for entry in queue_entries]
1502 else:
1503 assert host
1504 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001505
1506
jadmanski0afbb632008-06-06 21:10:57 +00001507 def poll(self):
showard08a36412009-05-05 01:01:13 +00001508 if not self.started:
1509 self.start()
1510 self.tick()
1511
1512
1513 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001514 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001515 exit_code = self.monitor.exit_code()
1516 if exit_code is None:
1517 return
1518 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001519 else:
1520 success = False
mbligh36768f02008-02-22 18:28:33 +00001521
jadmanski0afbb632008-06-06 21:10:57 +00001522 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001523
1524
jadmanski0afbb632008-06-06 21:10:57 +00001525 def is_done(self):
1526 return self.done
mbligh36768f02008-02-22 18:28:33 +00001527
1528
jadmanski0afbb632008-06-06 21:10:57 +00001529 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001530 if self.done:
1531 return
jadmanski0afbb632008-06-06 21:10:57 +00001532 self.done = True
1533 self.success = success
1534 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001535
1536
jadmanski0afbb632008-06-06 21:10:57 +00001537 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001538 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001539
mbligh36768f02008-02-22 18:28:33 +00001540
jadmanski0afbb632008-06-06 21:10:57 +00001541 def cleanup(self):
showardcdaeae82009-08-31 18:32:48 +00001542 if self.monitor and self.log_file:
1543 self.monitor.try_copy_to_results_repository(self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001544
1545
jadmanski0afbb632008-06-06 21:10:57 +00001546 def epilog(self):
1547 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001548
1549
jadmanski0afbb632008-06-06 21:10:57 +00001550 def start(self):
1551 assert self.agent
1552
1553 if not self.started:
1554 self.prolog()
1555 self.run()
1556
1557 self.started = True
1558
1559
1560 def abort(self):
1561 if self.monitor:
1562 self.monitor.kill()
1563 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001564 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001565 self.cleanup()
1566
1567
showarded2afea2009-07-07 20:54:07 +00001568 def _get_consistent_execution_path(self, execution_entries):
1569 first_execution_path = execution_entries[0].execution_path()
1570 for execution_entry in execution_entries[1:]:
1571 assert execution_entry.execution_path() == first_execution_path, (
1572 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1573 execution_entry,
1574 first_execution_path,
1575 execution_entries[0]))
1576 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001577
1578
showarded2afea2009-07-07 20:54:07 +00001579 def _copy_results(self, execution_entries, use_monitor=None):
1580 """
1581 @param execution_entries: list of objects with execution_path() method
1582 """
showard6d1c1432009-08-20 23:30:39 +00001583 if use_monitor is not None and not use_monitor.has_process():
1584 return
1585
showarded2afea2009-07-07 20:54:07 +00001586 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001587 if use_monitor is None:
1588 assert self.monitor
1589 use_monitor = self.monitor
1590 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001591 execution_path = self._get_consistent_execution_path(execution_entries)
1592 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001593 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001594
showarda1e74b32009-05-12 17:32:04 +00001595
1596 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001597 for queue_entry in queue_entries:
1598 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001599
1600
showarda1e74b32009-05-12 17:32:04 +00001601 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1602 self._copy_results(queue_entries, use_monitor)
1603 self._parse_results(queue_entries)
1604
1605
showardd3dc1992009-04-22 21:01:40 +00001606 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001607 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001608 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001609 self.monitor = PidfileRunMonitor()
1610 self.monitor.run(self.cmd, self._working_directory,
showard418785b2009-11-23 20:19:59 +00001611 num_processes=self.num_processes,
showard170873e2009-01-07 00:22:26 +00001612 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001613 log_file=self.log_file,
1614 pidfile_name=pidfile_name,
showard9bb960b2009-11-19 01:02:11 +00001615 paired_with_pidfile=paired_with_pidfile,
1616 username=self.username)
mbligh36768f02008-02-22 18:28:33 +00001617
1618
showardd9205182009-04-27 20:09:55 +00001619class TaskWithJobKeyvals(object):
1620 """AgentTask mixin providing functionality to help with job keyval files."""
1621 _KEYVAL_FILE = 'keyval'
1622 def _format_keyval(self, key, value):
1623 return '%s=%s' % (key, value)
1624
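    # The keyval file written by this mixin is plain "key=value" lines, for
    # example (timestamps made up):
    #
    #     job_queued=1259613001
    #     job_finished=1259613300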
1625
1626 def _keyval_path(self):
1627 """Subclasses must override this"""
1628 raise NotImplementedError
1629
1630
1631 def _write_keyval_after_job(self, field, value):
1632 assert self.monitor
1633 if not self.monitor.has_process():
1634 return
1635 _drone_manager.write_lines_to_file(
1636 self._keyval_path(), [self._format_keyval(field, value)],
1637 paired_with_process=self.monitor.get_process())
1638
1639
1640 def _job_queued_keyval(self, job):
1641 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1642
1643
1644 def _write_job_finished(self):
1645 self._write_keyval_after_job("job_finished", int(time.time()))
1646
1647
showarddb502762009-09-09 15:31:20 +00001648 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1649 keyval_contents = '\n'.join(self._format_keyval(key, value)
1650 for key, value in keyval_dict.iteritems())
1651 # always end with a newline to allow additional keyvals to be written
1652 keyval_contents += '\n'
1653 _drone_manager.attach_file_to_execution(self._working_directory,
1654 keyval_contents,
1655 file_path=keyval_path)
1656
1657
1658 def _write_keyvals_before_job(self, keyval_dict):
1659 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1660
1661
1662 def _write_host_keyvals(self, host):
1663 keyval_path = os.path.join(self._working_directory, 'host_keyvals',
1664 host.hostname)
1665 platform, all_labels = host.platform_and_labels()
1666 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1667 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1668
1669
showard8cc058f2009-09-08 16:26:33 +00001670class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001671 """
1672 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1673 """
1674
1675 TASK_TYPE = None
1676 host = None
1677 queue_entry = None
1678
1679 def __init__(self, task, extra_command_args, **kwargs):
showarded2afea2009-07-07 20:54:07 +00001680 assert self.TASK_TYPE is not None, (
1681 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001682
1683 self.host = Host(id=task.host.id)
1684 self.queue_entry = None
1685 if task.queue_entry:
1686 self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
1687
showarded2afea2009-07-07 20:54:07 +00001688 self.task = task
showarddb502762009-09-09 15:31:20 +00001689 kwargs['working_directory'] = task.execution_path()
showard9bb960b2009-11-19 01:02:11 +00001690 if task.requested_by:
1691 kwargs['username'] = task.requested_by.login
showarded2afea2009-07-07 20:54:07 +00001692 self._extra_command_args = extra_command_args
1693 super(SpecialAgentTask, self).__init__(**kwargs)
1694
1695
showard8cc058f2009-09-08 16:26:33 +00001696 def _keyval_path(self):
1697 return os.path.join(self._working_directory, self._KEYVAL_FILE)
1698
1699
showarded2afea2009-07-07 20:54:07 +00001700 def prolog(self):
1701 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001702 self.cmd = _autoserv_command_line(self.host.hostname,
1703 self._extra_command_args,
1704 queue_entry=self.queue_entry)
1705 self._working_directory = self.task.execution_path()
1706 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001707 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001708
1709
showardde634ee2009-01-30 01:44:24 +00001710 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001711 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001712
showard2fe3f1d2009-07-06 20:19:11 +00001713 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001714 return # don't fail metahost entries, they'll be reassigned
1715
showard2fe3f1d2009-07-06 20:19:11 +00001716 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001717 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001718 return # entry has been aborted
1719
showard2fe3f1d2009-07-06 20:19:11 +00001720 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001721 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001722 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001723 self._write_keyval_after_job(queued_key, queued_time)
1724 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001725
showard8cc058f2009-09-08 16:26:33 +00001726 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001727 self.monitor.try_copy_results_on_drone(
1728 source_path=self._working_directory + '/',
1729 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001730
showard2fe3f1d2009-07-06 20:19:11 +00001731 self._copy_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001732 self.queue_entry.handle_host_failure()
showard2fe3f1d2009-07-06 20:19:11 +00001733 if self.queue_entry.job.parse_failed_repair:
1734 self._parse_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001735
1736 pidfile_id = _drone_manager.get_pidfile_id_from(
1737 self.queue_entry.execution_path(),
1738 pidfile_name=_AUTOSERV_PID_FILE)
1739 _drone_manager.register_pidfile(pidfile_id)
1740
1741
1742 def cleanup(self):
1743 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001744
1745 # We will consider an aborted task to be "Failed"
1746 self.task.finish(bool(self.success))
1747
showardf85a0b72009-10-07 20:48:45 +00001748 if self.monitor:
1749 if self.monitor.has_process():
1750 self._copy_results([self.task])
1751 if self.monitor.pidfile_id is not None:
1752 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001753
1754
1755class RepairTask(SpecialAgentTask):
1756 TASK_TYPE = models.SpecialTask.Task.REPAIR
1757
1758
1759 def __init__(self, task, recover_run_monitor=None):
1760 """\
1761 task: models.SpecialTask for this repair; if it has an associated queue entry, that entry is marked failed when the repair fails.
1762 """
1763 protection = host_protections.Protection.get_string(
1764 task.host.protection)
1765 # normalize the protection name
1766 protection = host_protections.Protection.get_attr_name(protection)
1767
1768 super(RepairTask, self).__init__(
1769 task, ['-R', '--host-protection', protection],
1770 recover_run_monitor=recover_run_monitor)
1771
1772 # *don't* include the queue entry in IDs -- if the queue entry is
1773 # aborted, we want to leave the repair task running
1774 self._set_ids(host=self.host)
1775
1776
1777 def prolog(self):
1778 super(RepairTask, self).prolog()
1779 logging.info("repair_task starting")
1780 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001781
1782
jadmanski0afbb632008-06-06 21:10:57 +00001783 def epilog(self):
1784 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001785
jadmanski0afbb632008-06-06 21:10:57 +00001786 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001787 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001788 else:
showard8cc058f2009-09-08 16:26:33 +00001789 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001790 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001791 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001792
1793
showarded2afea2009-07-07 20:54:07 +00001794class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001795 def _copy_to_results_repository(self):
1796 if not self.queue_entry or self.queue_entry.meta_host:
1797 return
1798
1799 self.queue_entry.set_execution_subdir()
1800 log_name = os.path.basename(self.task.execution_path())
1801 source = os.path.join(self.task.execution_path(), 'debug',
1802 'autoserv.DEBUG')
1803 destination = os.path.join(
1804 self.queue_entry.execution_path(), log_name)
1805
1806 self.monitor.try_copy_to_results_repository(
1807 source, destination_path=destination)
1808
1809
showard170873e2009-01-07 00:22:26 +00001810 def epilog(self):
1811 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001812
showard775300b2009-09-09 15:30:50 +00001813 if self.success:
1814 return
showard8fe93b52008-11-18 17:53:22 +00001815
showard775300b2009-09-09 15:30:50 +00001816 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001817
showard775300b2009-09-09 15:30:50 +00001818 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001819 # effectively ignore failure for these hosts
1820 self.success = True
showard775300b2009-09-09 15:30:50 +00001821 return
1822
1823 if self.queue_entry:
1824 self.queue_entry.requeue()
1825
1826 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001827 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001828 queue_entry__id=self.queue_entry.id):
1829 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1830 self._fail_queue_entry()
1831 return
1832
showard9bb960b2009-11-19 01:02:11 +00001833 queue_entry = models.HostQueueEntry.objects.get(
1834 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001835 else:
1836 queue_entry = None
1837
1838 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001839 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001840 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001841 queue_entry=queue_entry,
1842 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001843
showard8fe93b52008-11-18 17:53:22 +00001844
1845class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001846 TASK_TYPE = models.SpecialTask.Task.VERIFY
1847
1848
showard8cc058f2009-09-08 16:26:33 +00001849 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00001850 super(VerifyTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00001851 task, ['-v'], recover_run_monitor=recover_run_monitor)
mblighe2586682008-02-29 22:45:46 +00001852
showard8cc058f2009-09-08 16:26:33 +00001853 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001854
1855
jadmanski0afbb632008-06-06 21:10:57 +00001856 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001857 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001858
showardb18134f2009-03-20 20:52:18 +00001859 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001860 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001861 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1862 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001863
showarded2afea2009-07-07 20:54:07 +00001864 # Delete any other queued verifies for this host. One verify will do
1865 # and there's no need to keep records of other requests.
1866 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001867 host__id=self.host.id,
1868 task=models.SpecialTask.Task.VERIFY,
showarded2afea2009-07-07 20:54:07 +00001869 is_active=False, is_complete=False)
1870 queued_verifies = queued_verifies.exclude(id=self.task.id)
1871 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001872
mbligh36768f02008-02-22 18:28:33 +00001873
jadmanski0afbb632008-06-06 21:10:57 +00001874 def epilog(self):
1875 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001876 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001877 if self.queue_entry:
1878 self.queue_entry.on_pending()
1879 else:
1880 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001881
1882
showard9bb960b2009-11-19 01:02:11 +00001883class QueueTask(AgentTask, TaskWithJobKeyvals):
showard8cc058f2009-09-08 16:26:33 +00001884 def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001885 self.job = job
1886 self.queue_entries = queue_entries
showard8cc058f2009-09-08 16:26:33 +00001887 self.group_name = queue_entries[0].get_group_name()
showarded2afea2009-07-07 20:54:07 +00001888 super(QueueTask, self).__init__(
showard9bb960b2009-11-19 01:02:11 +00001889 cmd=cmd, working_directory=self._execution_path(),
1890 recover_run_monitor=recover_run_monitor,
showard418785b2009-11-23 20:19:59 +00001891 username=job.owner, num_processes=len(queue_entries))
showard170873e2009-01-07 00:22:26 +00001892 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001893
1894
showard73ec0442009-02-07 02:05:20 +00001895 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001896 return os.path.join(self._execution_path(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001897
1898
showarded2afea2009-07-07 20:54:07 +00001899 def _execution_path(self):
1900 return self.queue_entries[0].execution_path()
mblighbb421852008-03-11 22:36:16 +00001901
1902
jadmanski0afbb632008-06-06 21:10:57 +00001903 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00001904 for entry in self.queue_entries:
1905 if entry.status not in (models.HostQueueEntry.Status.STARTING,
1906 models.HostQueueEntry.Status.RUNNING):
1907 raise SchedulerError('Queue task attempting to start '
1908 'entry with invalid status %s: %s'
1909 % (entry.status, entry))
1910 if entry.host.status not in (models.Host.Status.PENDING,
1911 models.Host.Status.RUNNING):
1912 raise SchedulerError('Queue task attempting to start on queue '
1913 'entry with invalid host status %s: %s'
1914 % (entry.host.status, entry))
1915
showardd9205182009-04-27 20:09:55 +00001916 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001917 keyval_dict = {queued_key: queued_time}
1918 if self.group_name:
1919 keyval_dict['host_group_name'] = self.group_name
1920 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001921 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001922 self._write_host_keyvals(queue_entry.host)
showard8cc058f2009-09-08 16:26:33 +00001923 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showard12f3e322009-05-13 21:27:42 +00001924 queue_entry.update_field('started_on', datetime.datetime.now())
showard8cc058f2009-09-08 16:26:33 +00001925 queue_entry.host.set_status(models.Host.Status.RUNNING)
showard21baa452008-10-21 00:08:39 +00001926 queue_entry.host.update_field('dirty', 1)
showardc6a56872009-07-28 20:11:58 +00001927 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1928 # TODO(gps): Remove this if nothing needs it anymore.
1929 # A potential user is: tko/parser
jadmanski0afbb632008-06-06 21:10:57 +00001930 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001931
1932
showard35162b02009-03-03 02:17:30 +00001933 def _write_lost_process_error_file(self):
showarded2afea2009-07-07 20:54:07 +00001934 error_file_path = os.path.join(self._execution_path(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001935 _drone_manager.write_lines_to_file(error_file_path,
1936 [_LOST_PROCESS_ERROR])
1937
1938
showardd3dc1992009-04-22 21:01:40 +00001939 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001940 if not self.monitor:
1941 return
1942
showardd9205182009-04-27 20:09:55 +00001943 self._write_job_finished()
1944
showard35162b02009-03-03 02:17:30 +00001945 if self.monitor.lost_process:
1946 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001947
showard8cc058f2009-09-08 16:26:33 +00001948 for queue_entry in self.queue_entries:
1949 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001950
1951
showardcbd74612008-11-19 21:42:02 +00001952 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001953 _drone_manager.write_lines_to_file(
showarded2afea2009-07-07 20:54:07 +00001954 os.path.join(self._execution_path(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001955 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001956 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001957
1958
jadmanskif7fa2cc2008-10-01 14:13:23 +00001959 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001960 if not self.monitor or not self.monitor.has_process():
1961 return
1962
jadmanskif7fa2cc2008-10-01 14:13:23 +00001963 # build up sets of all the aborted_by and aborted_on values
1964 aborted_by, aborted_on = set(), set()
1965 for queue_entry in self.queue_entries:
1966 if queue_entry.aborted_by:
1967 aborted_by.add(queue_entry.aborted_by)
1968 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1969 aborted_on.add(t)
1970
1971 # extract some actual, unique aborted by value and write it out
1972 assert len(aborted_by) <= 1
1973 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001974 aborted_by_value = aborted_by.pop()
1975 aborted_on_value = max(aborted_on)
1976 else:
1977 aborted_by_value = 'autotest_system'
1978 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001979
showarda0382352009-02-11 23:36:43 +00001980 self._write_keyval_after_job("aborted_by", aborted_by_value)
1981 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001982
showardcbd74612008-11-19 21:42:02 +00001983 aborted_on_string = str(datetime.datetime.fromtimestamp(
1984 aborted_on_value))
1985 self._write_status_comment('Job aborted by %s on %s' %
1986 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001987
1988
jadmanski0afbb632008-06-06 21:10:57 +00001989 def abort(self):
1990 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001991 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001992 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001993
1994
jadmanski0afbb632008-06-06 21:10:57 +00001995 def epilog(self):
1996 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001997 self._finish_task()
1998 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001999
2000
showardd3dc1992009-04-22 21:01:40 +00002001class PostJobTask(AgentTask):
showard418785b2009-11-23 20:19:59 +00002002 def __init__(self, queue_entries, pidfile_name, logfile_name, num_processes,
showarded2afea2009-07-07 20:54:07 +00002003 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002004 self._queue_entries = queue_entries
2005 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00002006
showarded2afea2009-07-07 20:54:07 +00002007 self._execution_path = self._get_consistent_execution_path(
2008 queue_entries)
2009 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002010 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00002011 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002012 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
2013
2014 if _testing_mode:
2015 command = 'true'
2016 else:
2017 command = self._generate_command(self._results_dir)
2018
showarded2afea2009-07-07 20:54:07 +00002019 super(PostJobTask, self).__init__(
2020 cmd=command, working_directory=self._execution_path,
showard9bb960b2009-11-19 01:02:11 +00002021 recover_run_monitor=recover_run_monitor,
showard418785b2009-11-23 20:19:59 +00002022 username=queue_entries[0].job.owner,
2023 num_processes=num_processes)
showardd3dc1992009-04-22 21:01:40 +00002024
showarded2afea2009-07-07 20:54:07 +00002025 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00002026 self._final_status = self._determine_final_status()
2027
2028
2029 def _generate_command(self, results_dir):
2030 raise NotImplementedError('Subclasses must override this')
2031
2032
2033 def _job_was_aborted(self):
2034 was_aborted = None
2035 for queue_entry in self._queue_entries:
2036 queue_entry.update_from_database()
2037 if was_aborted is None: # first queue entry
2038 was_aborted = bool(queue_entry.aborted)
2039 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2040 email_manager.manager.enqueue_notify_email(
2041 'Inconsistent abort state',
2042 'Queue entries have inconsistent abort state: ' +
2043 ', '.join('%s (%s)' % (entry, entry.aborted)
for entry in self._queue_entries))
2044 # don't crash here, just assume true
2045 return True
2046 return was_aborted
2047
2048
2049 def _determine_final_status(self):
2050 if self._job_was_aborted():
2051 return models.HostQueueEntry.Status.ABORTED
2052
2053 # we'll use a PidfileRunMonitor to read the autoserv exit status
2054 if self._autoserv_monitor.exit_code() == 0:
2055 return models.HostQueueEntry.Status.COMPLETED
2056 return models.HostQueueEntry.Status.FAILED
2057
2058
2059 def run(self):
showard8cc058f2009-09-08 16:26:33 +00002060 # Make sure we actually have results to work with.
2061 # This should never happen in normal operation.
showard5add1c82009-05-26 19:27:46 +00002062 if not self._autoserv_monitor.has_process():
2063 email_manager.manager.enqueue_notify_email(
showard8cc058f2009-09-08 16:26:33 +00002064 'No results in post-job task',
2065 'No results in post-job task at %s' %
2066 self._autoserv_monitor.pidfile_id)
showard5add1c82009-05-26 19:27:46 +00002067 self.finished(False)
2068 return
2069
2070 super(PostJobTask, self).run(
2071 pidfile_name=self._pidfile_name,
2072 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002073
2074
2075 def _set_all_statuses(self, status):
2076 for queue_entry in self._queue_entries:
2077 queue_entry.set_status(status)
2078
2079
2080 def abort(self):
2081 # override AgentTask.abort() to avoid killing the process and ending
2082 # the task. post-job tasks continue when the job is aborted.
2083 pass
2084
2085
showard9bb960b2009-11-19 01:02:11 +00002086class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002087 """
2088 Task responsible for
2089 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2090 * copying logs to the results repository
2091 * spawning CleanupTasks for hosts, if necessary
2092 * spawning a FinalReparseTask for the job
2093 """
showarded2afea2009-07-07 20:54:07 +00002094 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002095 self._job = job
2096 super(GatherLogsTask, self).__init__(
2097 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002098 logfile_name='.collect_crashinfo.log',
showard418785b2009-11-23 20:19:59 +00002099 num_processes=len(queue_entries),
showarded2afea2009-07-07 20:54:07 +00002100 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002101 self._set_ids(queue_entries=queue_entries)
2102
2103
2104 def _generate_command(self, results_dir):
2105 host_list = ','.join(queue_entry.host.hostname
2106 for queue_entry in self._queue_entries)
2107 return [_autoserv_path, '-p', '--collect-crashinfo', '-m', host_list,
2108 '-r', results_dir]
2109
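    # The generated command expands to roughly the following (path, hosts
    # and results directory are illustrative only):
    #
    #     <autoserv_path> -p --collect-crashinfo -m host1,host2 -r <results_dir>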
2110
2111 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002112 for queue_entry in self._queue_entries:
2113 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2114 raise SchedulerError('Gather task attempting to start on '
2115 'non-gathering entry: %s' % queue_entry)
2116 if queue_entry.host.status != models.Host.Status.RUNNING:
2117 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002118 'entry with non-running host status %s: %s'
2119 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002120
showardd3dc1992009-04-22 21:01:40 +00002121 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002122
2123
showardd3dc1992009-04-22 21:01:40 +00002124 def epilog(self):
2125 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002126
showard6d1c1432009-08-20 23:30:39 +00002127 self._copy_and_parse_results(self._queue_entries,
2128 use_monitor=self._autoserv_monitor)
showard9bb960b2009-11-19 01:02:11 +00002129 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002130
showard9bb960b2009-11-19 01:02:11 +00002131
2132 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002133 if self._autoserv_monitor.has_process():
2134 final_success = (self._final_status ==
2135 models.HostQueueEntry.Status.COMPLETED)
2136 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2137 else:
2138 final_success = False
2139 num_tests_failed = 0
2140
showard9bb960b2009-11-19 01:02:11 +00002141 reboot_after = self._job.reboot_after
2142 do_reboot = (
2143 # always reboot after aborted jobs
2144 self._final_status == models.HostQueueEntry.Status.ABORTED
2145 or reboot_after == models.RebootAfter.ALWAYS
2146 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
2147 and final_success and num_tests_failed == 0))
2148
2149 for queue_entry in self._queue_entries:
2150 if do_reboot:
2151 # don't pass the queue entry to the CleanupTask. if the cleanup
2152 # fails, the job doesn't care -- it's over.
2153 models.SpecialTask.objects.create(
2154 host=models.Host.objects.get(id=queue_entry.host.id),
2155 task=models.SpecialTask.Task.CLEANUP,
2156 requested_by=self._job.owner_model())
2157 else:
2158 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002159
2160
showard0bbfc212009-04-29 21:06:13 +00002161 def run(self):
showard597bfd32009-05-08 18:22:50 +00002162 autoserv_exit_code = self._autoserv_monitor.exit_code()
2163 # only run if Autoserv exited due to some signal. if we have no exit
2164 # code, assume something bad (and signal-like) happened.
2165 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002166 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002167 else:
2168 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002169
2170
showard8fe93b52008-11-18 17:53:22 +00002171class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002172 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2173
2174
showard8cc058f2009-09-08 16:26:33 +00002175 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00002176 super(CleanupTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00002177 task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002178
showard8cc058f2009-09-08 16:26:33 +00002179 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002180
mblighd5c95802008-03-05 00:33:46 +00002181
jadmanski0afbb632008-06-06 21:10:57 +00002182 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002183 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002184 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002185 self.host.set_status(models.Host.Status.CLEANING)
2186 if self.queue_entry:
2187 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2188
2189
showard775300b2009-09-09 15:30:50 +00002190 def _finish_epilog(self):
showard7b2d7cb2009-10-28 19:53:03 +00002191 if not self.queue_entry or not self.success:
showard775300b2009-09-09 15:30:50 +00002192 return
2193
showard7b2d7cb2009-10-28 19:53:03 +00002194 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2195 should_run_verify = (
2196 self.queue_entry.job.run_verify
2197 and self.host.protection != do_not_verify_protection)
2198 if should_run_verify:
showard9bb960b2009-11-19 01:02:11 +00002199 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
showard7b2d7cb2009-10-28 19:53:03 +00002200 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002201 host=models.Host.objects.get(id=self.host.id),
showard7b2d7cb2009-10-28 19:53:03 +00002202 queue_entry=entry,
2203 task=models.SpecialTask.Task.VERIFY)
2204 else:
showard775300b2009-09-09 15:30:50 +00002205 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002206
mblighd5c95802008-03-05 00:33:46 +00002207
showard21baa452008-10-21 00:08:39 +00002208 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002209 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002210
showard21baa452008-10-21 00:08:39 +00002211 if self.success:
2212 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002213 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002214
showard775300b2009-09-09 15:30:50 +00002215 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002216
showard21baa452008-10-21 00:08:39 +00002217
showardd3dc1992009-04-22 21:01:40 +00002218class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002219 _num_running_parses = 0
2220
showarded2afea2009-07-07 20:54:07 +00002221 def __init__(self, queue_entries, recover_run_monitor=None):
2222 super(FinalReparseTask, self).__init__(
2223 queue_entries, pidfile_name=_PARSER_PID_FILE,
2224 logfile_name='.parse.log',
showard418785b2009-11-23 20:19:59 +00002225 num_processes=0, # don't include parser processes in accounting
showarded2afea2009-07-07 20:54:07 +00002226 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002227 # don't use _set_ids, since we don't want to set the host_ids
2228 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002229 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002230
showard97aed502008-11-04 02:01:24 +00002231
2232 @classmethod
2233 def _increment_running_parses(cls):
2234 cls._num_running_parses += 1
2235
2236
2237 @classmethod
2238 def _decrement_running_parses(cls):
2239 cls._num_running_parses -= 1
2240
2241
2242 @classmethod
2243 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002244 return (cls._num_running_parses <
2245 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002246
2247
2248 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002249 for queue_entry in self._queue_entries:
2250 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2251 raise SchedulerError('Parse task attempting to start on '
2252 'non-parsing entry: %s' % queue_entry)
2253
showard97aed502008-11-04 02:01:24 +00002254 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002255
2256
2257 def epilog(self):
2258 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002259 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002260
2261
showardd3dc1992009-04-22 21:01:40 +00002262 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002263 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002264 results_dir]
showard97aed502008-11-04 02:01:24 +00002265
2266
showard08a36412009-05-05 01:01:13 +00002267 def tick(self):
2268 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002269 # and we can, at which point we revert to default behavior
2270 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002271 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002272 else:
2273 self._try_starting_parse()
2274
2275
2276 def run(self):
2277 # override run() to not actually run unless we can
2278 self._try_starting_parse()
2279
2280
2281 def _try_starting_parse(self):
2282 if not self._can_run_new_parse():
2283 return
showard170873e2009-01-07 00:22:26 +00002284
showard97aed502008-11-04 02:01:24 +00002285 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002286 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002287
showard97aed502008-11-04 02:01:24 +00002288 self._increment_running_parses()
2289 self._parse_started = True
2290
2291
2292 def finished(self, success):
2293 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002294 if self._parse_started:
2295 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002296
2297
showarda3c58572009-03-12 20:36:59 +00002298class DBError(Exception):
2299 """Raised by the DBObject constructor when its select fails."""
2300
2301
mbligh36768f02008-02-22 18:28:33 +00002302class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002303 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002304
2305 # Subclasses MUST override these:
2306 _table_name = ''
2307 _fields = ()
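
    # A subclass typically looks like this (hypothetical table, shown only to
    # illustrate the required overrides; 'id' must be the first field):
    #
    #     class Widget(DBObject):
    #         _table_name = 'widgets'
    #         _fields = ('id', 'name', 'status')
    #
    #     widget = Widget(id=42)  # SELECTs the row and caches this instance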
2308
showarda3c58572009-03-12 20:36:59 +00002309 # A mapping from (type, id) to the instance of the object for that
2310 # particular id. This prevents us from creating new Job() and Host()
2311 # instances for every HostQueueEntry object that we instantiate as
2312 # multiple HQEs often share the same Job.
2313 _instances_by_type_and_id = weakref.WeakValueDictionary()
2314 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002315
showarda3c58572009-03-12 20:36:59 +00002316
2317 def __new__(cls, id=None, **kwargs):
2318 """
2319 Look to see if we already have an instance for this particular type
2320 and id. If so, use it instead of creating a duplicate instance.
2321 """
2322 if id is not None:
2323 instance = cls._instances_by_type_and_id.get((cls, id))
2324 if instance:
2325 return instance
2326 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2327
2328
2329 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002330 assert bool(id) or bool(row)
2331 if id is not None and row is not None:
2332 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002333 assert self._table_name, '_table_name must be defined in your class'
2334 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002335 if not new_record:
2336 if self._initialized and not always_query:
2337 return # We've already been initialized.
2338 if id is None:
2339 id = row[0]
2340 # Tell future constructors to use us instead of re-querying while
2341 # this instance is still around.
2342 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002343
showard6ae5ea92009-02-25 00:11:51 +00002344 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002345
jadmanski0afbb632008-06-06 21:10:57 +00002346 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002347
jadmanski0afbb632008-06-06 21:10:57 +00002348 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002349 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002350
showarda3c58572009-03-12 20:36:59 +00002351 if self._initialized:
2352 differences = self._compare_fields_in_row(row)
2353 if differences:
showard7629f142009-03-27 21:02:02 +00002354 logging.warn(
2355 'initialized %s %s instance requery is updating: %s',
2356 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002357 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002358 self._initialized = True
2359
2360
2361 @classmethod
2362 def _clear_instance_cache(cls):
2363 """Used for testing, clear the internal instance cache."""
2364 cls._instances_by_type_and_id.clear()
2365
2366
showardccbd6c52009-03-21 00:10:21 +00002367 def _fetch_row_from_db(self, row_id):
2368 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2369 rows = _db.execute(sql, (row_id,))
2370 if not rows:
showard76e29d12009-04-15 21:53:10 +00002371 raise DBError("row not found (table=%s, row id=%s)"
2372 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002373 return rows[0]
2374
2375
showarda3c58572009-03-12 20:36:59 +00002376 def _assert_row_length(self, row):
2377 assert len(row) == len(self._fields), (
2378 "table = %s, row = %s/%d, fields = %s/%d" % (
2379 self.__table, row, len(row), self._fields, len(self._fields)))
2380
2381
2382 def _compare_fields_in_row(self, row):
2383 """
showarddae680a2009-10-12 20:26:43 +00002384        Given a row as returned by a SELECT query, compare it to our existing
2385        in-memory fields. Fractional seconds are stripped from datetime values
2386 before comparison.
showarda3c58572009-03-12 20:36:59 +00002387
2388 @param row - A sequence of values corresponding to fields named in
2389                     the class attribute _fields.
2390
2391 @returns A dictionary listing the differences keyed by field name
2392 containing tuples of (current_value, row_value).
2393 """
2394 self._assert_row_length(row)
2395 differences = {}
showarddae680a2009-10-12 20:26:43 +00002396 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002397 for field, row_value in itertools.izip(self._fields, row):
2398 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002399 if (isinstance(current_value, datetime.datetime)
2400 and isinstance(row_value, datetime.datetime)):
2401 current_value = current_value.strftime(datetime_cmp_fmt)
2402 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002403 if current_value != row_value:
2404 differences[field] = (current_value, row_value)
2405 return differences
showard2bab8f42008-11-12 18:15:22 +00002406
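    # Illustrative example (not part of the scheduler): if this instance's
    # 'status' attribute is 'Running' but a re-fetched row now says 'Queued',
    # the method above returns roughly {'status': ('Running', 'Queued')},
    # while datetime columns that differ only in microseconds compare equal
    # because both sides are first formatted with '%Y-%m-%d %H:%M:%S'.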
2407
2408 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002409 """
2410 Update our field attributes using a single row returned by SELECT.
2411
2412 @param row - A sequence of values corresponding to fields named in
2413 the class fields list.
2414 """
2415 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002416
showard2bab8f42008-11-12 18:15:22 +00002417 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002418 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002419 setattr(self, field, value)
2420 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002421
showard2bab8f42008-11-12 18:15:22 +00002422 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002423
mblighe2586682008-02-29 22:45:46 +00002424
showardccbd6c52009-03-21 00:10:21 +00002425 def update_from_database(self):
2426 assert self.id is not None
2427 row = self._fetch_row_from_db(self.id)
2428 self._update_fields_from_row(row)
2429
2430
jadmanski0afbb632008-06-06 21:10:57 +00002431 def count(self, where, table = None):
2432 if not table:
2433 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002434
jadmanski0afbb632008-06-06 21:10:57 +00002435 rows = _db.execute("""
2436 SELECT count(*) FROM %s
2437 WHERE %s
2438 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002439
jadmanski0afbb632008-06-06 21:10:57 +00002440 assert len(rows) == 1
2441
2442 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002443
2444
showardd3dc1992009-04-22 21:01:40 +00002445 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002446 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002447
showard2bab8f42008-11-12 18:15:22 +00002448 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002449 return
mbligh36768f02008-02-22 18:28:33 +00002450
mblighf8c624d2008-07-03 16:58:45 +00002451 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002452 _db.execute(query, (value, self.id))
2453
showard2bab8f42008-11-12 18:15:22 +00002454 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002455
2456
jadmanski0afbb632008-06-06 21:10:57 +00002457 def save(self):
2458 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002459 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002460 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002461 values = []
2462 for key in keys:
2463 value = getattr(self, key)
2464 if value is None:
2465 values.append('NULL')
2466 else:
2467 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002468 values_str = ','.join(values)
2469 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2470 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002471 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002472 # Update our id to the one the database just assigned to us.
2473 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
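    # Illustrative sketch (not part of the scheduler): saving a new block row,
    # e.g. IneligibleHostQueue(row=[0, 5, 12], new_record=True).save(), is
    # expected to issue roughly
    #     INSERT INTO ineligible_host_queues (job_id,host_id) VALUES ("5","12")
    # and then read LAST_INSERT_ID() back into self.id.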
mbligh36768f02008-02-22 18:28:33 +00002474
2475
jadmanski0afbb632008-06-06 21:10:57 +00002476 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002477        self._instances_by_type_and_id.pop((type(self), self.id), None)
2478 self._initialized = False
2479 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002480 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2481 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002482
2483
showard63a34772008-08-18 19:32:50 +00002484 @staticmethod
2485 def _prefix_with(string, prefix):
2486 if string:
2487 string = prefix + string
2488 return string
2489
2490
jadmanski0afbb632008-06-06 21:10:57 +00002491 @classmethod
showard989f25d2008-10-01 11:38:11 +00002492 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002493 """
2494 Construct instances of our class based on the given database query.
2495
2496        @returns A list of instances of this class, one per row fetched.
2497 """
showard63a34772008-08-18 19:32:50 +00002498 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2499 where = cls._prefix_with(where, 'WHERE ')
2500 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002501 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002502 'joins' : joins,
2503 'where' : where,
2504 'order_by' : order_by})
2505 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002506 return [cls(id=row[0], row=row) for row in rows]
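    # Illustrative sketch (not part of the scheduler): fetch() pieces together
    # a plain SELECT, so a call such as
    #
    #     idle_hosts = Host.fetch(where='locked = %s', params=(0,),
    #                             order_by='hostname')
    #
    # is expected to execute roughly
    #     SELECT hosts.* FROM hosts  WHERE locked = %s ORDER BY hostname
    # with params applied by the database layer, returning one Host per row.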
mblighe2586682008-02-29 22:45:46 +00002507
mbligh36768f02008-02-22 18:28:33 +00002508
2509class IneligibleHostQueue(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002510 _table_name = 'ineligible_host_queues'
2511 _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002512
2513
showard89f84db2009-03-12 20:39:13 +00002514class AtomicGroup(DBObject):
2515 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002516 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2517 'invalid')
showard89f84db2009-03-12 20:39:13 +00002518
2519
showard989f25d2008-10-01 11:38:11 +00002520class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002521 _table_name = 'labels'
2522 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002523 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002524
2525
showard6157c632009-07-06 20:19:31 +00002526 def __repr__(self):
2527 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2528 self.name, self.id, self.atomic_group_id)
2529
2530
mbligh36768f02008-02-22 18:28:33 +00002531class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002532 _table_name = 'hosts'
2533 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2534 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2535
2536
jadmanski0afbb632008-06-06 21:10:57 +00002537 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002538 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002539 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002540
2541
showard170873e2009-01-07 00:22:26 +00002542 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002543 """
showard170873e2009-01-07 00:22:26 +00002544 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002545 """
2546 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002547 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002548 FROM labels
2549 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002550 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002551 ORDER BY labels.name
2552 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002553 platform = None
2554 all_labels = []
2555 for label_name, is_platform in rows:
2556 if is_platform:
2557 platform = label_name
2558 all_labels.append(label_name)
2559 return platform, all_labels
2560
2561
showard54c1ea92009-05-20 00:32:58 +00002562 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2563
2564
2565 @classmethod
2566 def cmp_for_sort(cls, a, b):
2567 """
2568 A comparison function for sorting Host objects by hostname.
2569
2570 This strips any trailing numeric digits, ignores leading 0s and
2571 compares hostnames by the leading name and the trailing digits as a
2572        number. If either hostname does not match this pattern, they are simply
2573 compared as lower case strings.
2574
2575 Example of how hostnames will be sorted:
2576
2577 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2578
2579        This hopefully satisfies most people's hostname sorting needs regardless
2580 of their exact naming schemes. Nobody sane should have both a host10
2581 and host010 (but the algorithm works regardless).
2582 """
2583 lower_a = a.hostname.lower()
2584 lower_b = b.hostname.lower()
2585 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2586 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2587 if match_a and match_b:
2588 name_a, number_a_str = match_a.groups()
2589 name_b, number_b_str = match_b.groups()
2590 number_a = int(number_a_str.lstrip('0'))
2591 number_b = int(number_b_str.lstrip('0'))
2592 result = cmp((name_a, number_a), (name_b, number_b))
2593 if result == 0 and lower_a != lower_b:
2594 # If they compared equal above but the lower case names are
2595 # indeed different, don't report equality. abc012 != abc12.
2596 return cmp(lower_a, lower_b)
2597 return result
2598 else:
2599 return cmp(lower_a, lower_b)
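    # Illustrative example (not part of the scheduler): sorting Host objects
    # with this comparator, e.g. hosts.sort(cmp=Host.cmp_for_sort), orders the
    # hostnames
    #
    #     ['host10', 'host2', 'alice', 'host09']
    #
    # as alice, host2, host09, host10: digits are compared numerically with
    # leading zeros ignored, and names without a numeric suffix fall back to
    # plain lower-case string comparison.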
2600
2601
mbligh36768f02008-02-22 18:28:33 +00002602class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002603 _table_name = 'host_queue_entries'
2604 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002605 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002606 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002607
2608
showarda3c58572009-03-12 20:36:59 +00002609 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002610 assert id or row
showarda3c58572009-03-12 20:36:59 +00002611 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002612 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002613
jadmanski0afbb632008-06-06 21:10:57 +00002614 if self.host_id:
2615 self.host = Host(self.host_id)
2616 else:
2617 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002618
showard77182562009-06-10 00:16:05 +00002619 if self.atomic_group_id:
2620 self.atomic_group = AtomicGroup(self.atomic_group_id,
2621 always_query=False)
2622 else:
2623 self.atomic_group = None
2624
showard170873e2009-01-07 00:22:26 +00002625 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002626 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002627
2628
showard89f84db2009-03-12 20:39:13 +00002629 @classmethod
2630 def clone(cls, template):
2631 """
2632 Creates a new row using the values from a template instance.
2633
2634 The new instance will not exist in the database or have a valid
2635 id attribute until its save() method is called.
2636 """
2637 assert isinstance(template, cls)
2638 new_row = [getattr(template, field) for field in cls._fields]
2639 clone = cls(row=new_row, new_record=True)
2640 clone.id = None
2641 return clone
2642
2643
showardc85c21b2008-11-24 22:17:37 +00002644 def _view_job_url(self):
2645 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2646
2647
showardf1ae3542009-05-11 19:26:02 +00002648 def get_labels(self):
2649 """
2650 Get all labels associated with this host queue entry (either via the
2651 meta_host or as a job dependency label). The labels yielded are not
2652 guaranteed to be unique.
2653
2654 @yields Label instances associated with this host_queue_entry.
2655 """
2656 if self.meta_host:
2657 yield Label(id=self.meta_host, always_query=False)
2658 labels = Label.fetch(
2659 joins="JOIN jobs_dependency_labels AS deps "
2660 "ON (labels.id = deps.label_id)",
2661 where="deps.job_id = %d" % self.job.id)
2662 for label in labels:
2663 yield label
2664
2665
jadmanski0afbb632008-06-06 21:10:57 +00002666 def set_host(self, host):
2667 if host:
2668 self.queue_log_record('Assigning host ' + host.hostname)
2669 self.update_field('host_id', host.id)
2670 self.update_field('active', True)
2671 self.block_host(host.id)
2672 else:
2673 self.queue_log_record('Releasing host')
2674 self.unblock_host(self.host.id)
2675 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002676
jadmanski0afbb632008-06-06 21:10:57 +00002677 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002678
2679
jadmanski0afbb632008-06-06 21:10:57 +00002680 def get_host(self):
2681 return self.host
mbligh36768f02008-02-22 18:28:33 +00002682
2683
jadmanski0afbb632008-06-06 21:10:57 +00002684 def queue_log_record(self, log_line):
2685 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002686 _drone_manager.write_lines_to_file(self.queue_log_path,
2687 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002688
2689
jadmanski0afbb632008-06-06 21:10:57 +00002690 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002691 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002692 row = [0, self.job.id, host_id]
2693 block = IneligibleHostQueue(row=row, new_record=True)
2694 block.save()
mblighe2586682008-02-29 22:45:46 +00002695
2696
jadmanski0afbb632008-06-06 21:10:57 +00002697 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002698 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002699 blocks = IneligibleHostQueue.fetch(
2700 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2701 for block in blocks:
2702 block.delete()
mblighe2586682008-02-29 22:45:46 +00002703
2704
showard2bab8f42008-11-12 18:15:22 +00002705 def set_execution_subdir(self, subdir=None):
2706 if subdir is None:
2707 assert self.get_host()
2708 subdir = self.get_host().hostname
2709 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002710
2711
showard6355f6b2008-12-05 18:52:13 +00002712 def _get_hostname(self):
2713 if self.host:
2714 return self.host.hostname
2715 return 'no host'
2716
2717
showard170873e2009-01-07 00:22:26 +00002718 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002719 flags = []
2720 if self.active:
2721 flags.append('active')
2722 if self.complete:
2723 flags.append('complete')
2724 if self.deleted:
2725 flags.append('deleted')
2726 if self.aborted:
2727 flags.append('aborted')
2728 flags_str = ','.join(flags)
2729 if flags_str:
2730 flags_str = ' [%s]' % flags_str
2731 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2732 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002733
2734
jadmanski0afbb632008-06-06 21:10:57 +00002735 def set_status(self, status):
showard56824072009-10-12 20:30:21 +00002736 logging.info("%s -> %s", self, status)
mblighf8c624d2008-07-03 16:58:45 +00002737
showard56824072009-10-12 20:30:21 +00002738 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002739
showard8cc058f2009-09-08 16:26:33 +00002740 if status in (models.HostQueueEntry.Status.QUEUED,
2741 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002742 self.update_field('complete', False)
2743 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002744
showard8cc058f2009-09-08 16:26:33 +00002745 if status in (models.HostQueueEntry.Status.PENDING,
2746 models.HostQueueEntry.Status.RUNNING,
2747 models.HostQueueEntry.Status.VERIFYING,
2748 models.HostQueueEntry.Status.STARTING,
2749 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002750 self.update_field('complete', False)
2751 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002752
showard8cc058f2009-09-08 16:26:33 +00002753 if status in (models.HostQueueEntry.Status.FAILED,
2754 models.HostQueueEntry.Status.COMPLETED,
2755 models.HostQueueEntry.Status.STOPPED,
2756 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002757 self.update_field('complete', True)
2758 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002759 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002760
2761 should_email_status = (status.lower() in _notify_email_statuses or
2762 'all' in _notify_email_statuses)
2763 if should_email_status:
2764 self._email_on_status(status)
2765
2766 self._email_on_job_complete()
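    # Illustrative summary (not part of the scheduler) of the flag updates
    # applied above:
    #     Queued, Parsing                        -> active=False, complete=False
    #     Pending, Running, Verifying,
    #     Starting, Gathering                    -> active=True,  complete=False
    #     Failed, Completed, Stopped, Aborted    -> active=False, complete=True,
    #                                               plus _on_complete() cleanup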
2767
2768
showardf85a0b72009-10-07 20:48:45 +00002769 def _on_complete(self):
2770 if not self.execution_subdir:
2771 return
2772 # unregister any possible pidfiles associated with this queue entry
2773 for pidfile_name in _ALL_PIDFILE_NAMES:
2774 pidfile_id = _drone_manager.get_pidfile_id_from(
2775 self.execution_path(), pidfile_name=pidfile_name)
2776 _drone_manager.unregister_pidfile(pidfile_id)
2777
2778
showardc85c21b2008-11-24 22:17:37 +00002779 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002780 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002781
2782 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2783 self.job.id, self.job.name, hostname, status)
2784 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2785 self.job.id, self.job.name, hostname, status,
2786 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002787 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002788
2789
2790 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002791 if not self.job.is_finished():
2792 return
showard542e8402008-09-19 20:16:18 +00002793
showardc85c21b2008-11-24 22:17:37 +00002794 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002795 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002796 for queue_entry in hosts_queue:
2797 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002798 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002799 queue_entry.status))
2800
2801 summary_text = "\n".join(summary_text)
2802 status_counts = models.Job.objects.get_status_counts(
2803 [self.job.id])[self.job.id]
2804 status = ', '.join('%d %s' % (count, status) for status, count
2805 in status_counts.iteritems())
2806
2807 subject = 'Autotest: Job ID: %s "%s" %s' % (
2808 self.job.id, self.job.name, status)
2809 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2810 self.job.id, self.job.name, status, self._view_job_url(),
2811 summary_text)
showard170873e2009-01-07 00:22:26 +00002812 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002813
2814
showard8cc058f2009-09-08 16:26:33 +00002815 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002816 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002817 assert assigned_host
2818 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002819 if self.host_id is None:
2820 self.set_host(assigned_host)
2821 else:
2822 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002823
showardcfd4a7e2009-07-11 01:47:33 +00002824 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002825 self.job.name, self.meta_host, self.atomic_group_id,
2826 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002827
showard8cc058f2009-09-08 16:26:33 +00002828 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002829
2830
showard8cc058f2009-09-08 16:26:33 +00002831 def _do_schedule_pre_job_tasks(self):
showard77182562009-06-10 00:16:05 +00002832        # Every host goes through the Verifying stage (which may or may not
2833        # actually do anything, as determined by Job.schedule_pre_job_tasks).
2834 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00002835 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002836
showard6ae5ea92009-02-25 00:11:51 +00002837
jadmanski0afbb632008-06-06 21:10:57 +00002838 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002839 assert self.host
showard8cc058f2009-09-08 16:26:33 +00002840 self.set_status(models.HostQueueEntry.Status.QUEUED)
showard12f3e322009-05-13 21:27:42 +00002841 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002842 # verify/cleanup failure sets the execution subdir, so reset it here
2843 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002844 if self.meta_host:
2845 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002846
2847
jadmanski0afbb632008-06-06 21:10:57 +00002848 def handle_host_failure(self):
2849 """\
2850 Called when this queue entry's host has failed verification and
2851 repair.
2852 """
2853 assert not self.meta_host
showard8cc058f2009-09-08 16:26:33 +00002854 self.set_status(models.HostQueueEntry.Status.FAILED)
showard2bab8f42008-11-12 18:15:22 +00002855 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002856
2857
jadmanskif7fa2cc2008-10-01 14:13:23 +00002858 @property
2859 def aborted_by(self):
2860 self._load_abort_info()
2861 return self._aborted_by
2862
2863
2864 @property
2865 def aborted_on(self):
2866 self._load_abort_info()
2867 return self._aborted_on
2868
2869
2870 def _load_abort_info(self):
2871 """ Fetch info about who aborted the job. """
2872 if hasattr(self, "_aborted_by"):
2873 return
2874 rows = _db.execute("""
2875 SELECT users.login, aborted_host_queue_entries.aborted_on
2876 FROM aborted_host_queue_entries
2877 INNER JOIN users
2878 ON users.id = aborted_host_queue_entries.aborted_by_id
2879 WHERE aborted_host_queue_entries.queue_entry_id = %s
2880 """, (self.id,))
2881 if rows:
2882 self._aborted_by, self._aborted_on = rows[0]
2883 else:
2884 self._aborted_by = self._aborted_on = None
2885
2886
showardb2e2c322008-10-14 17:33:55 +00002887 def on_pending(self):
2888 """
2889 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00002890 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
2891 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00002892 """
showard8cc058f2009-09-08 16:26:33 +00002893 self.set_status(models.HostQueueEntry.Status.PENDING)
2894 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00002895
2896 # Some debug code here: sends an email if an asynchronous job does not
2897 # immediately enter Starting.
2898 # TODO: Remove this once we figure out why asynchronous jobs are getting
2899 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00002900 self.job.run_if_ready(queue_entry=self)
2901 if (self.job.synch_count == 1 and
2902 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00002903 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2904 message = 'Asynchronous job stuck in Pending'
2905 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002906
2907
showardd3dc1992009-04-22 21:01:40 +00002908 def abort(self, dispatcher):
2909 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002910
showardd3dc1992009-04-22 21:01:40 +00002911 Status = models.HostQueueEntry.Status
showard8cc058f2009-09-08 16:26:33 +00002912 if self.status in (Status.GATHERING, Status.PARSING):
showardd3dc1992009-04-22 21:01:40 +00002913 # do nothing; post-job tasks will finish and then mark this entry
2914 # with status "Aborted" and take care of the host
2915 return
2916
showard8cc058f2009-09-08 16:26:33 +00002917 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
2918 assert not dispatcher.get_agents_for_entry(self)
showardd3dc1992009-04-22 21:01:40 +00002919 self.host.set_status(models.Host.Status.READY)
2920 elif self.status == Status.VERIFYING:
showard8cc058f2009-09-08 16:26:33 +00002921 models.SpecialTask.objects.create(
2922 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00002923 host=models.Host.objects.get(id=self.host.id),
2924 requested_by=self.job.owner_model())
showardd3dc1992009-04-22 21:01:40 +00002925
2926 self.set_status(Status.ABORTED)
showardd2014822009-10-12 20:26:58 +00002927 self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00002928
showard8cc058f2009-09-08 16:26:33 +00002929
2930 def get_group_name(self):
2931 atomic_group = self.atomic_group
2932 if not atomic_group:
2933 return ''
2934
2935 # Look at any meta_host and dependency labels and pick the first
2936 # one that also specifies this atomic group. Use that label name
2937 # as the group name if possible (it is more specific).
2938 for label in self.get_labels():
2939 if label.atomic_group_id:
2940 assert label.atomic_group_id == atomic_group.id
2941 return label.name
2942 return atomic_group.name
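    # Illustrative example (not part of the scheduler; label names are
    # hypothetical): for an entry in atomic group 'rack-servers' whose
    # meta_host label 'rack-servers-sfo' points at that same atomic group,
    # get_group_name() returns the more specific 'rack-servers-sfo'; with no
    # such label it falls back to the atomic group's own name.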
2943
2944
showard170873e2009-01-07 00:22:26 +00002945 def execution_tag(self):
2946 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002947 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002948
2949
showarded2afea2009-07-07 20:54:07 +00002950 def execution_path(self):
2951 return self.execution_tag()
2952
2953
mbligh36768f02008-02-22 18:28:33 +00002954class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002955 _table_name = 'jobs'
2956 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2957 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002958 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002959 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002960
showard77182562009-06-10 00:16:05 +00002961 # This does not need to be a column in the DB. The delays are likely to
2962 # be configured short. If the scheduler is stopped and restarted in
2963 # the middle of a job's delay cycle, the delay cycle will either be
2964 # repeated or skipped depending on the number of Pending machines found
2965 # when the restarted scheduler recovers to track it. Not a problem.
2966 #
2967 # A reference to the DelayedCallTask that will wake up the job should
2968 # no other HQEs change state in time. Its end_time attribute is used
2969 # by our run_with_ready_delay() method to determine if the wait is over.
2970 _delay_ready_task = None
2971
2972 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2973    # all status='Pending' atomic group HQEs in case a delay was running when the
2974 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002975
showarda3c58572009-03-12 20:36:59 +00002976 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002977 assert id or row
showarda3c58572009-03-12 20:36:59 +00002978 super(Job, self).__init__(id=id, row=row, **kwargs)
showard9bb960b2009-11-19 01:02:11 +00002979 self._owner_model = None # caches model instance of owner
2980
2981
2982 def owner_model(self):
2983 # work around the fact that the Job owner field is a string, not a
2984 # foreign key
2985 if not self._owner_model:
2986 self._owner_model = models.User.objects.get(login=self.owner)
2987 return self._owner_model
mbligh36768f02008-02-22 18:28:33 +00002988
mblighe2586682008-02-29 22:45:46 +00002989
jadmanski0afbb632008-06-06 21:10:57 +00002990 def is_server_job(self):
2991 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002992
2993
showard170873e2009-01-07 00:22:26 +00002994 def tag(self):
2995 return "%s-%s" % (self.id, self.owner)
2996
2997
jadmanski0afbb632008-06-06 21:10:57 +00002998 def get_host_queue_entries(self):
2999 rows = _db.execute("""
3000 SELECT * FROM host_queue_entries
3001 WHERE job_id= %s
3002 """, (self.id,))
3003 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00003004
jadmanski0afbb632008-06-06 21:10:57 +00003005 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00003006
jadmanski0afbb632008-06-06 21:10:57 +00003007 return entries
mbligh36768f02008-02-22 18:28:33 +00003008
3009
jadmanski0afbb632008-06-06 21:10:57 +00003010 def set_status(self, status, update_queues=False):
3011 self.update_field('status',status)
3012
3013 if update_queues:
3014 for queue_entry in self.get_host_queue_entries():
3015 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00003016
3017
showard77182562009-06-10 00:16:05 +00003018 def _atomic_and_has_started(self):
3019 """
3020 @returns True if any of the HostQueueEntries associated with this job
3021 have entered the Status.STARTING state or beyond.
3022 """
3023 atomic_entries = models.HostQueueEntry.objects.filter(
3024 job=self.id, atomic_group__isnull=False)
3025 if atomic_entries.count() <= 0:
3026 return False
3027
showardaf8b4ca2009-06-16 18:47:26 +00003028 # These states may *only* be reached if Job.run() has been called.
3029 started_statuses = (models.HostQueueEntry.Status.STARTING,
3030 models.HostQueueEntry.Status.RUNNING,
3031 models.HostQueueEntry.Status.COMPLETED)
3032
3033 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003034 return started_entries.count() > 0
3035
3036
showard708b3522009-08-20 23:26:15 +00003037 def _hosts_assigned_count(self):
3038 """The number of HostQueueEntries assigned a Host for this job."""
3039 entries = models.HostQueueEntry.objects.filter(job=self.id,
3040 host__isnull=False)
3041 return entries.count()
3042
3043
showard77182562009-06-10 00:16:05 +00003044 def _pending_count(self):
3045 """The number of HostQueueEntries for this job in the Pending state."""
3046 pending_entries = models.HostQueueEntry.objects.filter(
3047 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3048 return pending_entries.count()
3049
3050
showardd2014822009-10-12 20:26:58 +00003051 def _pending_threshold(self, atomic_group):
3052 """
3053 @param atomic_group: The AtomicGroup associated with this job that we
3054 are using to bound the threshold.
3055 @returns The minimum number of HostQueueEntries assigned a Host before
3056 this job can run.
3057 """
3058 return min(self._hosts_assigned_count(),
3059 atomic_group.max_number_of_machines)
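    # Illustrative example (not part of the scheduler): with 10 entries that
    # have hosts assigned and an atomic group whose max_number_of_machines is
    # 6, the threshold is min(10, 6) == 6; once 6 entries are Pending, the
    # delayed atomic group job stops waiting for additional hosts.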
3060
3061
jadmanski0afbb632008-06-06 21:10:57 +00003062 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003063 # NOTE: Atomic group jobs stop reporting ready after they have been
3064 # started to avoid launching multiple copies of one atomic job.
3065 # Only possible if synch_count is less than than half the number of
3066 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003067 pending_count = self._pending_count()
3068 atomic_and_has_started = self._atomic_and_has_started()
3069 ready = (pending_count >= self.synch_count
showardd2014822009-10-12 20:26:58 +00003070 and not atomic_and_has_started)
showardb000a8d2009-07-28 20:02:07 +00003071
3072 if not ready:
3073 logging.info(
3074 'Job %s not ready: %s pending, %s required '
3075 '(Atomic and started: %s)',
3076 self, pending_count, self.synch_count,
3077 atomic_and_has_started)
3078
3079 return ready
mbligh36768f02008-02-22 18:28:33 +00003080
3081
jadmanski0afbb632008-06-06 21:10:57 +00003082 def num_machines(self, clause = None):
3083 sql = "job_id=%s" % self.id
3084 if clause:
3085 sql += " AND (%s)" % clause
3086 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003087
3088
jadmanski0afbb632008-06-06 21:10:57 +00003089 def num_queued(self):
3090 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003091
3092
jadmanski0afbb632008-06-06 21:10:57 +00003093 def num_active(self):
3094 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003095
3096
jadmanski0afbb632008-06-06 21:10:57 +00003097 def num_complete(self):
3098 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003099
3100
jadmanski0afbb632008-06-06 21:10:57 +00003101 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003102 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003103
mbligh36768f02008-02-22 18:28:33 +00003104
showard6bb7c292009-01-30 01:44:51 +00003105 def _not_yet_run_entries(self, include_verifying=True):
3106 statuses = [models.HostQueueEntry.Status.QUEUED,
3107 models.HostQueueEntry.Status.PENDING]
3108 if include_verifying:
3109 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3110 return models.HostQueueEntry.objects.filter(job=self.id,
3111 status__in=statuses)
3112
3113
3114 def _stop_all_entries(self):
3115 entries_to_stop = self._not_yet_run_entries(
3116 include_verifying=False)
3117 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003118 assert not child_entry.complete, (
3119 '%s status=%s, active=%s, complete=%s' %
3120 (child_entry.id, child_entry.status, child_entry.active,
3121 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003122 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3123 child_entry.host.status = models.Host.Status.READY
3124 child_entry.host.save()
3125 child_entry.status = models.HostQueueEntry.Status.STOPPED
3126 child_entry.save()
3127
showard2bab8f42008-11-12 18:15:22 +00003128 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003129 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003130 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003131 self._stop_all_entries()
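    # Illustrative example (not part of the scheduler): a synchronous job with
    # synch_count=3 that loses a host to a failed repair drops below three
    # Queued/Pending/Verifying entries; stop_if_necessary() then returns any
    # Pending hosts to Ready and marks the remaining unstarted entries
    # Stopped, so the job is not left waiting forever.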
mblighe2586682008-02-29 22:45:46 +00003132
3133
jadmanski0afbb632008-06-06 21:10:57 +00003134 def write_to_machines_file(self, queue_entry):
3135 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00003136 file_path = os.path.join(self.tag(), '.machines')
3137 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003138
3139
showardf1ae3542009-05-11 19:26:02 +00003140 def _next_group_name(self, group_name=''):
3141 """@returns a directory name to use for the next host group results."""
3142 if group_name:
3143 # Sanitize for use as a pathname.
3144 group_name = group_name.replace(os.path.sep, '_')
3145 if group_name.startswith('.'):
3146 group_name = '_' + group_name[1:]
3147 # Add a separator between the group name and 'group%d'.
3148 group_name += '.'
3149 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003150 query = models.HostQueueEntry.objects.filter(
3151 job=self.id).values('execution_subdir').distinct()
3152 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003153 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3154 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003155 if ids:
3156 next_id = max(ids) + 1
3157 else:
3158 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003159 return '%sgroup%d' % (group_name, next_id)
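    # Illustrative example (not part of the scheduler): if this job already
    # has execution_subdirs 'group0' and 'group1', _next_group_name() returns
    # 'group2'; called as _next_group_name('rack1') with no matching prior
    # groups it returns 'rack1.group0' (after replacing os.sep characters and
    # escaping a leading '.' in the supplied group name).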
showard2bab8f42008-11-12 18:15:22 +00003160
3161
showarddb502762009-09-09 15:31:20 +00003162 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003163 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003164 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003165 return control_path
mbligh36768f02008-02-22 18:28:33 +00003166
showardb2e2c322008-10-14 17:33:55 +00003167
showard2bab8f42008-11-12 18:15:22 +00003168 def get_group_entries(self, queue_entry_from_group):
showard8375ce02009-10-12 20:35:13 +00003169 """
3170 @param queue_entry_from_group: A HostQueueEntry instance to find other
3171 group entries on this job for.
3172
3173 @returns A list of HostQueueEntry objects all executing this job as
3174 part of the same group as the one supplied (having the same
3175 execution_subdir).
3176 """
showard2bab8f42008-11-12 18:15:22 +00003177 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003178 return list(HostQueueEntry.fetch(
3179 where='job_id=%s AND execution_subdir=%s',
3180 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003181
3182
showard8cc058f2009-09-08 16:26:33 +00003183 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003184 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003185 execution_path = queue_entries[0].execution_path()
3186 control_path = self._write_control_file(execution_path)
jadmanski0afbb632008-06-06 21:10:57 +00003187 hostnames = ','.join([entry.get_host().hostname
3188 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003189
showarddb502762009-09-09 15:31:20 +00003190 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003191 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003192 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003193 ['-P', execution_tag, '-n',
3194 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003195 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003196
jadmanski0afbb632008-06-06 21:10:57 +00003197 if not self.is_server_job():
3198 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003199
showardb2e2c322008-10-14 17:33:55 +00003200 return params
mblighe2586682008-02-29 22:45:46 +00003201
mbligh36768f02008-02-22 18:28:33 +00003202
showardc9ae1782009-01-30 01:42:37 +00003203 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003204 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003205 return True
showard0fc38302008-10-23 00:44:07 +00003206 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003207 return queue_entry.get_host().dirty
3208 return False
showard21baa452008-10-21 00:08:39 +00003209
showardc9ae1782009-01-30 01:42:37 +00003210
showard8cc058f2009-09-08 16:26:33 +00003211 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003212 do_not_verify = (queue_entry.host.protection ==
3213 host_protections.Protection.DO_NOT_VERIFY)
3214 if do_not_verify:
3215 return False
3216 return self.run_verify
3217
3218
showard8cc058f2009-09-08 16:26:33 +00003219 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003220 """
3221        Schedule any tasks that must run against the host_queue_entry before
3222        it may be used to run this Job (such as Cleanup & Verify).
3223
3224        If no pre-job task is needed, this calls HostQueueEntry.on_pending()
mbligh6fbdb802009-08-03 16:42:55 +00003225        directly, which continues the flow of the job.  Otherwise a Cleanup
showard77182562009-06-10 00:16:05 +00003226        or Verify SpecialTask is created for the entry's host.
3228 """
showardc9ae1782009-01-30 01:42:37 +00003229 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003230 task = models.SpecialTask.Task.CLEANUP
3231 elif self._should_run_verify(queue_entry):
3232 task = models.SpecialTask.Task.VERIFY
3233 else:
3234 queue_entry.on_pending()
3235 return
3236
showard9bb960b2009-11-19 01:02:11 +00003237 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00003238 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00003239 host=models.Host.objects.get(id=queue_entry.host_id),
3240 queue_entry=queue_entry, task=task)
showard21baa452008-10-21 00:08:39 +00003241
3242
showardf1ae3542009-05-11 19:26:02 +00003243 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003244 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003245 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003246 else:
showardf1ae3542009-05-11 19:26:02 +00003247 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003248 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003249 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003250 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003251
3252 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003253 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003254
3255
3256 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003257 """
3258        @returns A list of HostQueueEntry instances to be used to run this
3259                Job, or an empty list if too few Pending entries could be
showard54c1ea92009-05-20 00:32:58 +00003260                chosen.
showardf1ae3542009-05-11 19:26:02 +00003261 """
showard77182562009-06-10 00:16:05 +00003262 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003263 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003264 if atomic_group:
3265 num_entries_wanted = atomic_group.max_number_of_machines
3266 else:
3267 num_entries_wanted = self.synch_count
3268 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003269
showardf1ae3542009-05-11 19:26:02 +00003270 if num_entries_wanted > 0:
3271 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003272 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003273 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003274 params=(self.id, include_queue_entry.id)))
3275
3276 # Sort the chosen hosts by hostname before slicing.
3277 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3278 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3279 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3280 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003281
showardf1ae3542009-05-11 19:26:02 +00003282 # Sanity check. We'll only ever be called if this can be met.
showard828fc4c2009-09-14 20:31:00 +00003283 if len(chosen_entries) < self.synch_count:
3284 message = ('job %s got less than %s chosen entries: %s' % (
3285 self.id, self.synch_count, chosen_entries))
3286 logging.error(message)
3287 email_manager.manager.enqueue_notify_email(
3288 'Job not started, too few chosen entries', message)
3289 return []
showardf1ae3542009-05-11 19:26:02 +00003290
showard8cc058f2009-09-08 16:26:33 +00003291 group_name = include_queue_entry.get_group_name()
showardf1ae3542009-05-11 19:26:02 +00003292
3293 self._assign_new_group(chosen_entries, group_name=group_name)
showard8cc058f2009-09-08 16:26:33 +00003294 return chosen_entries
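    # Illustrative example (not part of the scheduler): for a non-atomic job
    # with synch_count=3, the entry passed in is combined with the first two
    # Pending entries (sorted by Host.cmp_for_sort), and all three are given
    # the same execution_subdir (e.g. 'group0') by _assign_new_group(); if
    # fewer than synch_count entries can be found, an empty list is returned
    # and a warning email is queued.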
showard2bab8f42008-11-12 18:15:22 +00003295
3296
showard77182562009-06-10 00:16:05 +00003297 def run_if_ready(self, queue_entry):
3298 """
showard8375ce02009-10-12 20:35:13 +00003299 Run this job by kicking its HQEs into status='Starting' if enough
3300 hosts are ready for it to run.
3301
3302 Cleans up by kicking HQEs into status='Stopped' if this Job is not
3303 ready to run.
showard77182562009-06-10 00:16:05 +00003304 """
showardb2e2c322008-10-14 17:33:55 +00003305 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003306 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003307 elif queue_entry.atomic_group:
3308 self.run_with_ready_delay(queue_entry)
3309 else:
3310 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003311
3312
3313 def run_with_ready_delay(self, queue_entry):
3314 """
3315 Start a delay to wait for more hosts to enter Pending state before
3316        launching an atomic group job. Once set, the delay cannot be reset.
3317
3318 @param queue_entry: The HostQueueEntry object to get atomic group
3319 info from and pass to run_if_ready when the delay is up.
3320
3321        Either runs the job right away (delay disabled or expired, or enough
3322        hosts already Pending) or marks the queue entry as Waiting.
3323 """
3324 assert queue_entry.job_id == self.id
3325 assert queue_entry.atomic_group
3326 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showardd2014822009-10-12 20:26:58 +00003327 over_max_threshold = (self._pending_count() >=
3328 self._pending_threshold(queue_entry.atomic_group))
showard77182562009-06-10 00:16:05 +00003329 delay_expired = (self._delay_ready_task and
3330 time.time() >= self._delay_ready_task.end_time)
3331
3332 # Delay is disabled or we already have enough? Do not wait to run.
3333 if not delay or over_max_threshold or delay_expired:
showard8cc058f2009-09-08 16:26:33 +00003334 self.run(queue_entry)
3335 else:
3336 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003337
showard8cc058f2009-09-08 16:26:33 +00003338
3339 def schedule_delayed_callback_task(self, queue_entry):
3340 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3341
showard77182562009-06-10 00:16:05 +00003342 if self._delay_ready_task:
3343 return None
3344
showard8cc058f2009-09-08 16:26:33 +00003345 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3346
showard77182562009-06-10 00:16:05 +00003347 def run_job_after_delay():
showardd2014822009-10-12 20:26:58 +00003348 logging.info('Job %s done waiting for extra hosts.', self)
3349            # Check to see if the job is still relevant. It could have been
3350            # aborted while we were waiting or hosts could have disappeared, etc.
3351 threshold = self._pending_threshold(queue_entry.atomic_group)
3352 if self._pending_count() < threshold:
3353 logging.info('Job %s had too few Pending hosts after waiting '
3354 'for extras. Not running.', self)
3355 return
showard77182562009-06-10 00:16:05 +00003356 return self.run(queue_entry)
3357
showard708b3522009-08-20 23:26:15 +00003358 logging.info('Job %s waiting up to %s seconds for more hosts.',
3359 self.id, delay)
showard77182562009-06-10 00:16:05 +00003360 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3361 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003362 return self._delay_ready_task
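    # Illustrative sketch (not part of the scheduler; the delay value is
    # hypothetical): the DelayedCallTask created above records an end_time and
    # a callback, and run_job_after_delay() re-checks the Pending count before
    # calling run().  Roughly:
    #
    #     task = DelayedCallTask(delay_seconds=120,
    #                            callback=run_job_after_delay)
    #     # ...later, once time.time() >= task.end_time, the scheduler's
    #     # polling loop invokes the callback.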
showard77182562009-06-10 00:16:05 +00003363
3364
3365 def run(self, queue_entry):
3366 """
3367 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003368 """
3369 if queue_entry.atomic_group and self._atomic_and_has_started():
3370 logging.error('Job.run() called on running atomic Job %d '
3371 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003372 return
3373 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003374 if queue_entries:
3375 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003376
3377
showard8cc058f2009-09-08 16:26:33 +00003378 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003379 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003380 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003381 self.abort_delay_ready_task()
3382
3383
3384 def abort_delay_ready_task(self):
3385 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003386 if self._delay_ready_task:
3387 # Cancel any pending callback that would try to run again
3388 # as we are already running.
3389 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003390
showardd2014822009-10-12 20:26:58 +00003391
showardb000a8d2009-07-28 20:02:07 +00003392 def __str__(self):
3393 return '%s-%s' % (self.id, self.owner)
3394
3395
mbligh36768f02008-02-22 18:28:33 +00003396if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003397 main()