blob: 8b11026132a22db16d28e8ed7516a2c276626bdb [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
# Pid-file name prefixes used with utils.write_pid()/program_is_alive() to
# ensure only one scheduler (and one babysitter) instance runs at a time.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment.
# (Membership test instead of the deprecated dict.has_key().)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Pidfile names dropped in results directories by the processes we launch.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level mutable state; populated by main()/init() at startup.
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    # Top-level entry point: run the scheduler, log any escaping exception,
    # and always remove our pid file on the way out.
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (e.g. scheduler disabled) are not errors;
            # re-raise without logging a traceback.
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        # Runs on every exit path so a stale pid file never blocks restart.
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000085
86
def main_without_exception_handling():
    """
    Parse command-line options, load configuration into the module globals,
    then run the dispatcher loop until shutdown is requested.

    Expects exactly one positional argument: the results directory.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enabled it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook; falls back to a no-op.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Accept whitespace, comma, semicolon or colon as separators; drop empties.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        # Test mode: use a dummy autoserv binary and flag testing globally.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until handle_sigint flips
        # _shutdown.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown: flush pending emails, stop the status server,
    # shut down the drone manager, then drop the database connection.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
def setup_logging():
    """Configure scheduler logging, honoring optional environment overrides
    for the log directory and log file name."""
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            logfile_name=os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
182
183
def handle_sigint(signum, frame):
    """SIGINT handler: ask the dispatcher loop to exit cleanly."""
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000188
189
def init():
    """
    One-time scheduler startup: enforce single instance, connect to the
    database, install the SIGINT handler and initialize the drone manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start if another monitor_db is already running.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Renamed from 'drones': the old local name shadowed the imported
    # 'drones' module.
    drone_hostnames = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_hostnames.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line for the given machines.

    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - When True (the default), pass --verbose.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        if not job:
            # Derive the Job from the queue entry when only that was given.
            job = queue_entry.job
        command += ['-u', job.owner, '-l', job.name]
    if verbose:
        command.append('--verbose')
    return command + extra_args
248
249
class SchedulerError(Exception):
    """Exception raised by HostScheduler when an inconsistent state occurs."""
252
253
class HostScheduler(object):
    """
    Matches pending host queue entries against available hosts.

    refresh() snapshots host/ACL/label/dependency state from the database
    into in-memory dicts; the find_eligible_* methods then consume that
    snapshot, popping hosts from self._hosts_available as they are assigned
    so each host is handed out at most once per scheduling cycle.
    """
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # Render ids as a comma-separated string for an SQL IN (...) clause.
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        # Run a two-column query (query must contain one %s for the id list)
        # and fold the rows into a dict of id -> set of related ids.
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        # Build {left_id: set(right_ids)} from (left, right) rows;
        # flip=True swaps the mapping direction.
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """@returns dict mapping job id -> set of ACL group ids of its owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """@returns dict mapping job id -> set of ineligible host ids."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """@returns dict mapping job id -> set of dependency label ids."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """@returns dict mapping host id -> set of ACL group ids."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """@returns (label_id -> host ids, host_id -> label ids) dict pair."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # Snapshot of every Label, keyed by id.
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all scheduling state from the database for this cycle."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # True when the job's owner shares at least one ACL group with host.
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # The host must carry every label the job depends on.
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        Reject hosts carrying an only_if_needed label unless the job asked
        for that label (as a dependency or as the metahost label).
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # NOTE(review): despite the docstring, this logs rather than
            # raises, then returns an arbitrary member of the set.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yeilding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        # Combined ACL / dependency / only_if_needed / atomic-group check.
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        # Entry requested a specific host: hand it over iff it's eligible
        # and still available (pop marks it used for this cycle).
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick the first usable, eligible host carrying the metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """@returns a Host for this (non-atomic) entry, or None."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
617
618
showard170873e2009-01-07 00:22:26 +0000619class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000620 def __init__(self):
621 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000622 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000623 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000624 user_cleanup_time = scheduler_config.config.clean_interval
625 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
626 _db, user_cleanup_time)
627 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000628 self._host_agents = {}
629 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
showard915958d2009-04-22 21:00:58 +0000632 def initialize(self, recover_hosts=True):
633 self._periodic_cleanup.initialize()
634 self._24hr_upkeep.initialize()
635
jadmanski0afbb632008-06-06 21:10:57 +0000636 # always recover processes
637 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000638
jadmanski0afbb632008-06-06 21:10:57 +0000639 if recover_hosts:
640 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000641
642
    def tick(self):
        # One scheduler iteration: refresh drone state, run cleanups and
        # abort handling, run the scheduling passes, let agents make
        # progress, then flush queued drone actions and e-mails.
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_new_jobs()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000655
showard97aed502008-11-04 02:01:24 +0000656
    def _run_cleanup(self):
        """Give both cleanup tasks a chance to run; each decides internally
        whether its interval has elapsed."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard170873e2009-01-07 00:22:26 +0000662 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
663 for object_id in object_ids:
664 agent_dict.setdefault(object_id, set()).add(agent)
665
666
667 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
668 for object_id in object_ids:
669 assert object_id in agent_dict
670 agent_dict[object_id].remove(agent)
671
672
    def add_agent(self, agent):
        """
        Start tracking agent: add it to the active list, point it back at
        this dispatcher, and index it by its host and queue entry ids.
        """
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000679
showard170873e2009-01-07 00:22:26 +0000680
    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.

        @returns a (possibly empty) list of Agents registered for
                queue_entry.id.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000686
687
688 def host_has_agent(self, host):
689 """
690 Determine if there is currently an Agent present using this host.
691 """
692 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000693
694
    def remove_agent(self, agent):
        """Stop tracking agent: inverse of add_agent()."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000701
702
    def _host_has_scheduled_special_task(self, host):
        """Return True if host has a special task queued (not yet started
        and not complete)."""
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))
707
708
    def _recover_processes(self):
        """
        Startup recovery: register all pidfiles we may need to read, refresh
        drone state so those pidfiles are loaded, then re-attach agents to
        surviving processes and flag anything unrecoverable.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000720
showard170873e2009-01-07 00:22:26 +0000721
    def _register_pidfiles(self):
        """Register every pidfile name for each active execution (queue
        entries in flight plus active special tasks) with the drone manager
        so the next refresh() reads them."""
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
                where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                        execution_entry.execution_path(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000733
734
showarded2afea2009-07-07 20:54:07 +0000735 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
736 run_monitor = PidfileRunMonitor()
737 run_monitor.attach_to_existing_process(execution_path,
738 pidfile_name=pidfile_name)
739 if run_monitor.has_process():
740 orphans.discard(run_monitor.get_process())
741 return run_monitor, '(process %s)' % run_monitor.get_process()
742 return None, 'without process'
743
744
    def _get_unassigned_entries(self, status):
        """Yield queue entries currently in status that have no agent yet."""
        for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry
751
752
    def _recover_entries_with_status(self, status, orphans, pidfile_name,
                                     recover_entries_fn):
        """
        For each unassigned entry in status, attach to its recorded process
        (via pidfile_name) and hand the whole entry group to
        recover_entries_fn(job, queue_entries, run_monitor).  Entries whose
        process has vanished are skipped here and picked up later by
        _schedule_running_host_queue_entries.
        """
        for queue_entry in self._get_unassigned_entries(status):
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            run_monitor, process_string = self._get_recovery_run_monitor(
                    queue_entry.execution_path(), pidfile_name, orphans)
            if not run_monitor:
                # _schedule_running_host_queue_entries should schedule and
                # recover these entries
                continue

            logging.info('Recovering %s entry %s %s',status.lower(),
                         ', '.join(str(entry) for entry in queue_entries),
                         process_string)
            recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000768
769
showard6878e8b2009-07-20 22:37:45 +0000770 def _check_for_remaining_orphan_processes(self, orphans):
771 if not orphans:
772 return
773 subject = 'Unrecovered orphan autoserv processes remain'
774 message = '\n'.join(str(process) for process in orphans)
775 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000776
777 die_on_orphans = global_config.global_config.get_config_value(
778 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
779
780 if die_on_orphans:
781 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000782
showard170873e2009-01-07 00:22:26 +0000783
    def _recover_running_entries(self, orphans):
        """Re-attach QueueTask agents to entries left in Running."""
        def recover_entries(job, queue_entries, run_monitor):
            queue_task = QueueTask(job=job, queue_entries=queue_entries,
                                   recover_run_monitor=run_monitor)
            self.add_agent(Agent(task=queue_task,
                                 num_processes=len(queue_entries)))

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, _AUTOSERV_PID_FILE,
                                          recover_entries)
794
795
    def _recover_gathering_entries(self, orphans):
        """Re-attach GatherLogsTask agents to entries left in Gathering."""
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent(gather_task))

        self._recover_entries_with_status(
                models.HostQueueEntry.Status.GATHERING,
                orphans, _CRASHINFO_PID_FILE, recover_entries)
805
806
    def _recover_parsing_entries(self, orphans):
        """Re-attach FinalReparseTask agents to entries left in Parsing.
        Parsers don't count against the process limit (num_processes=0)."""
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            self.add_agent(Agent(reparse_task, num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
816
817
    def _recover_pending_entries(self):
        """Re-trigger on_pending() for Pending entries with no agent."""
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()
823
824
    def _recover_all_recoverable_entries(self):
        """Recover every entry type; each stage discards the processes it
        accounts for from the orphan set, and whatever remains is reported."""
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000832
showard97aed502008-11-04 02:01:24 +0000833
    def _recover_special_tasks(self, orphans):
        """\
        Recovers all special tasks that have started running but have not
        completed.

        @param orphans: set of orphaned processes; recovered processes are
                discarded from it by _get_recovery_run_monitor.
        @raises SchedulerError: if a task's host already has an agent, which
                indicates double-scheduling.
        """

        tasks = models.SpecialTask.objects.filter(is_active=True,
                                                  is_complete=False)
        # Use ordering to force NULL queue_entry_id's to the end of the list
        for task in tasks.order_by('-queue_entry__id'):
            if self.host_has_agent(task.host):
                raise SchedulerError(
                        "%s already has a host agent %s." % (
                            task, self._host_agents.get(task.host.id)))

            run_monitor, process_string = self._get_recovery_run_monitor(
                    task.execution_path(), _AUTOSERV_PID_FILE, orphans)

            logging.info('Recovering %s %s', task, process_string)
            self._recover_special_task(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000854
855
showard8cc058f2009-09-08 16:26:33 +0000856 def _recover_special_task(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000857 """\
858 Recovers a single special task.
859 """
860 if task.task == models.SpecialTask.Task.VERIFY:
showard8cc058f2009-09-08 16:26:33 +0000861 agent_task = self._recover_verify(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000862 elif task.task == models.SpecialTask.Task.REPAIR:
showard8cc058f2009-09-08 16:26:33 +0000863 agent_task = self._recover_repair(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000864 elif task.task == models.SpecialTask.Task.CLEANUP:
showard8cc058f2009-09-08 16:26:33 +0000865 agent_task = self._recover_cleanup(task, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000866 else:
867 # Should never happen
868 logging.error(
869 "Special task id %d had invalid task %s", (task.id, task.task))
870
showard8cc058f2009-09-08 16:26:33 +0000871 self.add_agent(Agent(agent_task))
showard2fe3f1d2009-07-06 20:19:11 +0000872
873
showard8cc058f2009-09-08 16:26:33 +0000874 def _recover_verify(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000875 """\
876 Recovers a verify task.
877 No associated queue entry: Verify host
878 With associated queue entry: Verify host, and run associated queue
879 entry
880 """
showard8cc058f2009-09-08 16:26:33 +0000881 return VerifyTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000882
883
showard8cc058f2009-09-08 16:26:33 +0000884 def _recover_repair(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000885 """\
886 Recovers a repair task.
887 Always repair host
888 """
showard8cc058f2009-09-08 16:26:33 +0000889 return RepairTask(task=task, recover_run_monitor=run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000890
891
showard8cc058f2009-09-08 16:26:33 +0000892 def _recover_cleanup(self, task, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000893 """\
894 Recovers a cleanup task.
895 No associated queue entry: Clean host
896 With associated queue entry: Clean host, verify host if needed, and
897 run associated queue entry
898 """
showard8cc058f2009-09-08 16:26:33 +0000899 return CleanupTask(task=task, recover_run_monitor=run_monitor)
showard6af73ad2009-07-28 20:00:58 +0000900
901
    def _check_for_unrecovered_verifying_entries(self):
        """
        Sanity check after recovery: every Verifying queue entry should have
        an incomplete Cleanup or Verify special task attached.  Raise if any
        entry was left with no way to make progress.

        @raises SchedulerError: listing the stranded queue entries.
        """
        queue_entries = HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000920
921
    def _schedule_special_tasks(self):
        """
        Start agents for queued special tasks on unlocked hosts that don't
        already have an agent.  Tasks tied to a queue entry run before
        host-only tasks; within each group, lower ids run first.
        """
        tasks = models.SpecialTask.objects.filter(is_active=False,
                                                  is_complete=False,
                                                  host__locked=False)
        # We want lower ids to come first, but the NULL queue_entry_ids need to
        # come last
        tasks = tasks.extra(select={'isnull' : 'queue_entry_id IS NULL'})
        tasks = tasks.extra(order_by=['isnull', 'id'])

        for task in tasks:
            if self.host_has_agent(task.host):
                continue

            if task.task == models.SpecialTask.Task.CLEANUP:
                agent_task = CleanupTask(task=task)
            elif task.task == models.SpecialTask.Task.VERIFY:
                agent_task = VerifyTask(task=task)
            elif task.task == models.SpecialTask.Task.REPAIR:
                agent_task = RepairTask(task=task)
            else:
                # unknown task type: notify and skip rather than crash
                email_manager.manager.enqueue_notify_email(
                        'Special task with invalid task', task)
                continue

            self.add_agent(Agent(agent_task))
showard1ff7b2e2009-05-15 23:17:18 +0000947
948
    def _reverify_remaining_hosts(self):
        """Schedule cleanup for any still-active host no recovery step
        claimed; reaching this path implies a scheduler bug."""
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)
mblighbb421852008-03-11 22:36:16 +0000957
958
    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        """
        Create a Cleanup special task for each unlocked, valid host matching
        the given SQL condition, skipping hosts that already have an agent
        or a pending special task.

        @param where: SQL fragment ANDed with the locked/invalid filter.
        @param print_message: log format with one %s for the hostname;
                falsy to suppress logging.
        """
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000974
975
    def _recover_hosts(self):
        """Schedule reverification of hosts stuck in 'Repair Failed'."""
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000981
982
showard04c82c52008-05-29 19:38:12 +0000983
    def _get_pending_queue_entries(self):
        """Return schedulable queue entries, highest priority first."""
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
                joins='INNER JOIN jobs ON (job_id=jobs.id)',
                where='NOT complete AND NOT active AND status="Queued"',
                order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000990
991
showard89f84db2009-03-12 20:39:13 +0000992 def _refresh_pending_queue_entries(self):
993 """
994 Lookup the pending HostQueueEntries and call our HostScheduler
995 refresh() method given that list. Return the list.
996
997 @returns A list of pending HostQueueEntries sorted in priority order.
998 """
showard63a34772008-08-18 19:32:50 +0000999 queue_entries = self._get_pending_queue_entries()
1000 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001001 return []
showardb95b1bd2008-08-15 18:11:04 +00001002
showard63a34772008-08-18 19:32:50 +00001003 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001004
showard89f84db2009-03-12 20:39:13 +00001005 return queue_entries
1006
1007
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        # pair each entry with its host and start the pre-job tasks
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)
1038
1039
    def _schedule_new_jobs(self):
        """
        Assign hosts to pending queue entries and kick off their pre-job
        tasks.  Host-less entries with an atomic group id go through the
        atomic group path; everything else is matched to a single eligible
        host.
        """
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)
            if is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.find_eligible_host(
                        queue_entry)
                if assigned_host:
                    self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001056
1057
    def _schedule_running_host_queue_entries(self):
        """
        Create agents for in-flight queue entries (Starting/Running/
        Gathering/Parsing) that have none - both newly started entries and
        entries whose process died before startup recovery could attach.

        @raises SchedulerError: on a host double-booking or an entry with an
                unexpected status.
        """
        status_enum = models.HostQueueEntry.Status
        running_statuses = (status_enum.STARTING, status_enum.RUNNING,
                            status_enum.GATHERING, status_enum.PARSING)
        sql_statuses = ', '.join(('"%s"' % s for s in running_statuses))
        entries = HostQueueEntry.fetch(where="status IN (%s)" % sql_statuses)
        for entry in entries:
            if self.get_agents_for_entry(entry):
                continue

            # the whole synchronous group is handled via one agent
            task_entries = entry.job.get_group_entries(entry)
            for task_entry in task_entries:
                if (task_entry.status != models.HostQueueEntry.Status.PARSING
                        and self.host_has_agent(task_entry.host)):
                    agent = tuple(self._host_agents.get(task_entry.host.id))[0]
                    raise SchedulerError('Attempted to schedule on host that '
                                         'already has agent: %s (previous '
                                         'agent task: %s)'
                                         % (task_entry, agent.task))

            if entry.status in (models.HostQueueEntry.Status.STARTING,
                                models.HostQueueEntry.Status.RUNNING):
                params = entry.job.get_autoserv_params(task_entries)
                agent_task = QueueTask(job=entry.job,
                                       queue_entries=task_entries, cmd=params)
            elif entry.status == models.HostQueueEntry.Status.GATHERING:
                agent_task = GatherLogsTask(
                        job=entry.job, queue_entries=task_entries)
            elif entry.status == models.HostQueueEntry.Status.PARSING:
                agent_task = FinalReparseTask(queue_entries=task_entries)
            else:
                raise SchedulerError('_schedule_running_host_queue_entries got '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))

            self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
1094
1095
    def _schedule_delay_tasks(self):
        """Give Waiting entries' jobs a chance to schedule their delayed
        callback task; such tasks use no processes."""
        for entry in HostQueueEntry.fetch(where='status = "%s"' %
                                          models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent(Agent(task, num_processes=0))
1102
1103
    def _run_queue_entry(self, queue_entry, host):
        """Bind queue_entry to host and schedule its pre-job tasks."""
        queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001106
1107
    def _find_aborting(self):
        """Abort the agents of, and then abort, every entry flagged aborted
        but not yet complete."""
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001114
1115
    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        """
        Apply process throttling to decide whether agent may start now.

        @param num_started_this_cycle: processes already started this tick.
        @param have_reached_limit: True once any agent was refused this tick.
        @returns True if the agent may start.
        """
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
1137
1138
    def _handle_agents(self):
        """
        Tick every agent, starting not-yet-started agents subject to
        _can_start_agent throttling, and discard finished agents.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001156
1157
    def _process_recurring_runs(self):
        """
        Instantiate jobs for every RecurringRun whose start date has passed,
        then either delete the run (last iteration) or advance its start
        date and decrement its remaining loop count (0 means loop forever).
        """
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): dependencies is never passed to create_new_job
            # below - confirm whether it is carried inside options.
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # a bad template shouldn't stall the scheduler
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1197
1198
showard170873e2009-01-07 00:22:26 +00001199class PidfileRunMonitor(object):
1200 """
1201 Client must call either run() to start a new process or
1202 attach_to_existing_process().
1203 """
mbligh36768f02008-02-22 18:28:33 +00001204
showard170873e2009-01-07 00:22:26 +00001205 class _PidfileException(Exception):
1206 """
1207 Raised when there's some unexpected behavior with the pid file, but only
1208 used internally (never allowed to escape this class).
1209 """
mbligh36768f02008-02-22 18:28:33 +00001210
1211
showard170873e2009-01-07 00:22:26 +00001212 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001213 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001214 self._start_time = None
1215 self.pidfile_id = None
1216 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001217
1218
showard170873e2009-01-07 00:22:26 +00001219 def _add_nice_command(self, command, nice_level):
1220 if not nice_level:
1221 return command
1222 return ['nice', '-n', str(nice_level)] + command
1223
1224
    def _set_start_time(self):
        # Record when we started/attached; _handle_no_process compares
        # against this to detect a pidfile-write timeout.
        self._start_time = time.time()
1227
1228
1229 def run(self, command, working_directory, nice_level=None, log_file=None,
1230 pidfile_name=None, paired_with_pidfile=None):
1231 assert command is not None
1232 if nice_level is not None:
1233 command = ['nice', '-n', str(nice_level)] + command
1234 self._set_start_time()
1235 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001236 command, working_directory, pidfile_name=pidfile_name,
1237 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001238
1239
    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """
        Monitor an already-running process via the pidfile under
        execution_path, instead of starting a new one.
        """
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
                execution_path, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001246
1247
jadmanski0afbb632008-06-06 21:10:57 +00001248 def kill(self):
showard170873e2009-01-07 00:22:26 +00001249 if self.has_process():
1250 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001251
mbligh36768f02008-02-22 18:28:33 +00001252
showard170873e2009-01-07 00:22:26 +00001253 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001254 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001255 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001256
1257
showard170873e2009-01-07 00:22:26 +00001258 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001259 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001260 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001261 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001262
1263
showard170873e2009-01-07 00:22:26 +00001264 def _read_pidfile(self, use_second_read=False):
1265 assert self.pidfile_id is not None, (
1266 'You must call run() or attach_to_existing_process()')
1267 contents = _drone_manager.get_pidfile_contents(
1268 self.pidfile_id, use_second_read=use_second_read)
1269 if contents.is_invalid():
1270 self._state = drone_manager.PidfileContents()
1271 raise self._PidfileException(contents)
1272 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001273
1274
showard21baa452008-10-21 00:08:39 +00001275 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001276 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1277 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001278 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001279 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001280
1281
    def _get_pidfile_info_helper(self):
        """Core of _get_pidfile_info(); may raise _PidfileException via
        _read_pidfile()."""
        if self.lost_process:
            # already gave up on this process; keep the synthesized state
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001304
showard21baa452008-10-21 00:08:39 +00001305
    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed

        _PidfileException is never allowed to escape; it is converted into
        a lost-process notification.
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001317
1318
showard170873e2009-01-07 00:22:26 +00001319 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001320 """\
1321 Called when no pidfile is found or no pid is in the pidfile.
1322 """
showard170873e2009-01-07 00:22:26 +00001323 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001324 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001325 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001326 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001327 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001328
1329
showard35162b02009-03-03 02:17:30 +00001330 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001331 """\
1332 Called when autoserv has exited without writing an exit status,
1333 or we've timed out waiting for autoserv to write a pid to the
1334 pidfile. In either case, we just return failure and the caller
1335 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001336
showard170873e2009-01-07 00:22:26 +00001337 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001338 """
1339 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001340 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001341 self._state.exit_status = 1
1342 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001343
1344
jadmanski0afbb632008-06-06 21:10:57 +00001345 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001346 self._get_pidfile_info()
1347 return self._state.exit_status
1348
1349
1350 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001351 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001352 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001353 if self._state.num_tests_failed is None:
1354 return -1
showard21baa452008-10-21 00:08:39 +00001355 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001356
1357
showardcdaeae82009-08-31 18:32:48 +00001358 def try_copy_results_on_drone(self, **kwargs):
1359 if self.has_process():
1360 # copy results logs into the normal place for job results
1361 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1362
1363
1364 def try_copy_to_results_repository(self, source, **kwargs):
1365 if self.has_process():
1366 _drone_manager.copy_to_results_repository(self.get_process(),
1367 source, **kwargs)
1368
1369
mbligh36768f02008-02-22 18:28:33 +00001370class Agent(object):
showard77182562009-06-10 00:16:05 +00001371 """
showard8cc058f2009-09-08 16:26:33 +00001372 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001373
1374 The following methods are required on all task objects:
1375 poll() - Called periodically to let the task check its status and
1376 update its internal state. If the task succeeded.
1377 is_done() - Returns True if the task is finished.
1378 abort() - Called when an abort has been requested. The task must
1379 set its aborted attribute to True if it actually aborted.
1380
1381 The following attributes are required on all task objects:
1382 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001383 success - bool, True if this task succeeded.
1384 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1385 host_ids - A sequence of Host ids this task represents.
1386
1387 The following attribute is written to all task objects:
1388 agent - A reference to the Agent instance that the task has been
1389 added to.
1390 """
1391
1392
showard8cc058f2009-09-08 16:26:33 +00001393 def __init__(self, task, num_processes=1):
showard77182562009-06-10 00:16:05 +00001394 """
showard8cc058f2009-09-08 16:26:33 +00001395 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001396 @param num_processes: The number of subprocesses the Agent represents.
1397 This is used by the Dispatcher for managing the load on the
1398 system. Defaults to 1.
1399 """
showard8cc058f2009-09-08 16:26:33 +00001400 self.task = task
1401 task.agent = self
1402
showard77182562009-06-10 00:16:05 +00001403 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001404 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001405 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001406
showard8cc058f2009-09-08 16:26:33 +00001407 self.queue_entry_ids = task.queue_entry_ids
1408 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001409
showard8cc058f2009-09-08 16:26:33 +00001410 self.started = False
mbligh36768f02008-02-22 18:28:33 +00001411
1412
jadmanski0afbb632008-06-06 21:10:57 +00001413 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001414 self.started = True
1415 if self.task:
1416 self.task.poll()
1417 if self.task.is_done():
1418 self.task = None
showardec113162008-05-08 00:52:49 +00001419
1420
jadmanski0afbb632008-06-06 21:10:57 +00001421 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001422 return self.task is None
mbligh36768f02008-02-22 18:28:33 +00001423
1424
showardd3dc1992009-04-22 21:01:40 +00001425 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001426 if self.task:
1427 self.task.abort()
1428 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001429 # tasks can choose to ignore aborts
showard8cc058f2009-09-08 16:26:33 +00001430 self.task = None
showard20f9bdd2009-04-29 19:48:33 +00001431
showardd3dc1992009-04-22 21:01:40 +00001432
showard77182562009-06-10 00:16:05 +00001433class DelayedCallTask(object):
1434 """
1435 A task object like AgentTask for an Agent to run that waits for the
1436 specified amount of time to have elapsed before calling the supplied
1437 callback once and finishing. If the callback returns anything, it is
1438 assumed to be a new Agent instance and will be added to the dispatcher.
1439
1440 @attribute end_time: The absolute posix time after which this task will
1441 call its callback when it is polled and be finished.
1442
1443 Also has all attributes required by the Agent class.
1444 """
1445 def __init__(self, delay_seconds, callback, now_func=None):
1446 """
1447 @param delay_seconds: The delay in seconds from now that this task
1448 will call the supplied callback and be done.
1449 @param callback: A callable to be called by this task once after at
1450 least delay_seconds time has elapsed. It must return None
1451 or a new Agent instance.
1452 @param now_func: A time.time like function. Default: time.time.
1453 Used for testing.
1454 """
1455 assert delay_seconds > 0
1456 assert callable(callback)
1457 if not now_func:
1458 now_func = time.time
1459 self._now_func = now_func
1460 self._callback = callback
1461
1462 self.end_time = self._now_func() + delay_seconds
1463
1464 # These attributes are required by Agent.
1465 self.aborted = False
showard77182562009-06-10 00:16:05 +00001466 self.host_ids = ()
1467 self.success = False
1468 self.queue_entry_ids = ()
1469 # This is filled in by Agent.add_task().
1470 self.agent = None
1471
1472
1473 def poll(self):
showard8cc058f2009-09-08 16:26:33 +00001474 if not self.is_done() and self._now_func() >= self.end_time:
1475 self._callback()
showard77182562009-06-10 00:16:05 +00001476 self.success = True
1477
1478
1479 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001480 return self.success or self.aborted
showard77182562009-06-10 00:16:05 +00001481
1482
1483 def abort(self):
1484 self.aborted = True
showard77182562009-06-10 00:16:05 +00001485
1486
mbligh36768f02008-02-22 18:28:33 +00001487class AgentTask(object):
showard8cc058f2009-09-08 16:26:33 +00001488 def __init__(self, cmd=None, working_directory=None,
showarded2afea2009-07-07 20:54:07 +00001489 pidfile_name=None, paired_with_pidfile=None,
1490 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001491 self.done = False
jadmanski0afbb632008-06-06 21:10:57 +00001492 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001493 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001494 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001495 self.monitor = recover_run_monitor
1496 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001497 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001498 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001499 self.queue_entry_ids = []
1500 self.host_ids = []
1501 self.log_file = None
1502
1503
1504 def _set_ids(self, host=None, queue_entries=None):
1505 if queue_entries and queue_entries != [None]:
1506 self.host_ids = [entry.host.id for entry in queue_entries]
1507 self.queue_entry_ids = [entry.id for entry in queue_entries]
1508 else:
1509 assert host
1510 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001511
1512
jadmanski0afbb632008-06-06 21:10:57 +00001513 def poll(self):
showard08a36412009-05-05 01:01:13 +00001514 if not self.started:
1515 self.start()
1516 self.tick()
1517
1518
1519 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001520 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001521 exit_code = self.monitor.exit_code()
1522 if exit_code is None:
1523 return
1524 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001525 else:
1526 success = False
mbligh36768f02008-02-22 18:28:33 +00001527
jadmanski0afbb632008-06-06 21:10:57 +00001528 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001529
1530
jadmanski0afbb632008-06-06 21:10:57 +00001531 def is_done(self):
1532 return self.done
mbligh36768f02008-02-22 18:28:33 +00001533
1534
jadmanski0afbb632008-06-06 21:10:57 +00001535 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001536 if self.done:
1537 return
jadmanski0afbb632008-06-06 21:10:57 +00001538 self.done = True
1539 self.success = success
1540 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001541
1542
jadmanski0afbb632008-06-06 21:10:57 +00001543 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001544 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001545
mbligh36768f02008-02-22 18:28:33 +00001546
jadmanski0afbb632008-06-06 21:10:57 +00001547 def cleanup(self):
showardcdaeae82009-08-31 18:32:48 +00001548 if self.monitor and self.log_file:
1549 self.monitor.try_copy_to_results_repository(self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001550
1551
jadmanski0afbb632008-06-06 21:10:57 +00001552 def epilog(self):
1553 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001554
1555
jadmanski0afbb632008-06-06 21:10:57 +00001556 def start(self):
1557 assert self.agent
1558
1559 if not self.started:
1560 self.prolog()
1561 self.run()
1562
1563 self.started = True
1564
1565
1566 def abort(self):
1567 if self.monitor:
1568 self.monitor.kill()
1569 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001570 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001571 self.cleanup()
1572
1573
showarded2afea2009-07-07 20:54:07 +00001574 def _get_consistent_execution_path(self, execution_entries):
1575 first_execution_path = execution_entries[0].execution_path()
1576 for execution_entry in execution_entries[1:]:
1577 assert execution_entry.execution_path() == first_execution_path, (
1578 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1579 execution_entry,
1580 first_execution_path,
1581 execution_entries[0]))
1582 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001583
1584
showarded2afea2009-07-07 20:54:07 +00001585 def _copy_results(self, execution_entries, use_monitor=None):
1586 """
1587 @param execution_entries: list of objects with execution_path() method
1588 """
showard6d1c1432009-08-20 23:30:39 +00001589 if use_monitor is not None and not use_monitor.has_process():
1590 return
1591
showarded2afea2009-07-07 20:54:07 +00001592 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001593 if use_monitor is None:
1594 assert self.monitor
1595 use_monitor = self.monitor
1596 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001597 execution_path = self._get_consistent_execution_path(execution_entries)
1598 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001599 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001600
showarda1e74b32009-05-12 17:32:04 +00001601
1602 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001603 for queue_entry in queue_entries:
1604 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001605
1606
showarda1e74b32009-05-12 17:32:04 +00001607 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1608 self._copy_results(queue_entries, use_monitor)
1609 self._parse_results(queue_entries)
1610
1611
showardd3dc1992009-04-22 21:01:40 +00001612 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001613 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001614 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001615 self.monitor = PidfileRunMonitor()
1616 self.monitor.run(self.cmd, self._working_directory,
1617 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001618 log_file=self.log_file,
1619 pidfile_name=pidfile_name,
1620 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001621
1622
showardd9205182009-04-27 20:09:55 +00001623class TaskWithJobKeyvals(object):
1624 """AgentTask mixin providing functionality to help with job keyval files."""
1625 _KEYVAL_FILE = 'keyval'
1626 def _format_keyval(self, key, value):
1627 return '%s=%s' % (key, value)
1628
1629
1630 def _keyval_path(self):
1631 """Subclasses must override this"""
1632 raise NotImplemented
1633
1634
1635 def _write_keyval_after_job(self, field, value):
1636 assert self.monitor
1637 if not self.monitor.has_process():
1638 return
1639 _drone_manager.write_lines_to_file(
1640 self._keyval_path(), [self._format_keyval(field, value)],
1641 paired_with_process=self.monitor.get_process())
1642
1643
1644 def _job_queued_keyval(self, job):
1645 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1646
1647
1648 def _write_job_finished(self):
1649 self._write_keyval_after_job("job_finished", int(time.time()))
1650
1651
showarddb502762009-09-09 15:31:20 +00001652 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1653 keyval_contents = '\n'.join(self._format_keyval(key, value)
1654 for key, value in keyval_dict.iteritems())
1655 # always end with a newline to allow additional keyvals to be written
1656 keyval_contents += '\n'
1657 _drone_manager.attach_file_to_execution(self._working_directory,
1658 keyval_contents,
1659 file_path=keyval_path)
1660
1661
1662 def _write_keyvals_before_job(self, keyval_dict):
1663 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1664
1665
1666 def _write_host_keyvals(self, host):
1667 keyval_path = os.path.join(self._working_directory, 'host_keyvals',
1668 host.hostname)
1669 platform, all_labels = host.platform_and_labels()
1670 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1671 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1672
1673
showard8cc058f2009-09-08 16:26:33 +00001674class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001675 """
1676 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1677 """
1678
1679 TASK_TYPE = None
1680 host = None
1681 queue_entry = None
1682
1683 def __init__(self, task, extra_command_args, **kwargs):
showarded2afea2009-07-07 20:54:07 +00001684 assert (self.TASK_TYPE is not None,
1685 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001686
1687 self.host = Host(id=task.host.id)
1688 self.queue_entry = None
1689 if task.queue_entry:
1690 self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
1691
showarded2afea2009-07-07 20:54:07 +00001692 self.task = task
showarddb502762009-09-09 15:31:20 +00001693 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001694 self._extra_command_args = extra_command_args
1695 super(SpecialAgentTask, self).__init__(**kwargs)
1696
1697
showard8cc058f2009-09-08 16:26:33 +00001698 def _keyval_path(self):
1699 return os.path.join(self._working_directory, self._KEYVAL_FILE)
1700
1701
showarded2afea2009-07-07 20:54:07 +00001702 def prolog(self):
1703 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001704 self.cmd = _autoserv_command_line(self.host.hostname,
1705 self._extra_command_args,
1706 queue_entry=self.queue_entry)
1707 self._working_directory = self.task.execution_path()
1708 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001709 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001710
1711
showardde634ee2009-01-30 01:44:24 +00001712 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001713 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001714
showard2fe3f1d2009-07-06 20:19:11 +00001715 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001716 return # don't fail metahost entries, they'll be reassigned
1717
showard2fe3f1d2009-07-06 20:19:11 +00001718 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001719 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001720 return # entry has been aborted
1721
showard2fe3f1d2009-07-06 20:19:11 +00001722 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001723 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001724 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001725 self._write_keyval_after_job(queued_key, queued_time)
1726 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001727
showard8cc058f2009-09-08 16:26:33 +00001728 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001729 self.monitor.try_copy_results_on_drone(
1730 source_path=self._working_directory + '/',
1731 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001732
showard2fe3f1d2009-07-06 20:19:11 +00001733 self._copy_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001734 self.queue_entry.handle_host_failure()
showard2fe3f1d2009-07-06 20:19:11 +00001735 if self.queue_entry.job.parse_failed_repair:
1736 self._parse_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001737
1738 pidfile_id = _drone_manager.get_pidfile_id_from(
1739 self.queue_entry.execution_path(),
1740 pidfile_name=_AUTOSERV_PID_FILE)
1741 _drone_manager.register_pidfile(pidfile_id)
1742
1743
1744 def cleanup(self):
1745 super(SpecialAgentTask, self).cleanup()
1746 self.task.finish()
showardf85a0b72009-10-07 20:48:45 +00001747 if self.monitor:
1748 if self.monitor.has_process():
1749 self._copy_results([self.task])
1750 if self.monitor.pidfile_id is not None:
1751 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001752
1753
class RepairTask(SpecialAgentTask):
    """Runs autoserv -R on a host; on failure, fails the attached queue
    entry (if any)."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task, recover_run_monitor=None):
        """
        @param task: the SpecialTask DB row for this repair.
        @param recover_run_monitor: monitor for an already-running process
                being recovered, if any.
        """
        # normalize the host's protection level to its attribute name for
        # the autoserv command line
        raw_protection = host_protections.Protection.get_string(
            task.host.protection)
        protection = host_protections.Protection.get_attr_name(raw_protection)

        super(RepairTask, self).__init__(
            task, ['-R', '--host-protection', protection],
            recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001791
1792
showarded2afea2009-07-07 20:54:07 +00001793class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001794 def _copy_to_results_repository(self):
1795 if not self.queue_entry or self.queue_entry.meta_host:
1796 return
1797
1798 self.queue_entry.set_execution_subdir()
1799 log_name = os.path.basename(self.task.execution_path())
1800 source = os.path.join(self.task.execution_path(), 'debug',
1801 'autoserv.DEBUG')
1802 destination = os.path.join(
1803 self.queue_entry.execution_path(), log_name)
1804
1805 self.monitor.try_copy_to_results_repository(
1806 source, destination_path=destination)
1807
1808
showard170873e2009-01-07 00:22:26 +00001809 def epilog(self):
1810 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001811
showard775300b2009-09-09 15:30:50 +00001812 if self.success:
1813 return
showard8fe93b52008-11-18 17:53:22 +00001814
showard775300b2009-09-09 15:30:50 +00001815 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001816
showard775300b2009-09-09 15:30:50 +00001817 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001818 # effectively ignore failure for these hosts
1819 self.success = True
showard775300b2009-09-09 15:30:50 +00001820 return
1821
1822 if self.queue_entry:
1823 self.queue_entry.requeue()
1824
1825 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001826 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001827 queue_entry__id=self.queue_entry.id):
1828 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1829 self._fail_queue_entry()
1830 return
1831
1832 queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
1833 else:
1834 queue_entry = None
1835
1836 models.SpecialTask.objects.create(
1837 host=models.Host(id=self.host.id),
1838 task=models.SpecialTask.Task.REPAIR,
1839 queue_entry=queue_entry)
showard58721a82009-08-20 23:32:40 +00001840
showard8fe93b52008-11-18 17:53:22 +00001841
class VerifyTask(PreJobTask):
    """Runs autoserv -v on a host, either ahead of a queue entry or
    standalone."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task, recover_run_monitor=None):
        super(VerifyTask, self).__init__(
                task, ['-v'], recover_run_monitor=recover_run_monitor)
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # One verify per host will do; delete any other verifies still
        # queued for this host rather than keeping records of them.
        redundant_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        redundant_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001878
1879
showardb5626452009-06-30 01:57:28 +00001880class CleanupHostsMixin(object):
1881 def _reboot_hosts(self, job, queue_entries, final_success,
1882 num_tests_failed):
1883 reboot_after = job.reboot_after
1884 do_reboot = (
1885 # always reboot after aborted jobs
1886 self._final_status == models.HostQueueEntry.Status.ABORTED
1887 or reboot_after == models.RebootAfter.ALWAYS
1888 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1889 and final_success and num_tests_failed == 0))
1890
1891 for queue_entry in queue_entries:
1892 if do_reboot:
1893 # don't pass the queue entry to the CleanupTask. if the cleanup
1894 # fails, the job doesn't care -- it's over.
showard8cc058f2009-09-08 16:26:33 +00001895 models.SpecialTask.objects.create(
1896 host=models.Host(id=queue_entry.host.id),
1897 task=models.SpecialTask.Task.CLEANUP)
showardb5626452009-06-30 01:57:28 +00001898 else:
showard8cc058f2009-09-08 16:26:33 +00001899 queue_entry.host.set_status(models.Host.Status.READY)
showardb5626452009-06-30 01:57:28 +00001900
1901
1902class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
showard8cc058f2009-09-08 16:26:33 +00001903 def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001904 self.job = job
1905 self.queue_entries = queue_entries
showard8cc058f2009-09-08 16:26:33 +00001906 self.group_name = queue_entries[0].get_group_name()
showarded2afea2009-07-07 20:54:07 +00001907 super(QueueTask, self).__init__(
1908 cmd, self._execution_path(),
1909 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001910 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001911
1912
showard73ec0442009-02-07 02:05:20 +00001913 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001914 return os.path.join(self._execution_path(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001915
1916
showarded2afea2009-07-07 20:54:07 +00001917 def _execution_path(self):
1918 return self.queue_entries[0].execution_path()
mblighbb421852008-03-11 22:36:16 +00001919
1920
jadmanski0afbb632008-06-06 21:10:57 +00001921 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00001922 for entry in self.queue_entries:
1923 if entry.status not in (models.HostQueueEntry.Status.STARTING,
1924 models.HostQueueEntry.Status.RUNNING):
1925 raise SchedulerError('Queue task attempting to start '
1926 'entry with invalid status %s: %s'
1927 % (entry.status, entry))
1928 if entry.host.status not in (models.Host.Status.PENDING,
1929 models.Host.Status.RUNNING):
1930 raise SchedulerError('Queue task attempting to start on queue '
1931 'entry with invalid host status %s: %s'
1932 % (entry.host.status, entry))
1933
showardd9205182009-04-27 20:09:55 +00001934 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001935 keyval_dict = {queued_key: queued_time}
1936 if self.group_name:
1937 keyval_dict['host_group_name'] = self.group_name
1938 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001939 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001940 self._write_host_keyvals(queue_entry.host)
showard8cc058f2009-09-08 16:26:33 +00001941 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showard12f3e322009-05-13 21:27:42 +00001942 queue_entry.update_field('started_on', datetime.datetime.now())
showard8cc058f2009-09-08 16:26:33 +00001943 queue_entry.host.set_status(models.Host.Status.RUNNING)
showard21baa452008-10-21 00:08:39 +00001944 queue_entry.host.update_field('dirty', 1)
showardc6a56872009-07-28 20:11:58 +00001945 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1946 # TODO(gps): Remove this if nothing needs it anymore.
1947 # A potential user is: tko/parser
jadmanski0afbb632008-06-06 21:10:57 +00001948 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001949
1950
showard35162b02009-03-03 02:17:30 +00001951 def _write_lost_process_error_file(self):
showarded2afea2009-07-07 20:54:07 +00001952 error_file_path = os.path.join(self._execution_path(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001953 _drone_manager.write_lines_to_file(error_file_path,
1954 [_LOST_PROCESS_ERROR])
1955
1956
showardd3dc1992009-04-22 21:01:40 +00001957 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001958 if not self.monitor:
1959 return
1960
showardd9205182009-04-27 20:09:55 +00001961 self._write_job_finished()
1962
showard35162b02009-03-03 02:17:30 +00001963 if self.monitor.lost_process:
1964 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001965
showard8cc058f2009-09-08 16:26:33 +00001966 for queue_entry in self.queue_entries:
1967 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001968
1969
showardcbd74612008-11-19 21:42:02 +00001970 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001971 _drone_manager.write_lines_to_file(
showarded2afea2009-07-07 20:54:07 +00001972 os.path.join(self._execution_path(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001973 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001974 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001975
1976
jadmanskif7fa2cc2008-10-01 14:13:23 +00001977 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001978 if not self.monitor or not self.monitor.has_process():
1979 return
1980
jadmanskif7fa2cc2008-10-01 14:13:23 +00001981 # build up sets of all the aborted_by and aborted_on values
1982 aborted_by, aborted_on = set(), set()
1983 for queue_entry in self.queue_entries:
1984 if queue_entry.aborted_by:
1985 aborted_by.add(queue_entry.aborted_by)
1986 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1987 aborted_on.add(t)
1988
1989 # extract some actual, unique aborted by value and write it out
1990 assert len(aborted_by) <= 1
1991 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001992 aborted_by_value = aborted_by.pop()
1993 aborted_on_value = max(aborted_on)
1994 else:
1995 aborted_by_value = 'autotest_system'
1996 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001997
showarda0382352009-02-11 23:36:43 +00001998 self._write_keyval_after_job("aborted_by", aborted_by_value)
1999 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002000
showardcbd74612008-11-19 21:42:02 +00002001 aborted_on_string = str(datetime.datetime.fromtimestamp(
2002 aborted_on_value))
2003 self._write_status_comment('Job aborted by %s on %s' %
2004 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002005
2006
jadmanski0afbb632008-06-06 21:10:57 +00002007 def abort(self):
2008 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002009 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002010 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002011
2012
jadmanski0afbb632008-06-06 21:10:57 +00002013 def epilog(self):
2014 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002015 self._finish_task()
2016 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00002017
2018
showardd3dc1992009-04-22 21:01:40 +00002019class PostJobTask(AgentTask):
2020 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00002021 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002022 self._queue_entries = queue_entries
2023 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00002024
showarded2afea2009-07-07 20:54:07 +00002025 self._execution_path = self._get_consistent_execution_path(
2026 queue_entries)
2027 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002028 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00002029 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00002030 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
2031
2032 if _testing_mode:
2033 command = 'true'
2034 else:
2035 command = self._generate_command(self._results_dir)
2036
showarded2afea2009-07-07 20:54:07 +00002037 super(PostJobTask, self).__init__(
2038 cmd=command, working_directory=self._execution_path,
2039 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002040
showarded2afea2009-07-07 20:54:07 +00002041 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00002042 self._final_status = self._determine_final_status()
2043
2044
2045 def _generate_command(self, results_dir):
2046 raise NotImplementedError('Subclasses must override this')
2047
2048
2049 def _job_was_aborted(self):
2050 was_aborted = None
2051 for queue_entry in self._queue_entries:
2052 queue_entry.update_from_database()
2053 if was_aborted is None: # first queue entry
2054 was_aborted = bool(queue_entry.aborted)
2055 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2056 email_manager.manager.enqueue_notify_email(
2057 'Inconsistent abort state',
2058 'Queue entries have inconsistent abort state: ' +
2059 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2060 # don't crash here, just assume true
2061 return True
2062 return was_aborted
2063
2064
2065 def _determine_final_status(self):
2066 if self._job_was_aborted():
2067 return models.HostQueueEntry.Status.ABORTED
2068
2069 # we'll use a PidfileRunMonitor to read the autoserv exit status
2070 if self._autoserv_monitor.exit_code() == 0:
2071 return models.HostQueueEntry.Status.COMPLETED
2072 return models.HostQueueEntry.Status.FAILED
2073
2074
2075 def run(self):
showard8cc058f2009-09-08 16:26:33 +00002076 # Make sure we actually have results to work with.
2077 # This should never happen in normal operation.
showard5add1c82009-05-26 19:27:46 +00002078 if not self._autoserv_monitor.has_process():
2079 email_manager.manager.enqueue_notify_email(
showard8cc058f2009-09-08 16:26:33 +00002080 'No results in post-job task',
2081 'No results in post-job task at %s' %
2082 self._autoserv_monitor.pidfile_id)
showard5add1c82009-05-26 19:27:46 +00002083 self.finished(False)
2084 return
2085
2086 super(PostJobTask, self).run(
2087 pidfile_name=self._pidfile_name,
2088 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002089
2090
2091 def _set_all_statuses(self, status):
2092 for queue_entry in self._queue_entries:
2093 queue_entry.set_status(status)
2094
2095
2096 def abort(self):
2097 # override AgentTask.abort() to avoid killing the process and ending
2098 # the task. post-job tasks continue when the job is aborted.
2099 pass
2100
2101
showardb5626452009-06-30 01:57:28 +00002102class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002103 """
2104 Task responsible for
2105 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2106 * copying logs to the results repository
2107 * spawning CleanupTasks for hosts, if necessary
2108 * spawning a FinalReparseTask for the job
2109 """
showarded2afea2009-07-07 20:54:07 +00002110 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002111 self._job = job
2112 super(GatherLogsTask, self).__init__(
2113 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002114 logfile_name='.collect_crashinfo.log',
2115 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002116 self._set_ids(queue_entries=queue_entries)
2117
2118
2119 def _generate_command(self, results_dir):
2120 host_list = ','.join(queue_entry.host.hostname
2121 for queue_entry in self._queue_entries)
2122 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2123 '-r', results_dir]
2124
2125
2126 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002127 for queue_entry in self._queue_entries:
2128 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2129 raise SchedulerError('Gather task attempting to start on '
2130 'non-gathering entry: %s' % queue_entry)
2131 if queue_entry.host.status != models.Host.Status.RUNNING:
2132 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002133 'entry with non-running host status %s: %s'
2134 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002135
showardd3dc1992009-04-22 21:01:40 +00002136 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002137
2138
showardd3dc1992009-04-22 21:01:40 +00002139 def epilog(self):
2140 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002141
showard6d1c1432009-08-20 23:30:39 +00002142 self._copy_and_parse_results(self._queue_entries,
2143 use_monitor=self._autoserv_monitor)
2144
2145 if self._autoserv_monitor.has_process():
2146 final_success = (self._final_status ==
2147 models.HostQueueEntry.Status.COMPLETED)
2148 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2149 else:
2150 final_success = False
2151 num_tests_failed = 0
2152
showardb5626452009-06-30 01:57:28 +00002153 self._reboot_hosts(self._job, self._queue_entries, final_success,
2154 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002155
2156
showard0bbfc212009-04-29 21:06:13 +00002157 def run(self):
showard597bfd32009-05-08 18:22:50 +00002158 autoserv_exit_code = self._autoserv_monitor.exit_code()
2159 # only run if Autoserv exited due to some signal. if we have no exit
2160 # code, assume something bad (and signal-like) happened.
2161 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002162 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002163 else:
2164 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002165
2166
showard8fe93b52008-11-18 17:53:22 +00002167class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002168 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2169
2170
showard8cc058f2009-09-08 16:26:33 +00002171 def __init__(self, task, recover_run_monitor=None):
showarded2afea2009-07-07 20:54:07 +00002172 super(CleanupTask, self).__init__(
showard8cc058f2009-09-08 16:26:33 +00002173 task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002174
showard8cc058f2009-09-08 16:26:33 +00002175 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002176
mblighd5c95802008-03-05 00:33:46 +00002177
jadmanski0afbb632008-06-06 21:10:57 +00002178 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002179 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002180 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002181 self.host.set_status(models.Host.Status.CLEANING)
2182 if self.queue_entry:
2183 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2184
2185
showard775300b2009-09-09 15:30:50 +00002186 def _finish_epilog(self):
showard7b2d7cb2009-10-28 19:53:03 +00002187 if not self.queue_entry or not self.success:
showard775300b2009-09-09 15:30:50 +00002188 return
2189
showard7b2d7cb2009-10-28 19:53:03 +00002190 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2191 should_run_verify = (
2192 self.queue_entry.job.run_verify
2193 and self.host.protection != do_not_verify_protection)
2194 if should_run_verify:
2195 entry = models.HostQueueEntry(id=self.queue_entry.id)
2196 models.SpecialTask.objects.create(
2197 host=models.Host(id=self.host.id),
2198 queue_entry=entry,
2199 task=models.SpecialTask.Task.VERIFY)
2200 else:
showard775300b2009-09-09 15:30:50 +00002201 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002202
mblighd5c95802008-03-05 00:33:46 +00002203
showard21baa452008-10-21 00:08:39 +00002204 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002205 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002206
showard21baa452008-10-21 00:08:39 +00002207 if self.success:
2208 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002209 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002210
showard775300b2009-09-09 15:30:50 +00002211 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002212
showard21baa452008-10-21 00:08:39 +00002213
showardd3dc1992009-04-22 21:01:40 +00002214class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002215 _num_running_parses = 0
2216
showarded2afea2009-07-07 20:54:07 +00002217 def __init__(self, queue_entries, recover_run_monitor=None):
2218 super(FinalReparseTask, self).__init__(
2219 queue_entries, pidfile_name=_PARSER_PID_FILE,
2220 logfile_name='.parse.log',
2221 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002222 # don't use _set_ids, since we don't want to set the host_ids
2223 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002224 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002225
showard97aed502008-11-04 02:01:24 +00002226
2227 @classmethod
2228 def _increment_running_parses(cls):
2229 cls._num_running_parses += 1
2230
2231
2232 @classmethod
2233 def _decrement_running_parses(cls):
2234 cls._num_running_parses -= 1
2235
2236
2237 @classmethod
2238 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002239 return (cls._num_running_parses <
2240 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002241
2242
2243 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00002244 for queue_entry in self._queue_entries:
2245 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2246 raise SchedulerError('Parse task attempting to start on '
2247 'non-parsing entry: %s' % queue_entry)
2248
showard97aed502008-11-04 02:01:24 +00002249 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002250
2251
2252 def epilog(self):
2253 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002254 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002255
2256
showardd3dc1992009-04-22 21:01:40 +00002257 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002258 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002259 results_dir]
showard97aed502008-11-04 02:01:24 +00002260
2261
showard08a36412009-05-05 01:01:13 +00002262 def tick(self):
2263 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002264 # and we can, at which point we revert to default behavior
2265 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002266 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002267 else:
2268 self._try_starting_parse()
2269
2270
2271 def run(self):
2272 # override run() to not actually run unless we can
2273 self._try_starting_parse()
2274
2275
2276 def _try_starting_parse(self):
2277 if not self._can_run_new_parse():
2278 return
showard170873e2009-01-07 00:22:26 +00002279
showard97aed502008-11-04 02:01:24 +00002280 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002281 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002282
showard97aed502008-11-04 02:01:24 +00002283 self._increment_running_parses()
2284 self._parse_started = True
2285
2286
2287 def finished(self, success):
2288 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002289 if self._parse_started:
2290 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002291
2292
showarda3c58572009-03-12 20:36:59 +00002293class DBError(Exception):
2294 """Raised by the DBObject constructor when its select fails."""
2295
2296
mbligh36768f02008-02-22 18:28:33 +00002297class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002298 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002299
2300 # Subclasses MUST override these:
2301 _table_name = ''
2302 _fields = ()
2303
showarda3c58572009-03-12 20:36:59 +00002304 # A mapping from (type, id) to the instance of the object for that
2305 # particular id. This prevents us from creating new Job() and Host()
2306 # instances for every HostQueueEntry object that we instantiate as
2307 # multiple HQEs often share the same Job.
2308 _instances_by_type_and_id = weakref.WeakValueDictionary()
2309 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002310
showarda3c58572009-03-12 20:36:59 +00002311
2312 def __new__(cls, id=None, **kwargs):
2313 """
2314 Look to see if we already have an instance for this particular type
2315 and id. If so, use it instead of creating a duplicate instance.
2316 """
2317 if id is not None:
2318 instance = cls._instances_by_type_and_id.get((cls, id))
2319 if instance:
2320 return instance
2321 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2322
2323
2324 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002325 assert bool(id) or bool(row)
2326 if id is not None and row is not None:
2327 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002328 assert self._table_name, '_table_name must be defined in your class'
2329 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002330 if not new_record:
2331 if self._initialized and not always_query:
2332 return # We've already been initialized.
2333 if id is None:
2334 id = row[0]
2335 # Tell future constructors to use us instead of re-querying while
2336 # this instance is still around.
2337 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002338
showard6ae5ea92009-02-25 00:11:51 +00002339 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002340
jadmanski0afbb632008-06-06 21:10:57 +00002341 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002342
jadmanski0afbb632008-06-06 21:10:57 +00002343 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002344 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002345
showarda3c58572009-03-12 20:36:59 +00002346 if self._initialized:
2347 differences = self._compare_fields_in_row(row)
2348 if differences:
showard7629f142009-03-27 21:02:02 +00002349 logging.warn(
2350 'initialized %s %s instance requery is updating: %s',
2351 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002352 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002353 self._initialized = True
2354
2355
2356 @classmethod
2357 def _clear_instance_cache(cls):
2358 """Used for testing, clear the internal instance cache."""
2359 cls._instances_by_type_and_id.clear()
2360
2361
showardccbd6c52009-03-21 00:10:21 +00002362 def _fetch_row_from_db(self, row_id):
2363 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2364 rows = _db.execute(sql, (row_id,))
2365 if not rows:
showard76e29d12009-04-15 21:53:10 +00002366 raise DBError("row not found (table=%s, row id=%s)"
2367 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002368 return rows[0]
2369
2370
showarda3c58572009-03-12 20:36:59 +00002371 def _assert_row_length(self, row):
2372 assert len(row) == len(self._fields), (
2373 "table = %s, row = %s/%d, fields = %s/%d" % (
2374 self.__table, row, len(row), self._fields, len(self._fields)))
2375
2376
2377 def _compare_fields_in_row(self, row):
2378 """
showarddae680a2009-10-12 20:26:43 +00002379 Given a row as returned by a SELECT query, compare it to our existing in
2380 memory fields. Fractional seconds are stripped from datetime values
2381 before comparison.
showarda3c58572009-03-12 20:36:59 +00002382
2383 @param row - A sequence of values corresponding to fields named in
2384 The class attribute _fields.
2385
2386 @returns A dictionary listing the differences keyed by field name
2387 containing tuples of (current_value, row_value).
2388 """
2389 self._assert_row_length(row)
2390 differences = {}
showarddae680a2009-10-12 20:26:43 +00002391 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002392 for field, row_value in itertools.izip(self._fields, row):
2393 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002394 if (isinstance(current_value, datetime.datetime)
2395 and isinstance(row_value, datetime.datetime)):
2396 current_value = current_value.strftime(datetime_cmp_fmt)
2397 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002398 if current_value != row_value:
2399 differences[field] = (current_value, row_value)
2400 return differences
showard2bab8f42008-11-12 18:15:22 +00002401
2402
2403 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002404 """
2405 Update our field attributes using a single row returned by SELECT.
2406
2407 @param row - A sequence of values corresponding to fields named in
2408 the class fields list.
2409 """
2410 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002411
showard2bab8f42008-11-12 18:15:22 +00002412 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002413 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002414 setattr(self, field, value)
2415 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002416
showard2bab8f42008-11-12 18:15:22 +00002417 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002418
mblighe2586682008-02-29 22:45:46 +00002419
showardccbd6c52009-03-21 00:10:21 +00002420 def update_from_database(self):
2421 assert self.id is not None
2422 row = self._fetch_row_from_db(self.id)
2423 self._update_fields_from_row(row)
2424
2425
jadmanski0afbb632008-06-06 21:10:57 +00002426 def count(self, where, table = None):
2427 if not table:
2428 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002429
jadmanski0afbb632008-06-06 21:10:57 +00002430 rows = _db.execute("""
2431 SELECT count(*) FROM %s
2432 WHERE %s
2433 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002434
jadmanski0afbb632008-06-06 21:10:57 +00002435 assert len(rows) == 1
2436
2437 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002438
2439
showardd3dc1992009-04-22 21:01:40 +00002440 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002441 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002442
showard2bab8f42008-11-12 18:15:22 +00002443 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002444 return
mbligh36768f02008-02-22 18:28:33 +00002445
mblighf8c624d2008-07-03 16:58:45 +00002446 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002447 _db.execute(query, (value, self.id))
2448
showard2bab8f42008-11-12 18:15:22 +00002449 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002450
2451
jadmanski0afbb632008-06-06 21:10:57 +00002452 def save(self):
2453 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002454 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002455 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002456 values = []
2457 for key in keys:
2458 value = getattr(self, key)
2459 if value is None:
2460 values.append('NULL')
2461 else:
2462 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002463 values_str = ','.join(values)
2464 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2465 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002466 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002467 # Update our id to the one the database just assigned to us.
2468 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002469
2470
jadmanski0afbb632008-06-06 21:10:57 +00002471 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002472 self._instances_by_type_and_id.pop((type(self), id), None)
2473 self._initialized = False
2474 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002475 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2476 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002477
2478
showard63a34772008-08-18 19:32:50 +00002479 @staticmethod
2480 def _prefix_with(string, prefix):
2481 if string:
2482 string = prefix + string
2483 return string
2484
2485
jadmanski0afbb632008-06-06 21:10:57 +00002486 @classmethod
showard989f25d2008-10-01 11:38:11 +00002487 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002488 """
2489 Construct instances of our class based on the given database query.
2490
2491 @yields One class instance for each row fetched.
2492 """
showard63a34772008-08-18 19:32:50 +00002493 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2494 where = cls._prefix_with(where, 'WHERE ')
2495 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002496 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002497 'joins' : joins,
2498 'where' : where,
2499 'order_by' : order_by})
2500 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002501 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002502
mbligh36768f02008-02-22 18:28:33 +00002503
class IneligibleHostQueue(DBObject):
    # Each row blocks one host from being scheduled for one job; rows are
    # created/removed by HostQueueEntry.block_host()/unblock_host().
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002507
2508
showard89f84db2009-03-12 20:39:13 +00002509class AtomicGroup(DBObject):
2510 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002511 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2512 'invalid')
showard89f84db2009-03-12 20:39:13 +00002513
2514
showard989f25d2008-10-01 11:38:11 +00002515class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002516 _table_name = 'labels'
2517 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002518 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002519
2520
showard6157c632009-07-06 20:19:31 +00002521 def __repr__(self):
2522 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2523 self.name, self.id, self.atomic_group_id)
2524
2525
mbligh36768f02008-02-22 18:28:33 +00002526class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002527 _table_name = 'hosts'
2528 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2529 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2530
2531
jadmanski0afbb632008-06-06 21:10:57 +00002532 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002533 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002534 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002535
2536
showard170873e2009-01-07 00:22:26 +00002537 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002538 """
showard170873e2009-01-07 00:22:26 +00002539 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002540 """
2541 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002542 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002543 FROM labels
2544 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002545 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002546 ORDER BY labels.name
2547 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002548 platform = None
2549 all_labels = []
2550 for label_name, is_platform in rows:
2551 if is_platform:
2552 platform = label_name
2553 all_labels.append(label_name)
2554 return platform, all_labels
2555
2556
showard54c1ea92009-05-20 00:32:58 +00002557 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2558
2559
2560 @classmethod
2561 def cmp_for_sort(cls, a, b):
2562 """
2563 A comparison function for sorting Host objects by hostname.
2564
2565 This strips any trailing numeric digits, ignores leading 0s and
2566 compares hostnames by the leading name and the trailing digits as a
2567 number. If both hostnames do not match this pattern, they are simply
2568 compared as lower case strings.
2569
2570 Example of how hostnames will be sorted:
2571
2572 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2573
2574 This hopefully satisfy most people's hostname sorting needs regardless
2575 of their exact naming schemes. Nobody sane should have both a host10
2576 and host010 (but the algorithm works regardless).
2577 """
2578 lower_a = a.hostname.lower()
2579 lower_b = b.hostname.lower()
2580 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2581 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2582 if match_a and match_b:
2583 name_a, number_a_str = match_a.groups()
2584 name_b, number_b_str = match_b.groups()
2585 number_a = int(number_a_str.lstrip('0'))
2586 number_b = int(number_b_str.lstrip('0'))
2587 result = cmp((name_a, number_a), (name_b, number_b))
2588 if result == 0 and lower_a != lower_b:
2589 # If they compared equal above but the lower case names are
2590 # indeed different, don't report equality. abc012 != abc12.
2591 return cmp(lower_a, lower_b)
2592 return result
2593 else:
2594 return cmp(lower_a, lower_b)
2595
2596
mbligh36768f02008-02-22 18:28:33 +00002597class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002598 _table_name = 'host_queue_entries'
2599 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002600 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002601 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002602
2603
showarda3c58572009-03-12 20:36:59 +00002604 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002605 assert id or row
showarda3c58572009-03-12 20:36:59 +00002606 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002607 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002608
jadmanski0afbb632008-06-06 21:10:57 +00002609 if self.host_id:
2610 self.host = Host(self.host_id)
2611 else:
2612 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002613
showard77182562009-06-10 00:16:05 +00002614 if self.atomic_group_id:
2615 self.atomic_group = AtomicGroup(self.atomic_group_id,
2616 always_query=False)
2617 else:
2618 self.atomic_group = None
2619
showard170873e2009-01-07 00:22:26 +00002620 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002621 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002622
2623
showard89f84db2009-03-12 20:39:13 +00002624 @classmethod
2625 def clone(cls, template):
2626 """
2627 Creates a new row using the values from a template instance.
2628
2629 The new instance will not exist in the database or have a valid
2630 id attribute until its save() method is called.
2631 """
2632 assert isinstance(template, cls)
2633 new_row = [getattr(template, field) for field in cls._fields]
2634 clone = cls(row=new_row, new_record=True)
2635 clone.id = None
2636 return clone
2637
2638
showardc85c21b2008-11-24 22:17:37 +00002639 def _view_job_url(self):
2640 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2641
2642
showardf1ae3542009-05-11 19:26:02 +00002643 def get_labels(self):
2644 """
2645 Get all labels associated with this host queue entry (either via the
2646 meta_host or as a job dependency label). The labels yielded are not
2647 guaranteed to be unique.
2648
2649 @yields Label instances associated with this host_queue_entry.
2650 """
2651 if self.meta_host:
2652 yield Label(id=self.meta_host, always_query=False)
2653 labels = Label.fetch(
2654 joins="JOIN jobs_dependency_labels AS deps "
2655 "ON (labels.id = deps.label_id)",
2656 where="deps.job_id = %d" % self.job.id)
2657 for label in labels:
2658 yield label
2659
2660
jadmanski0afbb632008-06-06 21:10:57 +00002661 def set_host(self, host):
2662 if host:
2663 self.queue_log_record('Assigning host ' + host.hostname)
2664 self.update_field('host_id', host.id)
2665 self.update_field('active', True)
2666 self.block_host(host.id)
2667 else:
2668 self.queue_log_record('Releasing host')
2669 self.unblock_host(self.host.id)
2670 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002671
jadmanski0afbb632008-06-06 21:10:57 +00002672 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002673
2674
jadmanski0afbb632008-06-06 21:10:57 +00002675 def get_host(self):
2676 return self.host
mbligh36768f02008-02-22 18:28:33 +00002677
2678
jadmanski0afbb632008-06-06 21:10:57 +00002679 def queue_log_record(self, log_line):
2680 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002681 _drone_manager.write_lines_to_file(self.queue_log_path,
2682 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002683
2684
jadmanski0afbb632008-06-06 21:10:57 +00002685 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002686 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002687 row = [0, self.job.id, host_id]
2688 block = IneligibleHostQueue(row=row, new_record=True)
2689 block.save()
mblighe2586682008-02-29 22:45:46 +00002690
2691
jadmanski0afbb632008-06-06 21:10:57 +00002692 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002693 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002694 blocks = IneligibleHostQueue.fetch(
2695 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2696 for block in blocks:
2697 block.delete()
mblighe2586682008-02-29 22:45:46 +00002698
2699
showard2bab8f42008-11-12 18:15:22 +00002700 def set_execution_subdir(self, subdir=None):
2701 if subdir is None:
2702 assert self.get_host()
2703 subdir = self.get_host().hostname
2704 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002705
2706
showard6355f6b2008-12-05 18:52:13 +00002707 def _get_hostname(self):
2708 if self.host:
2709 return self.host.hostname
2710 return 'no host'
2711
2712
showard170873e2009-01-07 00:22:26 +00002713 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002714 flags = []
2715 if self.active:
2716 flags.append('active')
2717 if self.complete:
2718 flags.append('complete')
2719 if self.deleted:
2720 flags.append('deleted')
2721 if self.aborted:
2722 flags.append('aborted')
2723 flags_str = ','.join(flags)
2724 if flags_str:
2725 flags_str = ' [%s]' % flags_str
2726 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2727 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002728
2729
jadmanski0afbb632008-06-06 21:10:57 +00002730 def set_status(self, status):
showard56824072009-10-12 20:30:21 +00002731 logging.info("%s -> %s", self, status)
mblighf8c624d2008-07-03 16:58:45 +00002732
showard56824072009-10-12 20:30:21 +00002733 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002734
showard8cc058f2009-09-08 16:26:33 +00002735 if status in (models.HostQueueEntry.Status.QUEUED,
2736 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002737 self.update_field('complete', False)
2738 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002739
showard8cc058f2009-09-08 16:26:33 +00002740 if status in (models.HostQueueEntry.Status.PENDING,
2741 models.HostQueueEntry.Status.RUNNING,
2742 models.HostQueueEntry.Status.VERIFYING,
2743 models.HostQueueEntry.Status.STARTING,
2744 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002745 self.update_field('complete', False)
2746 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002747
showard8cc058f2009-09-08 16:26:33 +00002748 if status in (models.HostQueueEntry.Status.FAILED,
2749 models.HostQueueEntry.Status.COMPLETED,
2750 models.HostQueueEntry.Status.STOPPED,
2751 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002752 self.update_field('complete', True)
2753 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002754 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002755
2756 should_email_status = (status.lower() in _notify_email_statuses or
2757 'all' in _notify_email_statuses)
2758 if should_email_status:
2759 self._email_on_status(status)
2760
2761 self._email_on_job_complete()
2762
2763
showardf85a0b72009-10-07 20:48:45 +00002764 def _on_complete(self):
2765 if not self.execution_subdir:
2766 return
2767 # unregister any possible pidfiles associated with this queue entry
2768 for pidfile_name in _ALL_PIDFILE_NAMES:
2769 pidfile_id = _drone_manager.get_pidfile_id_from(
2770 self.execution_path(), pidfile_name=pidfile_name)
2771 _drone_manager.unregister_pidfile(pidfile_id)
2772
2773
showardc85c21b2008-11-24 22:17:37 +00002774 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002775 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002776
2777 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2778 self.job.id, self.job.name, hostname, status)
2779 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2780 self.job.id, self.job.name, hostname, status,
2781 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002782 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002783
2784
2785 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002786 if not self.job.is_finished():
2787 return
showard542e8402008-09-19 20:16:18 +00002788
showardc85c21b2008-11-24 22:17:37 +00002789 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002790 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002791 for queue_entry in hosts_queue:
2792 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002793 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002794 queue_entry.status))
2795
2796 summary_text = "\n".join(summary_text)
2797 status_counts = models.Job.objects.get_status_counts(
2798 [self.job.id])[self.job.id]
2799 status = ', '.join('%d %s' % (count, status) for status, count
2800 in status_counts.iteritems())
2801
2802 subject = 'Autotest: Job ID: %s "%s" %s' % (
2803 self.job.id, self.job.name, status)
2804 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2805 self.job.id, self.job.name, status, self._view_job_url(),
2806 summary_text)
showard170873e2009-01-07 00:22:26 +00002807 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002808
2809
showard8cc058f2009-09-08 16:26:33 +00002810 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002811 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002812 assert assigned_host
2813 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002814 if self.host_id is None:
2815 self.set_host(assigned_host)
2816 else:
2817 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002818
showardcfd4a7e2009-07-11 01:47:33 +00002819 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002820 self.job.name, self.meta_host, self.atomic_group_id,
2821 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002822
showard8cc058f2009-09-08 16:26:33 +00002823 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002824
2825
showard8cc058f2009-09-08 16:26:33 +00002826 def _do_schedule_pre_job_tasks(self):
showard77182562009-06-10 00:16:05 +00002827 # Every host goes thru the Verifying stage (which may or may not
2828 # actually do anything as determined by get_pre_job_tasks).
2829 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00002830 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002831
showard6ae5ea92009-02-25 00:11:51 +00002832
    def requeue(self):
        """Return this entry to Queued so the scheduler can pick it up again.

        Clears started_on and the execution subdir; meta-host entries also
        release their host so a fresh one can be chosen.
        """
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002841
2842
    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.  Marks the entry Failed and stops the job's remaining
        entries if it can no longer meet its synch_count.
        """
        assert not self.meta_host
        self.set_status(models.HostQueueEntry.Status.FAILED)
        self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002851
2852
    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None; lazily loaded
        # from the DB and cached by _load_abort_info().
        self._load_abort_info()
        return self._aborted_by
2857
2858
    @property
    def aborted_on(self):
        # Timestamp of the abort, or None; lazily loaded from the DB and
        # cached by _load_abort_info().
        self._load_abort_info()
        return self._aborted_on
2863
2864
    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call; the presence of _aborted_by doubles
        # as the "already loaded" flag.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            # Entry was never aborted (or the abort record is missing).
            self._aborted_by = self._aborted_on = None
2880
2881
showardb2e2c322008-10-14 17:33:55 +00002882 def on_pending(self):
2883 """
2884 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00002885 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
2886 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00002887 """
showard8cc058f2009-09-08 16:26:33 +00002888 self.set_status(models.HostQueueEntry.Status.PENDING)
2889 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00002890
2891 # Some debug code here: sends an email if an asynchronous job does not
2892 # immediately enter Starting.
2893 # TODO: Remove this once we figure out why asynchronous jobs are getting
2894 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00002895 self.job.run_if_ready(queue_entry=self)
2896 if (self.job.synch_count == 1 and
2897 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00002898 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2899 message = 'Asynchronous job stuck in Pending'
2900 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002901
2902
    def abort(self, dispatcher):
        """Abort this entry, disposing of its host according to status.

        @param dispatcher: used to assert no agents are still running for
                this entry before its host is released back to Ready.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # schedule a cleanup to undo any partial verify work on the host
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host(id=self.host.id))

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00002922
showard8cc058f2009-09-08 16:26:33 +00002923
2924 def get_group_name(self):
2925 atomic_group = self.atomic_group
2926 if not atomic_group:
2927 return ''
2928
2929 # Look at any meta_host and dependency labels and pick the first
2930 # one that also specifies this atomic group. Use that label name
2931 # as the group name if possible (it is more specific).
2932 for label in self.get_labels():
2933 if label.atomic_group_id:
2934 assert label.atomic_group_id == atomic_group.id
2935 return label.name
2936 return atomic_group.name
2937
2938
showard170873e2009-01-07 00:22:26 +00002939 def execution_tag(self):
2940 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002941 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002942
2943
showarded2afea2009-07-07 20:54:07 +00002944 def execution_path(self):
2945 return self.execution_tag()
2946
2947
mbligh36768f02008-02-22 18:28:33 +00002948class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002949 _table_name = 'jobs'
2950 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2951 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002952 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002953 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002954
showard77182562009-06-10 00:16:05 +00002955 # This does not need to be a column in the DB. The delays are likely to
2956 # be configured short. If the scheduler is stopped and restarted in
2957 # the middle of a job's delay cycle, the delay cycle will either be
2958 # repeated or skipped depending on the number of Pending machines found
2959 # when the restarted scheduler recovers to track it. Not a problem.
2960 #
2961 # A reference to the DelayedCallTask that will wake up the job should
2962 # no other HQEs change state in time. Its end_time attribute is used
2963 # by our run_with_ready_delay() method to determine if the wait is over.
2964 _delay_ready_task = None
2965
2966 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2967 # all status='Pending' atomic group HQEs incase a delay was running when the
2968 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002969
showarda3c58572009-03-12 20:36:59 +00002970 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002971 assert id or row
showarda3c58572009-03-12 20:36:59 +00002972 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002973
mblighe2586682008-02-29 22:45:46 +00002974
jadmanski0afbb632008-06-06 21:10:57 +00002975 def is_server_job(self):
2976 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002977
2978
showard170873e2009-01-07 00:22:26 +00002979 def tag(self):
2980 return "%s-%s" % (self.id, self.owner)
2981
2982
jadmanski0afbb632008-06-06 21:10:57 +00002983 def get_host_queue_entries(self):
2984 rows = _db.execute("""
2985 SELECT * FROM host_queue_entries
2986 WHERE job_id= %s
2987 """, (self.id,))
2988 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002989
jadmanski0afbb632008-06-06 21:10:57 +00002990 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002991
jadmanski0afbb632008-06-06 21:10:57 +00002992 return entries
mbligh36768f02008-02-22 18:28:33 +00002993
2994
jadmanski0afbb632008-06-06 21:10:57 +00002995 def set_status(self, status, update_queues=False):
2996 self.update_field('status',status)
2997
2998 if update_queues:
2999 for queue_entry in self.get_host_queue_entries():
3000 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00003001
3002
showard77182562009-06-10 00:16:05 +00003003 def _atomic_and_has_started(self):
3004 """
3005 @returns True if any of the HostQueueEntries associated with this job
3006 have entered the Status.STARTING state or beyond.
3007 """
3008 atomic_entries = models.HostQueueEntry.objects.filter(
3009 job=self.id, atomic_group__isnull=False)
3010 if atomic_entries.count() <= 0:
3011 return False
3012
showardaf8b4ca2009-06-16 18:47:26 +00003013 # These states may *only* be reached if Job.run() has been called.
3014 started_statuses = (models.HostQueueEntry.Status.STARTING,
3015 models.HostQueueEntry.Status.RUNNING,
3016 models.HostQueueEntry.Status.COMPLETED)
3017
3018 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003019 return started_entries.count() > 0
3020
3021
showard708b3522009-08-20 23:26:15 +00003022 def _hosts_assigned_count(self):
3023 """The number of HostQueueEntries assigned a Host for this job."""
3024 entries = models.HostQueueEntry.objects.filter(job=self.id,
3025 host__isnull=False)
3026 return entries.count()
3027
3028
showard77182562009-06-10 00:16:05 +00003029 def _pending_count(self):
3030 """The number of HostQueueEntries for this job in the Pending state."""
3031 pending_entries = models.HostQueueEntry.objects.filter(
3032 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3033 return pending_entries.count()
3034
3035
showardd2014822009-10-12 20:26:58 +00003036 def _pending_threshold(self, atomic_group):
3037 """
3038 @param atomic_group: The AtomicGroup associated with this job that we
3039 are using to bound the threshold.
3040 @returns The minimum number of HostQueueEntries assigned a Host before
3041 this job can run.
3042 """
3043 return min(self._hosts_assigned_count(),
3044 atomic_group.max_number_of_machines)
3045
3046
jadmanski0afbb632008-06-06 21:10:57 +00003047 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003048 # NOTE: Atomic group jobs stop reporting ready after they have been
3049 # started to avoid launching multiple copies of one atomic job.
3050 # Only possible if synch_count is less than than half the number of
3051 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003052 pending_count = self._pending_count()
3053 atomic_and_has_started = self._atomic_and_has_started()
3054 ready = (pending_count >= self.synch_count
showardd2014822009-10-12 20:26:58 +00003055 and not atomic_and_has_started)
showardb000a8d2009-07-28 20:02:07 +00003056
3057 if not ready:
3058 logging.info(
3059 'Job %s not ready: %s pending, %s required '
3060 '(Atomic and started: %s)',
3061 self, pending_count, self.synch_count,
3062 atomic_and_has_started)
3063
3064 return ready
mbligh36768f02008-02-22 18:28:33 +00003065
3066
jadmanski0afbb632008-06-06 21:10:57 +00003067 def num_machines(self, clause = None):
3068 sql = "job_id=%s" % self.id
3069 if clause:
3070 sql += " AND (%s)" % clause
3071 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003072
3073
jadmanski0afbb632008-06-06 21:10:57 +00003074 def num_queued(self):
3075 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003076
3077
jadmanski0afbb632008-06-06 21:10:57 +00003078 def num_active(self):
3079 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003080
3081
jadmanski0afbb632008-06-06 21:10:57 +00003082 def num_complete(self):
3083 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003084
3085
jadmanski0afbb632008-06-06 21:10:57 +00003086 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003087 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003088
mbligh36768f02008-02-22 18:28:33 +00003089
showard6bb7c292009-01-30 01:44:51 +00003090 def _not_yet_run_entries(self, include_verifying=True):
3091 statuses = [models.HostQueueEntry.Status.QUEUED,
3092 models.HostQueueEntry.Status.PENDING]
3093 if include_verifying:
3094 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3095 return models.HostQueueEntry.objects.filter(job=self.id,
3096 status__in=statuses)
3097
3098
3099 def _stop_all_entries(self):
3100 entries_to_stop = self._not_yet_run_entries(
3101 include_verifying=False)
3102 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003103 assert not child_entry.complete, (
3104 '%s status=%s, active=%s, complete=%s' %
3105 (child_entry.id, child_entry.status, child_entry.active,
3106 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003107 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3108 child_entry.host.status = models.Host.Status.READY
3109 child_entry.host.save()
3110 child_entry.status = models.HostQueueEntry.Status.STOPPED
3111 child_entry.save()
3112
showard2bab8f42008-11-12 18:15:22 +00003113 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003114 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003115 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003116 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003117
3118
jadmanski0afbb632008-06-06 21:10:57 +00003119 def write_to_machines_file(self, queue_entry):
3120 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00003121 file_path = os.path.join(self.tag(), '.machines')
3122 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003123
3124
showardf1ae3542009-05-11 19:26:02 +00003125 def _next_group_name(self, group_name=''):
3126 """@returns a directory name to use for the next host group results."""
3127 if group_name:
3128 # Sanitize for use as a pathname.
3129 group_name = group_name.replace(os.path.sep, '_')
3130 if group_name.startswith('.'):
3131 group_name = '_' + group_name[1:]
3132 # Add a separator between the group name and 'group%d'.
3133 group_name += '.'
3134 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003135 query = models.HostQueueEntry.objects.filter(
3136 job=self.id).values('execution_subdir').distinct()
3137 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003138 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3139 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003140 if ids:
3141 next_id = max(ids) + 1
3142 else:
3143 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003144 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003145
3146
showarddb502762009-09-09 15:31:20 +00003147 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003148 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003149 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003150 return control_path
mbligh36768f02008-02-22 18:28:33 +00003151
showardb2e2c322008-10-14 17:33:55 +00003152
showard2bab8f42008-11-12 18:15:22 +00003153 def get_group_entries(self, queue_entry_from_group):
showard8375ce02009-10-12 20:35:13 +00003154 """
3155 @param queue_entry_from_group: A HostQueueEntry instance to find other
3156 group entries on this job for.
3157
3158 @returns A list of HostQueueEntry objects all executing this job as
3159 part of the same group as the one supplied (having the same
3160 execution_subdir).
3161 """
showard2bab8f42008-11-12 18:15:22 +00003162 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003163 return list(HostQueueEntry.fetch(
3164 where='job_id=%s AND execution_subdir=%s',
3165 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003166
3167
showard8cc058f2009-09-08 16:26:33 +00003168 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003169 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003170 execution_path = queue_entries[0].execution_path()
3171 control_path = self._write_control_file(execution_path)
jadmanski0afbb632008-06-06 21:10:57 +00003172 hostnames = ','.join([entry.get_host().hostname
3173 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003174
showarddb502762009-09-09 15:31:20 +00003175 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003176 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003177 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003178 ['-P', execution_tag, '-n',
3179 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003180 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003181
jadmanski0afbb632008-06-06 21:10:57 +00003182 if not self.is_server_job():
3183 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003184
showardb2e2c322008-10-14 17:33:55 +00003185 return params
mblighe2586682008-02-29 22:45:46 +00003186
mbligh36768f02008-02-22 18:28:33 +00003187
showardc9ae1782009-01-30 01:42:37 +00003188 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003189 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003190 return True
showard0fc38302008-10-23 00:44:07 +00003191 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003192 return queue_entry.get_host().dirty
3193 return False
showard21baa452008-10-21 00:08:39 +00003194
showardc9ae1782009-01-30 01:42:37 +00003195
showard8cc058f2009-09-08 16:26:33 +00003196 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003197 do_not_verify = (queue_entry.host.protection ==
3198 host_protections.Protection.DO_NOT_VERIFY)
3199 if do_not_verify:
3200 return False
3201 return self.run_verify
3202
3203
showard8cc058f2009-09-08 16:26:33 +00003204 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003205 """
3206 Get a list of tasks to perform before the host_queue_entry
3207 may be used to run this Job (such as Cleanup & Verify).
3208
3209 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003210 it should be considered be ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003211 task in the list calls HostQueueEntry.on_pending(), which
3212 continues the flow of the job.
3213 """
showardc9ae1782009-01-30 01:42:37 +00003214 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003215 task = models.SpecialTask.Task.CLEANUP
3216 elif self._should_run_verify(queue_entry):
3217 task = models.SpecialTask.Task.VERIFY
3218 else:
3219 queue_entry.on_pending()
3220 return
3221
3222 models.SpecialTask.objects.create(
3223 host=models.Host(id=queue_entry.host_id),
3224 queue_entry=models.HostQueueEntry(id=queue_entry.id),
3225 task=task)
showard21baa452008-10-21 00:08:39 +00003226
3227
showardf1ae3542009-05-11 19:26:02 +00003228 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003229 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003230 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003231 else:
showardf1ae3542009-05-11 19:26:02 +00003232 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003233 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003234 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003235 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003236
3237 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003238 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003239
3240
3241 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003242 """
3243 @returns A tuple containing a list of HostQueueEntry instances to be
3244 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00003245 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00003246 """
showard77182562009-06-10 00:16:05 +00003247 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003248 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003249 if atomic_group:
3250 num_entries_wanted = atomic_group.max_number_of_machines
3251 else:
3252 num_entries_wanted = self.synch_count
3253 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003254
showardf1ae3542009-05-11 19:26:02 +00003255 if num_entries_wanted > 0:
3256 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003257 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003258 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003259 params=(self.id, include_queue_entry.id)))
3260
3261 # Sort the chosen hosts by hostname before slicing.
3262 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3263 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3264 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3265 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003266
showardf1ae3542009-05-11 19:26:02 +00003267 # Sanity check. We'll only ever be called if this can be met.
showard828fc4c2009-09-14 20:31:00 +00003268 if len(chosen_entries) < self.synch_count:
3269 message = ('job %s got less than %s chosen entries: %s' % (
3270 self.id, self.synch_count, chosen_entries))
3271 logging.error(message)
3272 email_manager.manager.enqueue_notify_email(
3273 'Job not started, too few chosen entries', message)
3274 return []
showardf1ae3542009-05-11 19:26:02 +00003275
showard8cc058f2009-09-08 16:26:33 +00003276 group_name = include_queue_entry.get_group_name()
showardf1ae3542009-05-11 19:26:02 +00003277
3278 self._assign_new_group(chosen_entries, group_name=group_name)
showard8cc058f2009-09-08 16:26:33 +00003279 return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003280
3281
showard77182562009-06-10 00:16:05 +00003282 def run_if_ready(self, queue_entry):
3283 """
showard8375ce02009-10-12 20:35:13 +00003284 Run this job by kicking its HQEs into status='Starting' if enough
3285 hosts are ready for it to run.
3286
3287 Cleans up by kicking HQEs into status='Stopped' if this Job is not
3288 ready to run.
showard77182562009-06-10 00:16:05 +00003289 """
showardb2e2c322008-10-14 17:33:55 +00003290 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003291 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003292 elif queue_entry.atomic_group:
3293 self.run_with_ready_delay(queue_entry)
3294 else:
3295 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003296
3297
3298 def run_with_ready_delay(self, queue_entry):
3299 """
3300 Start a delay to wait for more hosts to enter Pending state before
3301 launching an atomic group job. Once set, the a delay cannot be reset.
3302
3303 @param queue_entry: The HostQueueEntry object to get atomic group
3304 info from and pass to run_if_ready when the delay is up.
3305
3306 @returns An Agent to run the job as appropriate or None if a delay
3307 has already been set.
3308 """
3309 assert queue_entry.job_id == self.id
3310 assert queue_entry.atomic_group
3311 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showardd2014822009-10-12 20:26:58 +00003312 over_max_threshold = (self._pending_count() >=
3313 self._pending_threshold(queue_entry.atomic_group))
showard77182562009-06-10 00:16:05 +00003314 delay_expired = (self._delay_ready_task and
3315 time.time() >= self._delay_ready_task.end_time)
3316
3317 # Delay is disabled or we already have enough? Do not wait to run.
3318 if not delay or over_max_threshold or delay_expired:
showard8cc058f2009-09-08 16:26:33 +00003319 self.run(queue_entry)
3320 else:
3321 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003322
showard8cc058f2009-09-08 16:26:33 +00003323
3324 def schedule_delayed_callback_task(self, queue_entry):
3325 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3326
showard77182562009-06-10 00:16:05 +00003327 if self._delay_ready_task:
3328 return None
3329
showard8cc058f2009-09-08 16:26:33 +00003330 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3331
showard77182562009-06-10 00:16:05 +00003332 def run_job_after_delay():
showardd2014822009-10-12 20:26:58 +00003333 logging.info('Job %s done waiting for extra hosts.', self)
3334 # Check to see if the job is still relevant. It could have aborted
3335 # while we were waiting or hosts could have disappearred, etc.
3336 threshold = self._pending_threshold(queue_entry.atomic_group)
3337 if self._pending_count() < threshold:
3338 logging.info('Job %s had too few Pending hosts after waiting '
3339 'for extras. Not running.', self)
3340 return
showard77182562009-06-10 00:16:05 +00003341 return self.run(queue_entry)
3342
showard708b3522009-08-20 23:26:15 +00003343 logging.info('Job %s waiting up to %s seconds for more hosts.',
3344 self.id, delay)
showard77182562009-06-10 00:16:05 +00003345 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3346 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003347 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003348
3349
3350 def run(self, queue_entry):
3351 """
3352 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003353 """
3354 if queue_entry.atomic_group and self._atomic_and_has_started():
3355 logging.error('Job.run() called on running atomic Job %d '
3356 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003357 return
3358 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003359 if queue_entries:
3360 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003361
3362
showard8cc058f2009-09-08 16:26:33 +00003363 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003364 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003365 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003366 self.abort_delay_ready_task()
3367
3368
3369 def abort_delay_ready_task(self):
3370 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003371 if self._delay_ready_task:
3372 # Cancel any pending callback that would try to run again
3373 # as we are already running.
3374 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003375
showardd2014822009-10-12 20:26:58 +00003376
showardb000a8d2009-07-28 20:02:07 +00003377 def __str__(self):
3378 return '%s-%s' % (self.id, self.owner)
3379
3380
mbligh36768f02008-02-22 18:28:33 +00003381if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003382 main()