blob: 5df4b0d3f4ba5792aaa7563ba5c0a7fd79716563 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
# Pidfile prefixes for the scheduler process itself; init() uses
# utils.program_is_alive()/write_pid() with PID_FILE_PREFIX to enforce a
# single running instance.  BABYSITTER_PID_FILE_PREFIX is presumably used
# by the babysitter wrapper process -- not referenced in this chunk.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results directory; replaced by the command-line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Names of the pidfiles dropped into an execution's results directory --
# judging by the names, one each for the autoserv run, crashinfo
# collection, and results parsing (TODO confirm against autoserv).
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state, populated during startup.
_db = None             # DatabaseConnection; assigned in init()
_shutdown = False      # set True by handle_sigint() to end the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False  # --test flag: swaps in 'autoserv_dummy'
_base_url = None       # AFE frontend URL, computed in main (config or SERVER.hostname)
_notify_email_statuses = []  # parsed from notify_email_statuses config value
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
62
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
68
69
mbligh83c1e9e2009-05-01 23:10:41 +000070def _site_init_monitor_db_dummy():
71 return {}
72
73
def main():
    """Scheduler entry point.

    Runs the real main loop, logging (and re-raising) any exception that
    escapes it -- except SystemExit, which passes through untouched -- and
    always removes the scheduler pid file on the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000085
86
def main_without_exception_handling():
    """Parse options, configure global scheduler state and run the dispatcher.

    Startup sequence:
      1. configure logging and parse the command line (one positional arg:
         the results directory);
      2. bail out unless enable_scheduler is set in the SCHEDULER config
         section;
      3. run the optional site initialization hook and chdir into the
         results directory;
      4. populate module globals (_notify_email_statuses, _autoserv_path /
         _testing_mode for --test, _base_url);
      5. start the status server and loop on dispatcher.tick() until
         _shutdown is set, then flush email, and shut down the server,
         drone manager and database connection.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # BUG FIX: message previously read "to enabled it".
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give a site-specific hook a chance to run, falling back to a no-op.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Accept whitespace, comma, semicolon or colon as separators.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Deliberately broad: any escaping error is mailed out before the
        # shutdown sequence below runs.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000174
175
def setup_logging():
    """Configure scheduler logging through logging_manager, honoring the
    AUTOTEST_SCHEDULER_LOG_DIR and AUTOTEST_SCHEDULER_LOG_NAME environment
    variables as optional overrides (None when unset)."""
    environ = os.environ
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            logfile_name=environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
182
183
def handle_sigint(signum, frame):
    """SIGINT handler: flag the main loop (while not _shutdown) to exit
    cleanly after its current tick instead of dying mid-cycle."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000188
189
def init():
    """One-time scheduler startup.

    Enforces single-instance via the pid file, connects the database and
    Django layers, installs the SIGINT handler, and initializes the global
    drone manager from config.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start if another monitor_db is already running.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the web database at the stress-test schema under --test.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    # Make autoserv (and friends) resolvable from the server dir first.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE: this local 'drones' (a comma-separated hostname string from
    # config) shadows the imported 'drones' module within this function.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000224
225
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv invocation for a set of machines.

    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Append --verbose when True.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        # Fall back to the queue entry's job when no Job was given.
        job = job or queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    command.extend(extra_args)
    return command
248
249
class SchedulerError(Exception):
    """Error raised by the HostScheduler when scheduler state is found to
    be inconsistent."""
252
253
class HostScheduler(object):
    """Matches pending host queue entries against available hosts.

    refresh() snapshots DB state (ready hosts, ACLs, dependency labels,
    label<->host mappings) into in-memory dicts; the find_eligible_*
    methods then consume that snapshot, popping hosts from
    self._hosts_available as they are assigned so a host is used at most
    once per scheduling cycle.

    NOTE(review): Host and Label appear to be DB-row wrapper classes with
    class-level fetch() methods defined elsewhere in this file -- confirm.
    """
    def _get_ready_hosts(self):
        """@returns dict of host id -> Host for unlocked, Ready (or
        status-less) hosts with no active queue entry."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id sequence as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run a two-column query (with %s filled by id_list) and fold the
        rows into a dict of left id -> set of right ids (swapped if flip)."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left, right) id rows into {left_id: set(right_ids)};
        flip=True inverts the mapping direction."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """@returns {job_id: set(aclgroup_ids)} via each job's owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """@returns {job_id: set(host_ids)} of hosts barred from each job."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """@returns {job_id: set(label_ids)} of dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """@returns {host_id: set(aclgroup_ids)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """@returns (labels_to_hosts, hosts_to_labels) dicts built from one
        hosts_labels query; both empty when host_ids is empty."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """@returns {label_id: Label} for every label."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Rebuild the in-memory scheduling snapshot for this cycle,
        restricted to the jobs referenced by pending_queue_entries."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True when the job's owner shares at least one ACL group with
        the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True when the host carries every dependency label of the job."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label the job neither
        depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Logs an error (but still returns
        one of the ids) if more than one atomic group is found.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        # set.pop() returns an arbitrary element when several ids exist.
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True when ACL, dependency, only_if_needed and atomic-group checks
        all pass (or the host is invalid, i.e. a one-time host)."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """Truthy when the host is known and flagged invalid (note: returns
        the host's .invalid value / None rather than a strict bool)."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible; None otherwise."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True when the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick and claim the first usable, eligible host carrying the
        entry's metahost label; None when no host qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Dispatch to non-metahost or metahost scheduling; atomic-group
        entries must go through find_eligible_atomic_group() instead."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
617
618
showard170873e2009-01-07 00:22:26 +0000619class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000620 def __init__(self):
621 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000622 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000623 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000624 user_cleanup_time = scheduler_config.config.clean_interval
625 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
626 _db, user_cleanup_time)
627 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000628 self._host_agents = {}
629 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000630
mbligh36768f02008-02-22 18:28:33 +0000631
    def initialize(self, recover_hosts=True):
        """One-time startup: prime both cleanup tasks, then recover state
        left over from a previous scheduler run.

        @param recover_hosts: If False, skip the host recovery pass
                (process recovery still always runs).
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000641
642
    def tick(self):
        """Run one scheduler cycle, in a fixed phase order: refresh drone
        state, clean up, handle aborts and recurring runs, schedule work,
        run agents, then flush drone actions and queued email."""
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        # Side effects are flushed at the end of the cycle.
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000655
showard97aed502008-11-04 02:01:24 +0000656
mblighf3294cc2009-04-08 21:17:38 +0000657 def _run_cleanup(self):
658 self._periodic_cleanup.run_cleanup_maybe()
659 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard170873e2009-01-07 00:22:26 +0000662 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
663 for object_id in object_ids:
664 agent_dict.setdefault(object_id, set()).add(agent)
665
666
667 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
668 for object_id in object_ids:
669 assert object_id in agent_dict
670 agent_dict[object_id].remove(agent)
671
672
showardd1195652009-12-08 22:21:02 +0000673 def add_agent_task(self, agent_task):
674 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000675 self._agents.append(agent)
676 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000677 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
678 self._register_agent_for_ids(self._queue_entry_agents,
679 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000680
showard170873e2009-01-07 00:22:26 +0000681
682 def get_agents_for_entry(self, queue_entry):
683 """
684 Find agents corresponding to the specified queue_entry.
685 """
showardd3dc1992009-04-22 21:01:40 +0000686 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000687
688
689 def host_has_agent(self, host):
690 """
691 Determine if there is currently an Agent present using this host.
692 """
693 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000694
695
jadmanski0afbb632008-06-06 21:10:57 +0000696 def remove_agent(self, agent):
697 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000698 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
699 agent)
700 self._unregister_agent_for_ids(self._queue_entry_agents,
701 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000702
703
showard8cc058f2009-09-08 16:26:33 +0000704 def _host_has_scheduled_special_task(self, host):
705 return bool(models.SpecialTask.objects.filter(host__id=host.id,
706 is_active=False,
707 is_complete=False))
708
709
jadmanski0afbb632008-06-06 21:10:57 +0000710 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000711 agent_tasks = self._create_recovery_agent_tasks()
712 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000713 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000714 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000715 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000716 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000717 self._reverify_remaining_hosts()
718 # reinitialize drones after killing orphaned processes, since they can
719 # leave around files when they die
720 _drone_manager.execute_actions()
721 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000722
showard170873e2009-01-07 00:22:26 +0000723
showardd1195652009-12-08 22:21:02 +0000724 def _create_recovery_agent_tasks(self):
725 return (self._get_queue_entry_agent_tasks()
726 + self._get_special_task_agent_tasks(is_active=True))
727
728
729 def _get_queue_entry_agent_tasks(self):
730 # host queue entry statuses handled directly by AgentTasks (Verifying is
731 # handled through SpecialTasks, so is not listed here)
732 statuses = (models.HostQueueEntry.Status.STARTING,
733 models.HostQueueEntry.Status.RUNNING,
734 models.HostQueueEntry.Status.GATHERING,
735 models.HostQueueEntry.Status.PARSING)
736 status_list = ','.join("'%s'" % status for status in statuses)
showard170873e2009-01-07 00:22:26 +0000737 queue_entries = HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000738 where='status IN (%s)' % status_list)
739
740 agent_tasks = []
741 used_queue_entries = set()
742 for entry in queue_entries:
743 if self.get_agents_for_entry(entry):
744 # already being handled
745 continue
746 if entry in used_queue_entries:
747 # already picked up by a synchronous job
748 continue
749 agent_task = self._get_agent_task_for_queue_entry(entry)
750 agent_tasks.append(agent_task)
751 used_queue_entries.update(agent_task.queue_entries)
752 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000753
754
showardd1195652009-12-08 22:21:02 +0000755 def _get_special_task_agent_tasks(self, is_active=False):
756 special_tasks = models.SpecialTask.objects.filter(
757 is_active=is_active, is_complete=False)
758 return [self._get_agent_task_for_special_task(task)
759 for task in special_tasks]
760
761
762 def _get_agent_task_for_queue_entry(self, queue_entry):
763 """
764 Construct an AgentTask instance for the given active HostQueueEntry,
765 if one can currently run it.
766 @param queue_entry: a HostQueueEntry
767 @returns an AgentTask to run the queue entry
768 """
769 task_entries = queue_entry.job.get_group_entries(queue_entry)
770 self._check_for_duplicate_host_entries(task_entries)
771
772 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
773 models.HostQueueEntry.Status.RUNNING):
774 return QueueTask(queue_entries=task_entries)
775 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
776 return GatherLogsTask(queue_entries=task_entries)
777 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
778 return FinalReparseTask(queue_entries=task_entries)
779
780 raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
781 'invalid status %s: %s' % (entry.status, entry))
782
783
784 def _check_for_duplicate_host_entries(self, task_entries):
785 for task_entry in task_entries:
786 if task_entry.status != models.HostQueueEntry.Status.PARSING:
787 self._assert_host_has_no_agent(task_entry)
788
789
790 def _assert_host_has_no_agent(self, entry):
791 """
792 @param entry: a HostQueueEntry or a SpecialTask
793 """
794 if self.host_has_agent(entry.host):
795 agent = tuple(self._host_agents.get(entry.host.id))[0]
796 raise SchedulerError(
797 'While scheduling %s, host %s already has a host agent %s'
798 % (entry, entry.host, agent.task))
799
800
801 def _get_agent_task_for_special_task(self, special_task):
802 """
803 Construct an AgentTask class to run the given SpecialTask and add it
804 to this dispatcher.
805 @param special_task: a models.SpecialTask instance
806 @returns an AgentTask to run this SpecialTask
807 """
808 self._assert_host_has_no_agent(special_task)
809
810 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
811 for agent_task_class in special_agent_task_classes:
812 if agent_task_class.TASK_TYPE == special_task.task:
813 return agent_task_class(task=special_task)
814
815 raise SchedulerError('No AgentTask class for task', str(special_task))
816
817
818 def _register_pidfiles(self, agent_tasks):
819 for agent_task in agent_tasks:
820 agent_task.register_necessary_pidfiles()
821
822
823 def _recover_tasks(self, agent_tasks):
824 orphans = _drone_manager.get_orphaned_autoserv_processes()
825
826 for agent_task in agent_tasks:
827 agent_task.recover()
828 if agent_task.monitor and agent_task.monitor.has_process():
829 orphans.discard(agent_task.monitor.get_process())
830 self.add_agent_task(agent_task)
831
832 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000833
834
showard8cc058f2009-09-08 16:26:33 +0000835 def _get_unassigned_entries(self, status):
836 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
showard0db3d432009-10-12 20:29:15 +0000837 if entry.status == status and not self.get_agents_for_entry(entry):
838 # The status can change during iteration, e.g., if job.run()
839 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000840 yield entry
841
842
showard6878e8b2009-07-20 22:37:45 +0000843 def _check_for_remaining_orphan_processes(self, orphans):
844 if not orphans:
845 return
846 subject = 'Unrecovered orphan autoserv processes remain'
847 message = '\n'.join(str(process) for process in orphans)
848 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000849
850 die_on_orphans = global_config.global_config.get_config_value(
851 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
852
853 if die_on_orphans:
854 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000855
showard170873e2009-01-07 00:22:26 +0000856
showard8cc058f2009-09-08 16:26:33 +0000857 def _recover_pending_entries(self):
858 for entry in self._get_unassigned_entries(
859 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000860 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000861 entry.on_pending()
862
863
showardb8900452009-10-12 20:31:01 +0000864 def _check_for_unrecovered_verifying_entries(self):
showard170873e2009-01-07 00:22:26 +0000865 queue_entries = HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000866 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
867 unrecovered_hqes = []
868 for queue_entry in queue_entries:
869 special_tasks = models.SpecialTask.objects.filter(
870 task__in=(models.SpecialTask.Task.CLEANUP,
871 models.SpecialTask.Task.VERIFY),
872 queue_entry__id=queue_entry.id,
873 is_complete=False)
874 if special_tasks.count() == 0:
875 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000876
showardb8900452009-10-12 20:31:01 +0000877 if unrecovered_hqes:
878 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000879 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000880 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000881 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000882
883
showard65db3932009-10-28 19:54:35 +0000884 def _get_prioritized_special_tasks(self):
885 """
886 Returns all queued SpecialTasks prioritized for repair first, then
887 cleanup, then verify.
888 """
889 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
890 is_complete=False,
891 host__locked=False)
892 # exclude hosts with active queue entries unless the SpecialTask is for
893 # that queue entry
894 queued_tasks = models.Host.objects.add_join(
895 queued_tasks, 'host_queue_entries', 'host_id',
896 join_condition='host_queue_entries.active',
897 force_left_join=True)
898 queued_tasks = queued_tasks.extra(
899 where=['(host_queue_entries.id IS NULL OR '
900 'host_queue_entries.id = special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000901
showard65db3932009-10-28 19:54:35 +0000902 # reorder tasks by priority
903 task_priority_order = [models.SpecialTask.Task.REPAIR,
904 models.SpecialTask.Task.CLEANUP,
905 models.SpecialTask.Task.VERIFY]
906 def task_priority_key(task):
907 return task_priority_order.index(task.task)
908 return sorted(queued_tasks, key=task_priority_key)
909
910
showard65db3932009-10-28 19:54:35 +0000911 def _schedule_special_tasks(self):
912 """
913 Execute queued SpecialTasks that are ready to run on idle hosts.
914 """
915 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000916 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000917 continue
showardd1195652009-12-08 22:21:02 +0000918 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000919
920
showard170873e2009-01-07 00:22:26 +0000921 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000922 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000923 # should never happen
showarded2afea2009-07-07 20:54:07 +0000924 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000925 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000926 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000927 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000928 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000929
930
jadmanski0afbb632008-06-06 21:10:57 +0000931 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000932 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000933 full_where='locked = 0 AND invalid = 0 AND ' + where
934 for host in Host.fetch(where=full_where):
935 if self.host_has_agent(host):
936 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000937 continue
showard8cc058f2009-09-08 16:26:33 +0000938 if self._host_has_scheduled_special_task(host):
939 # host will have a special task scheduled on the next cycle
940 continue
showard170873e2009-01-07 00:22:26 +0000941 if print_message:
showardb18134f2009-03-20 20:52:18 +0000942 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000943 models.SpecialTask.objects.create(
944 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000945 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000946
947
jadmanski0afbb632008-06-06 21:10:57 +0000948 def _recover_hosts(self):
949 # recover "Repair Failed" hosts
950 message = 'Reverifying dead host %s'
951 self._reverify_hosts_where("status = 'Repair Failed'",
952 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000953
954
showard04c82c52008-05-29 19:38:12 +0000955
showardb95b1bd2008-08-15 18:11:04 +0000956 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000957 # prioritize by job priority, then non-metahost over metahost, then FIFO
958 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000959 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000960 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000961 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000962
963
showard89f84db2009-03-12 20:39:13 +0000964 def _refresh_pending_queue_entries(self):
965 """
966 Lookup the pending HostQueueEntries and call our HostScheduler
967 refresh() method given that list. Return the list.
968
969 @returns A list of pending HostQueueEntries sorted in priority order.
970 """
showard63a34772008-08-18 19:32:50 +0000971 queue_entries = self._get_pending_queue_entries()
972 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000973 return []
showardb95b1bd2008-08-15 18:11:04 +0000974
showard63a34772008-08-18 19:32:50 +0000975 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000976
showard89f84db2009-03-12 20:39:13 +0000977 return queue_entries
978
979
980 def _schedule_atomic_group(self, queue_entry):
981 """
982 Schedule the given queue_entry on an atomic group of hosts.
983
984 Returns immediately if there are insufficient available hosts.
985
986 Creates new HostQueueEntries based off of queue_entry for the
987 scheduled hosts and starts them all running.
988 """
989 # This is a virtual host queue entry representing an entire
990 # atomic group, find a group and schedule their hosts.
991 group_hosts = self._host_scheduler.find_eligible_atomic_group(
992 queue_entry)
993 if not group_hosts:
994 return
showardcbe6f942009-06-17 19:33:49 +0000995
996 logging.info('Expanding atomic group entry %s with hosts %s',
997 queue_entry,
998 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +0000999 # The first assigned host uses the original HostQueueEntry
1000 group_queue_entries = [queue_entry]
1001 for assigned_host in group_hosts[1:]:
1002 # Create a new HQE for every additional assigned_host.
1003 new_hqe = HostQueueEntry.clone(queue_entry)
1004 new_hqe.save()
1005 group_queue_entries.append(new_hqe)
1006 assert len(group_queue_entries) == len(group_hosts)
1007 for queue_entry, host in itertools.izip(group_queue_entries,
1008 group_hosts):
1009 self._run_queue_entry(queue_entry, host)
1010
1011
1012 def _schedule_new_jobs(self):
1013 queue_entries = self._refresh_pending_queue_entries()
1014 if not queue_entries:
1015 return
1016
showard63a34772008-08-18 19:32:50 +00001017 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001018 is_unassigned_atomic_group = (
1019 queue_entry.atomic_group_id is not None
1020 and queue_entry.host_id is None)
1021 if is_unassigned_atomic_group:
1022 self._schedule_atomic_group(queue_entry)
1023 else:
showard89f84db2009-03-12 20:39:13 +00001024 assigned_host = self._host_scheduler.find_eligible_host(
1025 queue_entry)
showard65db3932009-10-28 19:54:35 +00001026 if assigned_host and not self.host_has_agent(assigned_host):
showard89f84db2009-03-12 20:39:13 +00001027 self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001028
1029
showard8cc058f2009-09-08 16:26:33 +00001030 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +00001031 for agent_task in self._get_queue_entry_agent_tasks():
1032 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +00001033
1034
1035 def _schedule_delay_tasks(self):
showardd2014822009-10-12 20:26:58 +00001036 for entry in HostQueueEntry.fetch(where='status = "%s"' %
1037 models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001038 task = entry.job.schedule_delayed_callback_task(entry)
1039 if task:
showardd1195652009-12-08 22:21:02 +00001040 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +00001041
1042
showardb95b1bd2008-08-15 18:11:04 +00001043 def _run_queue_entry(self, queue_entry, host):
showard8cc058f2009-09-08 16:26:33 +00001044 queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001045
1046
jadmanski0afbb632008-06-06 21:10:57 +00001047 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001048 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001049 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001050 for agent in self.get_agents_for_entry(entry):
1051 agent.abort()
1052 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001053
1054
showard324bf812009-01-20 23:23:38 +00001055 def _can_start_agent(self, agent, num_started_this_cycle,
1056 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001057 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001058 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001059 return True
1060 # don't allow any nonzero-process agents to run after we've reached a
1061 # limit (this avoids starvation of many-process agents)
1062 if have_reached_limit:
1063 return False
1064 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001065 max_runnable_processes = _drone_manager.max_runnable_processes(
showardd1195652009-12-08 22:21:02 +00001066 agent.task.owner_username)
1067 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001068 return False
1069 # if a single agent exceeds the per-cycle throttling, still allow it to
1070 # run when it's the first agent in the cycle
1071 if num_started_this_cycle == 0:
1072 return True
1073 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001074 if (num_started_this_cycle + agent.task.num_processes >
1075 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001076 return False
1077 return True
1078
1079
jadmanski0afbb632008-06-06 21:10:57 +00001080 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001081 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001082 have_reached_limit = False
1083 # iterate over copy, so we can remove agents during iteration
1084 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001085 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001086 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001087 have_reached_limit):
1088 have_reached_limit = True
1089 continue
showardd1195652009-12-08 22:21:02 +00001090 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001091 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001092 if agent.is_done():
1093 logging.info("agent finished")
1094 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001095 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001096 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001097
1098
showard29f7cd22009-04-29 21:16:24 +00001099 def _process_recurring_runs(self):
1100 recurring_runs = models.RecurringRun.objects.filter(
1101 start_date__lte=datetime.datetime.now())
1102 for rrun in recurring_runs:
1103 # Create job from template
1104 job = rrun.job
1105 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001106 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001107
1108 host_objects = info['hosts']
1109 one_time_hosts = info['one_time_hosts']
1110 metahost_objects = info['meta_hosts']
1111 dependencies = info['dependencies']
1112 atomic_group = info['atomic_group']
1113
1114 for host in one_time_hosts or []:
1115 this_host = models.Host.create_one_time_host(host.hostname)
1116 host_objects.append(this_host)
1117
1118 try:
1119 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001120 options=options,
showard29f7cd22009-04-29 21:16:24 +00001121 host_objects=host_objects,
1122 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001123 atomic_group=atomic_group)
1124
1125 except Exception, ex:
1126 logging.exception(ex)
1127 #TODO send email
1128
1129 if rrun.loop_count == 1:
1130 rrun.delete()
1131 else:
1132 if rrun.loop_count != 0: # if not infinite loop
1133 # calculate new start_date
1134 difference = datetime.timedelta(seconds=rrun.loop_period)
1135 rrun.start_date = rrun.start_date + difference
1136 rrun.loop_count -= 1
1137 rrun.save()
1138
1139
class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().

    Tracks a single autoserv process through its pidfile (via the global
    _drone_manager).  State is cached in self._state, a
    drone_manager.PidfileContents; once a process is declared lost,
    self.lost_process latches and a synthetic failure exit status is reported.
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # latched to True by on_lost_process(); never reset
        self.lost_process = False
        # set when run()/attach_to_existing_process() is called; used to
        # decide when to give up waiting for the pidfile to appear
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Return command wrapped in 'nice -n <level>', or unchanged if no
        nice_level is given.  (run() below inlines the same wrapping but with
        an 'is not None' check - NOTE(review): a nice_level of 0 behaves
        differently between the two; confirm which is intended.)"""
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # record when we started (or attached), for pidfile-timeout checks
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None):
        """
        Launch command through the drone manager and remember its pidfile id.
        @param command: argv list to execute (must not be None)
        @param working_directory: execution directory for the drone manager
        @param num_processes: process count to account against throttling
        """
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE,
                                   num_processes=None):
        """
        Monitor an already-running process (scheduler recovery path) instead
        of launching a new one.  Optionally declares its process count to the
        drone manager.
        """
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        """Kill the monitored process, if one is known."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """@returns True if a process has been identified from the pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """@returns the monitored Process; asserts one exists."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """
        Refresh self._state from the pidfile contents; raises
        _PidfileException (and resets state) if the contents are invalid.
        """
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # notify by email, then treat the process as lost (synthetic failure)
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        # once lost, state is frozen - don't re-read the pidfile
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
            pid=None, exit_status=None if autoserv has not yet run
            pid!=None, exit_status=None if autoserv is running
            pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        Only gives up (emails + marks the process lost) once the pidfile
        timeout has elapsed since start; before that, just keeps waiting.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # report a generic failure exit status with zero failed tests
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """@returns the process exit status, or None if still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        """Best-effort copy of results on the drone; no-op without a process."""
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        """Best-effort copy of source to the results repository; no-op
        without a process."""
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)
1314
mbligh36768f02008-02-22 18:28:33 +00001315class Agent(object):
showard77182562009-06-10 00:16:05 +00001316 """
showard8cc058f2009-09-08 16:26:33 +00001317 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001318
1319 The following methods are required on all task objects:
1320 poll() - Called periodically to let the task check its status and
1321 update its internal state. If the task succeeded.
1322 is_done() - Returns True if the task is finished.
1323 abort() - Called when an abort has been requested. The task must
1324 set its aborted attribute to True if it actually aborted.
1325
1326 The following attributes are required on all task objects:
1327 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001328 success - bool, True if this task succeeded.
1329 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1330 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001331 """
1332
1333
showard418785b2009-11-23 20:19:59 +00001334 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001335 """
showard8cc058f2009-09-08 16:26:33 +00001336 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001337 """
showard8cc058f2009-09-08 16:26:33 +00001338 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001339
showard77182562009-06-10 00:16:05 +00001340 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001341 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001342
showard8cc058f2009-09-08 16:26:33 +00001343 self.queue_entry_ids = task.queue_entry_ids
1344 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001345
showard8cc058f2009-09-08 16:26:33 +00001346 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001347 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001348
1349
jadmanski0afbb632008-06-06 21:10:57 +00001350 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001351 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001352 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001353 self.task.poll()
1354 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001355 self.finished = True
showardec113162008-05-08 00:52:49 +00001356
1357
jadmanski0afbb632008-06-06 21:10:57 +00001358 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001359 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001360
1361
showardd3dc1992009-04-22 21:01:40 +00001362 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001363 if self.task:
1364 self.task.abort()
1365 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001366 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001367 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001368
showardd3dc1992009-04-22 21:01:40 +00001369
showard77182562009-06-10 00:16:05 +00001370class DelayedCallTask(object):
1371 """
1372 A task object like AgentTask for an Agent to run that waits for the
1373 specified amount of time to have elapsed before calling the supplied
1374 callback once and finishing. If the callback returns anything, it is
1375 assumed to be a new Agent instance and will be added to the dispatcher.
1376
1377 @attribute end_time: The absolute posix time after which this task will
1378 call its callback when it is polled and be finished.
1379
1380 Also has all attributes required by the Agent class.
1381 """
1382 def __init__(self, delay_seconds, callback, now_func=None):
1383 """
1384 @param delay_seconds: The delay in seconds from now that this task
1385 will call the supplied callback and be done.
1386 @param callback: A callable to be called by this task once after at
1387 least delay_seconds time has elapsed. It must return None
1388 or a new Agent instance.
1389 @param now_func: A time.time like function. Default: time.time.
1390 Used for testing.
1391 """
1392 assert delay_seconds > 0
1393 assert callable(callback)
1394 if not now_func:
1395 now_func = time.time
1396 self._now_func = now_func
1397 self._callback = callback
1398
1399 self.end_time = self._now_func() + delay_seconds
1400
1401 # These attributes are required by Agent.
1402 self.aborted = False
showard77182562009-06-10 00:16:05 +00001403 self.host_ids = ()
1404 self.success = False
1405 self.queue_entry_ids = ()
showard418785b2009-11-23 20:19:59 +00001406 self.num_processes = 0
showard77182562009-06-10 00:16:05 +00001407
1408
1409 def poll(self):
showard8cc058f2009-09-08 16:26:33 +00001410 if not self.is_done() and self._now_func() >= self.end_time:
1411 self._callback()
showard77182562009-06-10 00:16:05 +00001412 self.success = True
1413
1414
1415 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001416 return self.success or self.aborted
showard77182562009-06-10 00:16:05 +00001417
1418
1419 def abort(self):
1420 self.aborted = True
showard77182562009-06-10 00:16:05 +00001421
1422
mbligh36768f02008-02-22 18:28:33 +00001423class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001424 class _NullMonitor(object):
1425 pidfile_id = None
1426
1427 def has_process(self):
1428 return True
1429
1430
1431 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001432 """
showardd1195652009-12-08 22:21:02 +00001433 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001434 """
jadmanski0afbb632008-06-06 21:10:57 +00001435 self.done = False
showardd1195652009-12-08 22:21:02 +00001436 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001437 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001438 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001439 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001440 self.queue_entry_ids = []
1441 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001442 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001443
1444
1445 def _set_ids(self, host=None, queue_entries=None):
1446 if queue_entries and queue_entries != [None]:
1447 self.host_ids = [entry.host.id for entry in queue_entries]
1448 self.queue_entry_ids = [entry.id for entry in queue_entries]
1449 else:
1450 assert host
1451 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001452
1453
jadmanski0afbb632008-06-06 21:10:57 +00001454 def poll(self):
showard08a36412009-05-05 01:01:13 +00001455 if not self.started:
1456 self.start()
showardd1195652009-12-08 22:21:02 +00001457 if not self.done:
1458 self.tick()
showard08a36412009-05-05 01:01:13 +00001459
1460
1461 def tick(self):
showardd1195652009-12-08 22:21:02 +00001462 assert self.monitor
1463 exit_code = self.monitor.exit_code()
1464 if exit_code is None:
1465 return
mbligh36768f02008-02-22 18:28:33 +00001466
showardd1195652009-12-08 22:21:02 +00001467 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001468 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001469
1470
jadmanski0afbb632008-06-06 21:10:57 +00001471 def is_done(self):
1472 return self.done
mbligh36768f02008-02-22 18:28:33 +00001473
1474
jadmanski0afbb632008-06-06 21:10:57 +00001475 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001476 if self.done:
showardd1195652009-12-08 22:21:02 +00001477 assert self.started
showard08a36412009-05-05 01:01:13 +00001478 return
showardd1195652009-12-08 22:21:02 +00001479 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001480 self.done = True
1481 self.success = success
1482 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001483
1484
jadmanski0afbb632008-06-06 21:10:57 +00001485 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001486 """
1487 To be overridden.
1488 """
showarded2afea2009-07-07 20:54:07 +00001489 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001490 self.register_necessary_pidfiles()
1491
1492
1493 def _log_file(self):
1494 if not self._log_file_name:
1495 return None
1496 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001497
mbligh36768f02008-02-22 18:28:33 +00001498
jadmanski0afbb632008-06-06 21:10:57 +00001499 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001500 log_file = self._log_file()
1501 if self.monitor and log_file:
1502 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001503
1504
jadmanski0afbb632008-06-06 21:10:57 +00001505 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001506 """
1507 To be overridden.
1508 """
jadmanski0afbb632008-06-06 21:10:57 +00001509 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001510
1511
jadmanski0afbb632008-06-06 21:10:57 +00001512 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001513 if not self.started:
1514 self.prolog()
1515 self.run()
1516
1517 self.started = True
1518
1519
1520 def abort(self):
1521 if self.monitor:
1522 self.monitor.kill()
1523 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001524 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001525 self.cleanup()
1526
1527
showarded2afea2009-07-07 20:54:07 +00001528 def _get_consistent_execution_path(self, execution_entries):
1529 first_execution_path = execution_entries[0].execution_path()
1530 for execution_entry in execution_entries[1:]:
1531 assert execution_entry.execution_path() == first_execution_path, (
1532 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1533 execution_entry,
1534 first_execution_path,
1535 execution_entries[0]))
1536 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001537
1538
showarded2afea2009-07-07 20:54:07 +00001539 def _copy_results(self, execution_entries, use_monitor=None):
1540 """
1541 @param execution_entries: list of objects with execution_path() method
1542 """
showard6d1c1432009-08-20 23:30:39 +00001543 if use_monitor is not None and not use_monitor.has_process():
1544 return
1545
showarded2afea2009-07-07 20:54:07 +00001546 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001547 if use_monitor is None:
1548 assert self.monitor
1549 use_monitor = self.monitor
1550 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001551 execution_path = self._get_consistent_execution_path(execution_entries)
1552 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001553 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001554
showarda1e74b32009-05-12 17:32:04 +00001555
1556 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001557 for queue_entry in queue_entries:
1558 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001559
1560
showarda1e74b32009-05-12 17:32:04 +00001561 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1562 self._copy_results(queue_entries, use_monitor)
1563 self._parse_results(queue_entries)
1564
1565
showardd1195652009-12-08 22:21:02 +00001566 def _command_line(self):
1567 """
1568 Return the command line to run. Must be overridden.
1569 """
1570 raise NotImplementedError
1571
1572
1573 @property
1574 def num_processes(self):
1575 """
1576 Return the number of processes forked by this AgentTask's process. It
1577 may only be approximate. To be overridden if necessary.
1578 """
1579 return 1
1580
1581
1582 def _paired_with_monitor(self):
1583 """
1584 If this AgentTask's process must run on the same machine as some
1585 previous process, this method should be overridden to return a
1586 PidfileRunMonitor for that process.
1587 """
1588 return self._NullMonitor()
1589
1590
1591 @property
1592 def owner_username(self):
1593 """
1594 Return login of user responsible for this task. May be None. Must be
1595 overridden.
1596 """
1597 raise NotImplementedError
1598
1599
1600 def _working_directory(self):
1601 """
1602 Return the directory where this AgentTask's process executes. Must be
1603 overridden.
1604 """
1605 raise NotImplementedError
1606
1607
1608 def _pidfile_name(self):
1609 """
1610 Return the name of the pidfile this AgentTask's process uses. To be
1611 overridden if necessary.
1612 """
1613 return _AUTOSERV_PID_FILE
1614
1615
1616 def _check_paired_results_exist(self):
1617 if not self._paired_with_monitor().has_process():
1618 email_manager.manager.enqueue_notify_email(
1619 'No paired results in task',
1620 'No paired results in task %s at %s'
1621 % (self, self._paired_with_monitor().pidfile_id))
1622 self.finished(False)
1623 return False
1624 return True
1625
1626
1627 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001628 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001629 self.monitor = PidfileRunMonitor()
1630
1631
1632 def run(self):
1633 if not self._check_paired_results_exist():
1634 return
1635
1636 self._create_monitor()
1637 self.monitor.run(
1638 self._command_line(), self._working_directory(),
1639 num_processes=self.num_processes,
1640 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1641 pidfile_name=self._pidfile_name(),
1642 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
1643 username=self.owner_username)
1644
1645
1646 def register_necessary_pidfiles(self):
1647 pidfile_id = _drone_manager.get_pidfile_id_from(
1648 self._working_directory(), self._pidfile_name())
1649 _drone_manager.register_pidfile(pidfile_id)
1650
1651 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1652 if paired_pidfile_id:
1653 _drone_manager.register_pidfile(paired_pidfile_id)
1654
1655
1656 def recover(self):
1657 if not self._check_paired_results_exist():
1658 return
1659
1660 self._create_monitor()
1661 self.monitor.attach_to_existing_process(
1662 self._working_directory(), pidfile_name=self._pidfile_name(),
1663 num_processes=self.num_processes)
1664 if not self.monitor.has_process():
1665 # no process to recover; wait to be started normally
1666 self.monitor = None
1667 return
1668
1669 self.started = True
1670 logging.info('Recovering process %s for %s at %s'
1671 % (self.monitor.get_process(), type(self).__name__,
1672 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001673
1674
showardd9205182009-04-27 20:09:55 +00001675class TaskWithJobKeyvals(object):
1676 """AgentTask mixin providing functionality to help with job keyval files."""
1677 _KEYVAL_FILE = 'keyval'
1678 def _format_keyval(self, key, value):
1679 return '%s=%s' % (key, value)
1680
1681
1682 def _keyval_path(self):
1683 """Subclasses must override this"""
1684 raise NotImplemented
1685
1686
1687 def _write_keyval_after_job(self, field, value):
1688 assert self.monitor
1689 if not self.monitor.has_process():
1690 return
1691 _drone_manager.write_lines_to_file(
1692 self._keyval_path(), [self._format_keyval(field, value)],
1693 paired_with_process=self.monitor.get_process())
1694
1695
1696 def _job_queued_keyval(self, job):
1697 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1698
1699
1700 def _write_job_finished(self):
1701 self._write_keyval_after_job("job_finished", int(time.time()))
1702
1703
showarddb502762009-09-09 15:31:20 +00001704 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1705 keyval_contents = '\n'.join(self._format_keyval(key, value)
1706 for key, value in keyval_dict.iteritems())
1707 # always end with a newline to allow additional keyvals to be written
1708 keyval_contents += '\n'
1709 _drone_manager.attach_file_to_execution(self._working_directory,
1710 keyval_contents,
1711 file_path=keyval_path)
1712
1713
1714 def _write_keyvals_before_job(self, keyval_dict):
1715 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1716
1717
1718 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001719 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001720 host.hostname)
1721 platform, all_labels = host.platform_and_labels()
1722 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1723 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1724
1725
showard8cc058f2009-09-08 16:26:33 +00001726class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001727 """
1728 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1729 """
1730
1731 TASK_TYPE = None
1732 host = None
1733 queue_entry = None
1734
showardd1195652009-12-08 22:21:02 +00001735 def __init__(self, task, extra_command_args):
1736 super(SpecialAgentTask, self).__init__()
1737
showarded2afea2009-07-07 20:54:07 +00001738 assert (self.TASK_TYPE is not None,
1739 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001740
1741 self.host = Host(id=task.host.id)
1742 self.queue_entry = None
1743 if task.queue_entry:
1744 self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
1745
showarded2afea2009-07-07 20:54:07 +00001746 self.task = task
1747 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001748
1749
showard8cc058f2009-09-08 16:26:33 +00001750 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001751 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1752
1753
1754 def _command_line(self):
1755 return _autoserv_command_line(self.host.hostname,
1756 self._extra_command_args,
1757 queue_entry=self.queue_entry)
1758
1759
1760 def _working_directory(self):
1761 return self.task.execution_path()
1762
1763
1764 @property
1765 def owner_username(self):
1766 if self.task.requested_by:
1767 return self.task.requested_by.login
1768 return None
showard8cc058f2009-09-08 16:26:33 +00001769
1770
showarded2afea2009-07-07 20:54:07 +00001771 def prolog(self):
1772 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001773 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001774 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001775
1776
showardde634ee2009-01-30 01:44:24 +00001777 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001778 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001779
showard2fe3f1d2009-07-06 20:19:11 +00001780 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001781 return # don't fail metahost entries, they'll be reassigned
1782
showard2fe3f1d2009-07-06 20:19:11 +00001783 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001784 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001785 return # entry has been aborted
1786
showard2fe3f1d2009-07-06 20:19:11 +00001787 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001788 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001789 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001790 self._write_keyval_after_job(queued_key, queued_time)
1791 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001792
showard8cc058f2009-09-08 16:26:33 +00001793 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001794 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001795 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001796 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001797
showard2fe3f1d2009-07-06 20:19:11 +00001798 self._copy_results([self.queue_entry])
showardd1195652009-12-08 22:21:02 +00001799
1800 if not self.queue_entry.job.parse_failed_repair:
1801 self.queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
1802 return
showard8cc058f2009-09-08 16:26:33 +00001803
1804 pidfile_id = _drone_manager.get_pidfile_id_from(
1805 self.queue_entry.execution_path(),
1806 pidfile_name=_AUTOSERV_PID_FILE)
1807 _drone_manager.register_pidfile(pidfile_id)
showardd1195652009-12-08 22:21:02 +00001808 self._parse_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001809
1810
1811 def cleanup(self):
1812 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001813
1814 # We will consider an aborted task to be "Failed"
1815 self.task.finish(bool(self.success))
1816
showardf85a0b72009-10-07 20:48:45 +00001817 if self.monitor:
1818 if self.monitor.has_process():
1819 self._copy_results([self.task])
1820 if self.monitor.pidfile_id is not None:
1821 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001822
1823
class RepairTask(SpecialAgentTask):
    """Runs autoserv -R to repair a broken host."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """
        @param task: the SpecialTask DB entry for this repair; if it has an
                associated queue entry, that entry is failed when the repair
                fails.
        """
        # Translate the host's protection level into the attribute name that
        # autoserv expects on its command line.
        protection_string = host_protections.Protection.get_string(
                task.host.protection)
        protection_name = host_protections.Protection.get_attr_name(
                protection_string)
        extra_args = ['-R', '--host-protection', protection_name]

        super(RepairTask, self).__init__(task, extra_args)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
            return
        self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001860
1861
showarded2afea2009-07-07 20:54:07 +00001862class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001863 def _copy_to_results_repository(self):
1864 if not self.queue_entry or self.queue_entry.meta_host:
1865 return
1866
1867 self.queue_entry.set_execution_subdir()
1868 log_name = os.path.basename(self.task.execution_path())
1869 source = os.path.join(self.task.execution_path(), 'debug',
1870 'autoserv.DEBUG')
1871 destination = os.path.join(
1872 self.queue_entry.execution_path(), log_name)
1873
1874 self.monitor.try_copy_to_results_repository(
1875 source, destination_path=destination)
1876
1877
showard170873e2009-01-07 00:22:26 +00001878 def epilog(self):
1879 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001880
showard775300b2009-09-09 15:30:50 +00001881 if self.success:
1882 return
showard8fe93b52008-11-18 17:53:22 +00001883
showard775300b2009-09-09 15:30:50 +00001884 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001885
showard775300b2009-09-09 15:30:50 +00001886 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001887 # effectively ignore failure for these hosts
1888 self.success = True
showard775300b2009-09-09 15:30:50 +00001889 return
1890
1891 if self.queue_entry:
1892 self.queue_entry.requeue()
1893
1894 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001895 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001896 queue_entry__id=self.queue_entry.id):
1897 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1898 self._fail_queue_entry()
1899 return
1900
showard9bb960b2009-11-19 01:02:11 +00001901 queue_entry = models.HostQueueEntry.objects.get(
1902 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001903 else:
1904 queue_entry = None
1905
1906 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001907 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001908 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001909 queue_entry=queue_entry,
1910 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001911
showard8fe93b52008-11-18 17:53:22 +00001912
class VerifyTask(PreJobTask):
    """Runs autoserv -v to verify that a host is in working order."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # One verify per host is enough; drop every other inactive,
        # incomplete verify request queued against this host.
        redundant_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        redundant_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001947
1948
showard9bb960b2009-11-19 01:02:11 +00001949class QueueTask(AgentTask, TaskWithJobKeyvals):
showardd1195652009-12-08 22:21:02 +00001950 def __init__(self, queue_entries, cmd=None):
1951 super(QueueTask, self).__init__()
1952 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001953 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001954 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001955
1956
showard73ec0442009-02-07 02:05:20 +00001957 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001958 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001959
1960
showardd1195652009-12-08 22:21:02 +00001961 def _command_line(self):
1962 return self.job.get_autoserv_params(self.queue_entries)
1963
1964
1965 @property
1966 def num_processes(self):
1967 return len(self.queue_entries)
1968
1969
1970 @property
1971 def owner_username(self):
1972 return self.job.owner
1973
1974
1975 def _working_directory(self):
1976 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001977
1978
jadmanski0afbb632008-06-06 21:10:57 +00001979 def prolog(self):
showard8cc058f2009-09-08 16:26:33 +00001980 for entry in self.queue_entries:
1981 if entry.status not in (models.HostQueueEntry.Status.STARTING,
1982 models.HostQueueEntry.Status.RUNNING):
1983 raise SchedulerError('Queue task attempting to start '
1984 'entry with invalid status %s: %s'
1985 % (entry.status, entry))
1986 if entry.host.status not in (models.Host.Status.PENDING,
1987 models.Host.Status.RUNNING):
1988 raise SchedulerError('Queue task attempting to start on queue '
1989 'entry with invalid host status %s: %s'
1990 % (entry.host.status, entry))
1991
showardd9205182009-04-27 20:09:55 +00001992 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001993 keyval_dict = {queued_key: queued_time}
showardd1195652009-12-08 22:21:02 +00001994 group_name = self.queue_entries[0].get_group_name()
1995 if group_name:
1996 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001997 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001998 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001999 self._write_host_keyvals(queue_entry.host)
showard8cc058f2009-09-08 16:26:33 +00002000 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showard12f3e322009-05-13 21:27:42 +00002001 queue_entry.update_field('started_on', datetime.datetime.now())
showard8cc058f2009-09-08 16:26:33 +00002002 queue_entry.host.set_status(models.Host.Status.RUNNING)
showard21baa452008-10-21 00:08:39 +00002003 queue_entry.host.update_field('dirty', 1)
showardc6a56872009-07-28 20:11:58 +00002004 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
2005 # TODO(gps): Remove this if nothing needs it anymore.
2006 # A potential user is: tko/parser
jadmanski0afbb632008-06-06 21:10:57 +00002007 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00002008
2009
showard35162b02009-03-03 02:17:30 +00002010 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002011 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002012 _drone_manager.write_lines_to_file(error_file_path,
2013 [_LOST_PROCESS_ERROR])
2014
2015
showardd3dc1992009-04-22 21:01:40 +00002016 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002017 if not self.monitor:
2018 return
2019
showardd9205182009-04-27 20:09:55 +00002020 self._write_job_finished()
2021
showard35162b02009-03-03 02:17:30 +00002022 if self.monitor.lost_process:
2023 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002024
showard8cc058f2009-09-08 16:26:33 +00002025 for queue_entry in self.queue_entries:
2026 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jadmanskif7fa2cc2008-10-01 14:13:23 +00002027
2028
showardcbd74612008-11-19 21:42:02 +00002029 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002030 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002031 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002032 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002033 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002034
2035
jadmanskif7fa2cc2008-10-01 14:13:23 +00002036 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002037 if not self.monitor or not self.monitor.has_process():
2038 return
2039
jadmanskif7fa2cc2008-10-01 14:13:23 +00002040 # build up sets of all the aborted_by and aborted_on values
2041 aborted_by, aborted_on = set(), set()
2042 for queue_entry in self.queue_entries:
2043 if queue_entry.aborted_by:
2044 aborted_by.add(queue_entry.aborted_by)
2045 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2046 aborted_on.add(t)
2047
2048 # extract some actual, unique aborted by value and write it out
2049 assert len(aborted_by) <= 1
2050 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002051 aborted_by_value = aborted_by.pop()
2052 aborted_on_value = max(aborted_on)
2053 else:
2054 aborted_by_value = 'autotest_system'
2055 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002056
showarda0382352009-02-11 23:36:43 +00002057 self._write_keyval_after_job("aborted_by", aborted_by_value)
2058 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002059
showardcbd74612008-11-19 21:42:02 +00002060 aborted_on_string = str(datetime.datetime.fromtimestamp(
2061 aborted_on_value))
2062 self._write_status_comment('Job aborted by %s on %s' %
2063 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002064
2065
jadmanski0afbb632008-06-06 21:10:57 +00002066 def abort(self):
2067 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002068 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002069 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002070
2071
jadmanski0afbb632008-06-06 21:10:57 +00002072 def epilog(self):
2073 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002074 self._finish_task()
2075 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00002076
2077
showardd3dc1992009-04-22 21:01:40 +00002078class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002079 def __init__(self, queue_entries, log_file_name):
2080 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002081
showardd1195652009-12-08 22:21:02 +00002082 self.queue_entries = queue_entries
2083
showardd3dc1992009-04-22 21:01:40 +00002084 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002085 self._autoserv_monitor.attach_to_existing_process(
2086 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002087
showardd1195652009-12-08 22:21:02 +00002088
2089 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002090 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002091 return 'true'
2092 return self._generate_command(
2093 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002094
2095
2096 def _generate_command(self, results_dir):
2097 raise NotImplementedError('Subclasses must override this')
2098
2099
showardd1195652009-12-08 22:21:02 +00002100 @property
2101 def owner_username(self):
2102 return self.queue_entries[0].job.owner
2103
2104
2105 def _working_directory(self):
2106 return self._get_consistent_execution_path(self.queue_entries)
2107
2108
2109 def _paired_with_monitor(self):
2110 return self._autoserv_monitor
2111
2112
showardd3dc1992009-04-22 21:01:40 +00002113 def _job_was_aborted(self):
2114 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002115 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002116 queue_entry.update_from_database()
2117 if was_aborted is None: # first queue entry
2118 was_aborted = bool(queue_entry.aborted)
2119 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2120 email_manager.manager.enqueue_notify_email(
2121 'Inconsistent abort state',
2122 'Queue entries have inconsistent abort state: ' +
2123 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2124 # don't crash here, just assume true
2125 return True
2126 return was_aborted
2127
2128
showardd1195652009-12-08 22:21:02 +00002129 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002130 if self._job_was_aborted():
2131 return models.HostQueueEntry.Status.ABORTED
2132
2133 # we'll use a PidfileRunMonitor to read the autoserv exit status
2134 if self._autoserv_monitor.exit_code() == 0:
2135 return models.HostQueueEntry.Status.COMPLETED
2136 return models.HostQueueEntry.Status.FAILED
2137
2138
showardd3dc1992009-04-22 21:01:40 +00002139 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002140 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002141 queue_entry.set_status(status)
2142
2143
2144 def abort(self):
2145 # override AgentTask.abort() to avoid killing the process and ending
2146 # the task. post-job tasks continue when the job is aborted.
2147 pass
2148
2149
showard9bb960b2009-11-19 01:02:11 +00002150class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002151 """
2152 Task responsible for
2153 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2154 * copying logs to the results repository
2155 * spawning CleanupTasks for hosts, if necessary
2156 * spawning a FinalReparseTask for the job
2157 """
showardd1195652009-12-08 22:21:02 +00002158 def __init__(self, queue_entries, recover_run_monitor=None):
2159 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002160 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002161 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002162 self._set_ids(queue_entries=queue_entries)
2163
2164
2165 def _generate_command(self, results_dir):
2166 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002167 for queue_entry in self.queue_entries)
showardd3dc1992009-04-22 21:01:40 +00002168 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2169 '-r', results_dir]
2170
2171
showardd1195652009-12-08 22:21:02 +00002172 @property
2173 def num_processes(self):
2174 return len(self.queue_entries)
2175
2176
2177 def _pidfile_name(self):
2178 return _CRASHINFO_PID_FILE
2179
2180
showardd3dc1992009-04-22 21:01:40 +00002181 def prolog(self):
showardd1195652009-12-08 22:21:02 +00002182 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002183 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2184 raise SchedulerError('Gather task attempting to start on '
2185 'non-gathering entry: %s' % queue_entry)
2186 if queue_entry.host.status != models.Host.Status.RUNNING:
2187 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002188 'entry with non-running host status %s: %s'
2189 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002190
showardd3dc1992009-04-22 21:01:40 +00002191 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002192
2193
showardd3dc1992009-04-22 21:01:40 +00002194 def epilog(self):
2195 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002196
showardd1195652009-12-08 22:21:02 +00002197 self._copy_and_parse_results(self.queue_entries,
showard6d1c1432009-08-20 23:30:39 +00002198 use_monitor=self._autoserv_monitor)
showard9bb960b2009-11-19 01:02:11 +00002199 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002200
showard9bb960b2009-11-19 01:02:11 +00002201
2202 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002203 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002204 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002205 models.HostQueueEntry.Status.COMPLETED)
2206 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2207 else:
2208 final_success = False
2209 num_tests_failed = 0
2210
showard9bb960b2009-11-19 01:02:11 +00002211 reboot_after = self._job.reboot_after
2212 do_reboot = (
2213 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002214 self._final_status() == models.HostQueueEntry.Status.ABORTED
showard9bb960b2009-11-19 01:02:11 +00002215 or reboot_after == models.RebootAfter.ALWAYS
2216 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
2217 and final_success and num_tests_failed == 0))
2218
showardd1195652009-12-08 22:21:02 +00002219 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002220 if do_reboot:
2221 # don't pass the queue entry to the CleanupTask. if the cleanup
2222 # fails, the job doesn't care -- it's over.
2223 models.SpecialTask.objects.create(
2224 host=models.Host.objects.get(id=queue_entry.host.id),
2225 task=models.SpecialTask.Task.CLEANUP,
2226 requested_by=self._job.owner_model())
2227 else:
2228 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002229
2230
showard0bbfc212009-04-29 21:06:13 +00002231 def run(self):
showard597bfd32009-05-08 18:22:50 +00002232 autoserv_exit_code = self._autoserv_monitor.exit_code()
2233 # only run if Autoserv exited due to some signal. if we have no exit
2234 # code, assume something bad (and signal-like) happened.
2235 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002236 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002237 else:
2238 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002239
2240
showard8fe93b52008-11-18 17:53:22 +00002241class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002242 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2243
2244
showard8cc058f2009-09-08 16:26:33 +00002245 def __init__(self, task, recover_run_monitor=None):
showardd1195652009-12-08 22:21:02 +00002246 super(CleanupTask, self).__init__(task, ['--cleanup'])
showard8cc058f2009-09-08 16:26:33 +00002247 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002248
mblighd5c95802008-03-05 00:33:46 +00002249
jadmanski0afbb632008-06-06 21:10:57 +00002250 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002251 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002252 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002253 self.host.set_status(models.Host.Status.CLEANING)
2254 if self.queue_entry:
2255 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2256
2257
showard775300b2009-09-09 15:30:50 +00002258 def _finish_epilog(self):
showard7b2d7cb2009-10-28 19:53:03 +00002259 if not self.queue_entry or not self.success:
showard775300b2009-09-09 15:30:50 +00002260 return
2261
showard7b2d7cb2009-10-28 19:53:03 +00002262 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2263 should_run_verify = (
2264 self.queue_entry.job.run_verify
2265 and self.host.protection != do_not_verify_protection)
2266 if should_run_verify:
showard9bb960b2009-11-19 01:02:11 +00002267 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
showard7b2d7cb2009-10-28 19:53:03 +00002268 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002269 host=models.Host.objects.get(id=self.host.id),
showard7b2d7cb2009-10-28 19:53:03 +00002270 queue_entry=entry,
2271 task=models.SpecialTask.Task.VERIFY)
2272 else:
showard775300b2009-09-09 15:30:50 +00002273 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002274
mblighd5c95802008-03-05 00:33:46 +00002275
showard21baa452008-10-21 00:08:39 +00002276 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002277 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002278
showard21baa452008-10-21 00:08:39 +00002279 if self.success:
2280 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002281 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002282
showard775300b2009-09-09 15:30:50 +00002283 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002284
showard21baa452008-10-21 00:08:39 +00002285
showardd3dc1992009-04-22 21:01:40 +00002286class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002287 _num_running_parses = 0
2288
showardd1195652009-12-08 22:21:02 +00002289 def __init__(self, queue_entries):
2290 super(FinalReparseTask, self).__init__(queue_entries,
2291 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002292 # don't use _set_ids, since we don't want to set the host_ids
2293 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002294
2295
2296 def _generate_command(self, results_dir):
2297 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
2298 results_dir]
2299
2300
2301 @property
2302 def num_processes(self):
2303 return 0 # don't include parser processes in accounting
2304
2305
2306 def _pidfile_name(self):
2307 return _PARSER_PID_FILE
2308
2309
2310 def _parse_started(self):
2311 return bool(self.monitor)
showard97aed502008-11-04 02:01:24 +00002312
showard97aed502008-11-04 02:01:24 +00002313
2314 @classmethod
2315 def _increment_running_parses(cls):
2316 cls._num_running_parses += 1
2317
2318
2319 @classmethod
2320 def _decrement_running_parses(cls):
2321 cls._num_running_parses -= 1
2322
2323
2324 @classmethod
2325 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002326 return (cls._num_running_parses <
2327 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002328
2329
2330 def prolog(self):
showardd1195652009-12-08 22:21:02 +00002331 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002332 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2333 raise SchedulerError('Parse task attempting to start on '
2334 'non-parsing entry: %s' % queue_entry)
2335
showard97aed502008-11-04 02:01:24 +00002336 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002337
2338
2339 def epilog(self):
2340 super(FinalReparseTask, self).epilog()
showardd1195652009-12-08 22:21:02 +00002341 self._set_all_statuses(self._final_status())
showard97aed502008-11-04 02:01:24 +00002342
2343
showard08a36412009-05-05 01:01:13 +00002344 def tick(self):
2345 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002346 # and we can, at which point we revert to default behavior
showardd1195652009-12-08 22:21:02 +00002347 if self._parse_started():
showard08a36412009-05-05 01:01:13 +00002348 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002349 else:
2350 self._try_starting_parse()
2351
2352
2353 def run(self):
2354 # override run() to not actually run unless we can
2355 self._try_starting_parse()
2356
2357
2358 def _try_starting_parse(self):
2359 if not self._can_run_new_parse():
2360 return
showard170873e2009-01-07 00:22:26 +00002361
showard97aed502008-11-04 02:01:24 +00002362 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002363 super(FinalReparseTask, self).run()
showard97aed502008-11-04 02:01:24 +00002364 self._increment_running_parses()
showard97aed502008-11-04 02:01:24 +00002365
2366
2367 def finished(self, success):
2368 super(FinalReparseTask, self).finished(success)
showardd1195652009-12-08 22:21:02 +00002369 if self._parse_started():
showard678df4f2009-02-04 21:36:39 +00002370 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002371
2372
showarda3c58572009-03-12 20:36:59 +00002373class DBError(Exception):
2374 """Raised by the DBObject constructor when its select fails."""
2375
2376
mbligh36768f02008-02-22 18:28:33 +00002377class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002378 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002379
2380 # Subclasses MUST override these:
2381 _table_name = ''
2382 _fields = ()
2383
showarda3c58572009-03-12 20:36:59 +00002384 # A mapping from (type, id) to the instance of the object for that
2385 # particular id. This prevents us from creating new Job() and Host()
2386 # instances for every HostQueueEntry object that we instantiate as
2387 # multiple HQEs often share the same Job.
2388 _instances_by_type_and_id = weakref.WeakValueDictionary()
2389 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002390
showarda3c58572009-03-12 20:36:59 +00002391
2392 def __new__(cls, id=None, **kwargs):
2393 """
2394 Look to see if we already have an instance for this particular type
2395 and id. If so, use it instead of creating a duplicate instance.
2396 """
2397 if id is not None:
2398 instance = cls._instances_by_type_and_id.get((cls, id))
2399 if instance:
2400 return instance
2401 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2402
2403
2404 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002405 assert bool(id) or bool(row)
2406 if id is not None and row is not None:
2407 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002408 assert self._table_name, '_table_name must be defined in your class'
2409 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002410 if not new_record:
2411 if self._initialized and not always_query:
2412 return # We've already been initialized.
2413 if id is None:
2414 id = row[0]
2415 # Tell future constructors to use us instead of re-querying while
2416 # this instance is still around.
2417 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002418
showard6ae5ea92009-02-25 00:11:51 +00002419 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002420
jadmanski0afbb632008-06-06 21:10:57 +00002421 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002422
jadmanski0afbb632008-06-06 21:10:57 +00002423 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002424 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002425
showarda3c58572009-03-12 20:36:59 +00002426 if self._initialized:
2427 differences = self._compare_fields_in_row(row)
2428 if differences:
showard7629f142009-03-27 21:02:02 +00002429 logging.warn(
2430 'initialized %s %s instance requery is updating: %s',
2431 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002432 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002433 self._initialized = True
2434
2435
2436 @classmethod
2437 def _clear_instance_cache(cls):
2438 """Used for testing, clear the internal instance cache."""
2439 cls._instances_by_type_and_id.clear()
2440
2441
showardccbd6c52009-03-21 00:10:21 +00002442 def _fetch_row_from_db(self, row_id):
2443 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2444 rows = _db.execute(sql, (row_id,))
2445 if not rows:
showard76e29d12009-04-15 21:53:10 +00002446 raise DBError("row not found (table=%s, row id=%s)"
2447 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002448 return rows[0]
2449
2450
showarda3c58572009-03-12 20:36:59 +00002451 def _assert_row_length(self, row):
2452 assert len(row) == len(self._fields), (
2453 "table = %s, row = %s/%d, fields = %s/%d" % (
2454 self.__table, row, len(row), self._fields, len(self._fields)))
2455
2456
2457 def _compare_fields_in_row(self, row):
2458 """
showarddae680a2009-10-12 20:26:43 +00002459 Given a row as returned by a SELECT query, compare it to our existing in
2460 memory fields. Fractional seconds are stripped from datetime values
2461 before comparison.
showarda3c58572009-03-12 20:36:59 +00002462
2463 @param row - A sequence of values corresponding to fields named in
2464 The class attribute _fields.
2465
2466 @returns A dictionary listing the differences keyed by field name
2467 containing tuples of (current_value, row_value).
2468 """
2469 self._assert_row_length(row)
2470 differences = {}
showarddae680a2009-10-12 20:26:43 +00002471 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002472 for field, row_value in itertools.izip(self._fields, row):
2473 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002474 if (isinstance(current_value, datetime.datetime)
2475 and isinstance(row_value, datetime.datetime)):
2476 current_value = current_value.strftime(datetime_cmp_fmt)
2477 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002478 if current_value != row_value:
2479 differences[field] = (current_value, row_value)
2480 return differences
showard2bab8f42008-11-12 18:15:22 +00002481
2482
2483 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002484 """
2485 Update our field attributes using a single row returned by SELECT.
2486
2487 @param row - A sequence of values corresponding to fields named in
2488 the class fields list.
2489 """
2490 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002491
showard2bab8f42008-11-12 18:15:22 +00002492 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002493 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002494 setattr(self, field, value)
2495 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002496
showard2bab8f42008-11-12 18:15:22 +00002497 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002498
mblighe2586682008-02-29 22:45:46 +00002499
showardccbd6c52009-03-21 00:10:21 +00002500 def update_from_database(self):
2501 assert self.id is not None
2502 row = self._fetch_row_from_db(self.id)
2503 self._update_fields_from_row(row)
2504
2505
jadmanski0afbb632008-06-06 21:10:57 +00002506 def count(self, where, table = None):
2507 if not table:
2508 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002509
jadmanski0afbb632008-06-06 21:10:57 +00002510 rows = _db.execute("""
2511 SELECT count(*) FROM %s
2512 WHERE %s
2513 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002514
jadmanski0afbb632008-06-06 21:10:57 +00002515 assert len(rows) == 1
2516
2517 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002518
2519
showardd3dc1992009-04-22 21:01:40 +00002520 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002521 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002522
showard2bab8f42008-11-12 18:15:22 +00002523 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002524 return
mbligh36768f02008-02-22 18:28:33 +00002525
mblighf8c624d2008-07-03 16:58:45 +00002526 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002527 _db.execute(query, (value, self.id))
2528
showard2bab8f42008-11-12 18:15:22 +00002529 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002530
2531
jadmanski0afbb632008-06-06 21:10:57 +00002532 def save(self):
2533 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002534 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002535 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002536 values = []
2537 for key in keys:
2538 value = getattr(self, key)
2539 if value is None:
2540 values.append('NULL')
2541 else:
2542 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002543 values_str = ','.join(values)
2544 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2545 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002546 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002547 # Update our id to the one the database just assigned to us.
2548 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002549
2550
jadmanski0afbb632008-06-06 21:10:57 +00002551 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002552 self._instances_by_type_and_id.pop((type(self), id), None)
2553 self._initialized = False
2554 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002555 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2556 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002557
2558
showard63a34772008-08-18 19:32:50 +00002559 @staticmethod
2560 def _prefix_with(string, prefix):
2561 if string:
2562 string = prefix + string
2563 return string
2564
2565
jadmanski0afbb632008-06-06 21:10:57 +00002566 @classmethod
showard989f25d2008-10-01 11:38:11 +00002567 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002568 """
2569 Construct instances of our class based on the given database query.
2570
2571 @yields One class instance for each row fetched.
2572 """
showard63a34772008-08-18 19:32:50 +00002573 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2574 where = cls._prefix_with(where, 'WHERE ')
2575 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002576 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002577 'joins' : joins,
2578 'where' : where,
2579 'order_by' : order_by})
2580 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002581 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002582
mbligh36768f02008-02-22 18:28:33 +00002583
class IneligibleHostQueue(DBObject):
    # ORM wrapper for a (job, host) "block" row: while it exists, the host
    # is ineligible to be assigned to that job again
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002587
2588
showard89f84db2009-03-12 20:39:13 +00002589class AtomicGroup(DBObject):
2590 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002591 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2592 'invalid')
showard89f84db2009-03-12 20:39:13 +00002593
2594
showard989f25d2008-10-01 11:38:11 +00002595class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002596 _table_name = 'labels'
2597 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002598 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002599
2600
showard6157c632009-07-06 20:19:31 +00002601 def __repr__(self):
2602 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2603 self.name, self.id, self.atomic_group_id)
2604
2605
mbligh36768f02008-02-22 18:28:33 +00002606class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002607 _table_name = 'hosts'
2608 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2609 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2610
2611
jadmanski0afbb632008-06-06 21:10:57 +00002612 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002613 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002614 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002615
2616
showard170873e2009-01-07 00:22:26 +00002617 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002618 """
showard170873e2009-01-07 00:22:26 +00002619 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002620 """
2621 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002622 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002623 FROM labels
2624 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002625 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002626 ORDER BY labels.name
2627 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002628 platform = None
2629 all_labels = []
2630 for label_name, is_platform in rows:
2631 if is_platform:
2632 platform = label_name
2633 all_labels.append(label_name)
2634 return platform, all_labels
2635
2636
showard54c1ea92009-05-20 00:32:58 +00002637 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2638
2639
2640 @classmethod
2641 def cmp_for_sort(cls, a, b):
2642 """
2643 A comparison function for sorting Host objects by hostname.
2644
2645 This strips any trailing numeric digits, ignores leading 0s and
2646 compares hostnames by the leading name and the trailing digits as a
2647 number. If both hostnames do not match this pattern, they are simply
2648 compared as lower case strings.
2649
2650 Example of how hostnames will be sorted:
2651
2652 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2653
2654 This hopefully satisfy most people's hostname sorting needs regardless
2655 of their exact naming schemes. Nobody sane should have both a host10
2656 and host010 (but the algorithm works regardless).
2657 """
2658 lower_a = a.hostname.lower()
2659 lower_b = b.hostname.lower()
2660 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2661 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2662 if match_a and match_b:
2663 name_a, number_a_str = match_a.groups()
2664 name_b, number_b_str = match_b.groups()
2665 number_a = int(number_a_str.lstrip('0'))
2666 number_b = int(number_b_str.lstrip('0'))
2667 result = cmp((name_a, number_a), (name_b, number_b))
2668 if result == 0 and lower_a != lower_b:
2669 # If they compared equal above but the lower case names are
2670 # indeed different, don't report equality. abc012 != abc12.
2671 return cmp(lower_a, lower_b)
2672 return result
2673 else:
2674 return cmp(lower_a, lower_b)
2675
2676
mbligh36768f02008-02-22 18:28:33 +00002677class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002678 _table_name = 'host_queue_entries'
2679 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002680 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002681 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002682
2683
showarda3c58572009-03-12 20:36:59 +00002684 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002685 assert id or row
showarda3c58572009-03-12 20:36:59 +00002686 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002687 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002688
jadmanski0afbb632008-06-06 21:10:57 +00002689 if self.host_id:
2690 self.host = Host(self.host_id)
2691 else:
2692 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002693
showard77182562009-06-10 00:16:05 +00002694 if self.atomic_group_id:
2695 self.atomic_group = AtomicGroup(self.atomic_group_id,
2696 always_query=False)
2697 else:
2698 self.atomic_group = None
2699
showard170873e2009-01-07 00:22:26 +00002700 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002701 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002702
2703
showard89f84db2009-03-12 20:39:13 +00002704 @classmethod
2705 def clone(cls, template):
2706 """
2707 Creates a new row using the values from a template instance.
2708
2709 The new instance will not exist in the database or have a valid
2710 id attribute until its save() method is called.
2711 """
2712 assert isinstance(template, cls)
2713 new_row = [getattr(template, field) for field in cls._fields]
2714 clone = cls(row=new_row, new_record=True)
2715 clone.id = None
2716 return clone
2717
2718
showardc85c21b2008-11-24 22:17:37 +00002719 def _view_job_url(self):
2720 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2721
2722
showardf1ae3542009-05-11 19:26:02 +00002723 def get_labels(self):
2724 """
2725 Get all labels associated with this host queue entry (either via the
2726 meta_host or as a job dependency label). The labels yielded are not
2727 guaranteed to be unique.
2728
2729 @yields Label instances associated with this host_queue_entry.
2730 """
2731 if self.meta_host:
2732 yield Label(id=self.meta_host, always_query=False)
2733 labels = Label.fetch(
2734 joins="JOIN jobs_dependency_labels AS deps "
2735 "ON (labels.id = deps.label_id)",
2736 where="deps.job_id = %d" % self.job.id)
2737 for label in labels:
2738 yield label
2739
2740
jadmanski0afbb632008-06-06 21:10:57 +00002741 def set_host(self, host):
2742 if host:
2743 self.queue_log_record('Assigning host ' + host.hostname)
2744 self.update_field('host_id', host.id)
2745 self.update_field('active', True)
2746 self.block_host(host.id)
2747 else:
2748 self.queue_log_record('Releasing host')
2749 self.unblock_host(self.host.id)
2750 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002751
jadmanski0afbb632008-06-06 21:10:57 +00002752 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002753
2754
jadmanski0afbb632008-06-06 21:10:57 +00002755 def get_host(self):
2756 return self.host
mbligh36768f02008-02-22 18:28:33 +00002757
2758
jadmanski0afbb632008-06-06 21:10:57 +00002759 def queue_log_record(self, log_line):
2760 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002761 _drone_manager.write_lines_to_file(self.queue_log_path,
2762 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002763
2764
jadmanski0afbb632008-06-06 21:10:57 +00002765 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002766 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002767 row = [0, self.job.id, host_id]
2768 block = IneligibleHostQueue(row=row, new_record=True)
2769 block.save()
mblighe2586682008-02-29 22:45:46 +00002770
2771
jadmanski0afbb632008-06-06 21:10:57 +00002772 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002773 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002774 blocks = IneligibleHostQueue.fetch(
2775 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2776 for block in blocks:
2777 block.delete()
mblighe2586682008-02-29 22:45:46 +00002778
2779
showard2bab8f42008-11-12 18:15:22 +00002780 def set_execution_subdir(self, subdir=None):
2781 if subdir is None:
2782 assert self.get_host()
2783 subdir = self.get_host().hostname
2784 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002785
2786
showard6355f6b2008-12-05 18:52:13 +00002787 def _get_hostname(self):
2788 if self.host:
2789 return self.host.hostname
2790 return 'no host'
2791
2792
showard170873e2009-01-07 00:22:26 +00002793 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002794 flags = []
2795 if self.active:
2796 flags.append('active')
2797 if self.complete:
2798 flags.append('complete')
2799 if self.deleted:
2800 flags.append('deleted')
2801 if self.aborted:
2802 flags.append('aborted')
2803 flags_str = ','.join(flags)
2804 if flags_str:
2805 flags_str = ' [%s]' % flags_str
2806 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2807 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002808
2809
jadmanski0afbb632008-06-06 21:10:57 +00002810 def set_status(self, status):
showard56824072009-10-12 20:30:21 +00002811 logging.info("%s -> %s", self, status)
mblighf8c624d2008-07-03 16:58:45 +00002812
showard56824072009-10-12 20:30:21 +00002813 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002814
showard8cc058f2009-09-08 16:26:33 +00002815 if status in (models.HostQueueEntry.Status.QUEUED,
2816 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002817 self.update_field('complete', False)
2818 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002819
showard8cc058f2009-09-08 16:26:33 +00002820 if status in (models.HostQueueEntry.Status.PENDING,
2821 models.HostQueueEntry.Status.RUNNING,
2822 models.HostQueueEntry.Status.VERIFYING,
2823 models.HostQueueEntry.Status.STARTING,
2824 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002825 self.update_field('complete', False)
2826 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002827
showard8cc058f2009-09-08 16:26:33 +00002828 if status in (models.HostQueueEntry.Status.FAILED,
2829 models.HostQueueEntry.Status.COMPLETED,
2830 models.HostQueueEntry.Status.STOPPED,
2831 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002832 self.update_field('complete', True)
2833 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002834 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002835
2836 should_email_status = (status.lower() in _notify_email_statuses or
2837 'all' in _notify_email_statuses)
2838 if should_email_status:
2839 self._email_on_status(status)
2840
2841 self._email_on_job_complete()
2842
2843
showardf85a0b72009-10-07 20:48:45 +00002844 def _on_complete(self):
showardd1195652009-12-08 22:21:02 +00002845 self.job.stop_if_necessary()
showardf85a0b72009-10-07 20:48:45 +00002846 if not self.execution_subdir:
2847 return
2848 # unregister any possible pidfiles associated with this queue entry
2849 for pidfile_name in _ALL_PIDFILE_NAMES:
2850 pidfile_id = _drone_manager.get_pidfile_id_from(
2851 self.execution_path(), pidfile_name=pidfile_name)
2852 _drone_manager.unregister_pidfile(pidfile_id)
2853
2854
showardc85c21b2008-11-24 22:17:37 +00002855 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002856 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002857
2858 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2859 self.job.id, self.job.name, hostname, status)
2860 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2861 self.job.id, self.job.name, hostname, status,
2862 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002863 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002864
2865
2866 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002867 if not self.job.is_finished():
2868 return
showard542e8402008-09-19 20:16:18 +00002869
showardc85c21b2008-11-24 22:17:37 +00002870 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002871 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002872 for queue_entry in hosts_queue:
2873 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002874 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002875 queue_entry.status))
2876
2877 summary_text = "\n".join(summary_text)
2878 status_counts = models.Job.objects.get_status_counts(
2879 [self.job.id])[self.job.id]
2880 status = ', '.join('%d %s' % (count, status) for status, count
2881 in status_counts.iteritems())
2882
2883 subject = 'Autotest: Job ID: %s "%s" %s' % (
2884 self.job.id, self.job.name, status)
2885 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2886 self.job.id, self.job.name, status, self._view_job_url(),
2887 summary_text)
showard170873e2009-01-07 00:22:26 +00002888 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002889
2890
showard8cc058f2009-09-08 16:26:33 +00002891 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002892 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002893 assert assigned_host
2894 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002895 if self.host_id is None:
2896 self.set_host(assigned_host)
2897 else:
2898 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002899
showard2ca64c92009-12-10 21:41:02 +00002900 logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002901 self.job.name, self.meta_host, self.atomic_group_id,
showard2ca64c92009-12-10 21:41:02 +00002902 self.job.id, self.id, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002903
showard8cc058f2009-09-08 16:26:33 +00002904 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002905
2906
showard8cc058f2009-09-08 16:26:33 +00002907 def _do_schedule_pre_job_tasks(self):
showard77182562009-06-10 00:16:05 +00002908 # Every host goes thru the Verifying stage (which may or may not
2909 # actually do anything as determined by get_pre_job_tasks).
2910 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00002911 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002912
showard6ae5ea92009-02-25 00:11:51 +00002913
jadmanski0afbb632008-06-06 21:10:57 +00002914 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002915 assert self.host
showard8cc058f2009-09-08 16:26:33 +00002916 self.set_status(models.HostQueueEntry.Status.QUEUED)
showard12f3e322009-05-13 21:27:42 +00002917 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002918 # verify/cleanup failure sets the execution subdir, so reset it here
2919 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002920 if self.meta_host:
2921 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002922
2923
jadmanskif7fa2cc2008-10-01 14:13:23 +00002924 @property
2925 def aborted_by(self):
2926 self._load_abort_info()
2927 return self._aborted_by
2928
2929
2930 @property
2931 def aborted_on(self):
2932 self._load_abort_info()
2933 return self._aborted_on
2934
2935
2936 def _load_abort_info(self):
2937 """ Fetch info about who aborted the job. """
2938 if hasattr(self, "_aborted_by"):
2939 return
2940 rows = _db.execute("""
2941 SELECT users.login, aborted_host_queue_entries.aborted_on
2942 FROM aborted_host_queue_entries
2943 INNER JOIN users
2944 ON users.id = aborted_host_queue_entries.aborted_by_id
2945 WHERE aborted_host_queue_entries.queue_entry_id = %s
2946 """, (self.id,))
2947 if rows:
2948 self._aborted_by, self._aborted_on = rows[0]
2949 else:
2950 self._aborted_by = self._aborted_on = None
2951
2952
showardb2e2c322008-10-14 17:33:55 +00002953 def on_pending(self):
2954 """
2955 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00002956 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
2957 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00002958 """
showard8cc058f2009-09-08 16:26:33 +00002959 self.set_status(models.HostQueueEntry.Status.PENDING)
2960 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00002961
2962 # Some debug code here: sends an email if an asynchronous job does not
2963 # immediately enter Starting.
2964 # TODO: Remove this once we figure out why asynchronous jobs are getting
2965 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00002966 self.job.run_if_ready(queue_entry=self)
2967 if (self.job.synch_count == 1 and
2968 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00002969 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2970 message = 'Asynchronous job stuck in Pending'
2971 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00002972
2973
showardd3dc1992009-04-22 21:01:40 +00002974 def abort(self, dispatcher):
2975 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002976
showardd3dc1992009-04-22 21:01:40 +00002977 Status = models.HostQueueEntry.Status
showard8cc058f2009-09-08 16:26:33 +00002978 if self.status in (Status.GATHERING, Status.PARSING):
showardd3dc1992009-04-22 21:01:40 +00002979 # do nothing; post-job tasks will finish and then mark this entry
2980 # with status "Aborted" and take care of the host
2981 return
2982
showard8cc058f2009-09-08 16:26:33 +00002983 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
2984 assert not dispatcher.get_agents_for_entry(self)
showardd3dc1992009-04-22 21:01:40 +00002985 self.host.set_status(models.Host.Status.READY)
2986 elif self.status == Status.VERIFYING:
showard8cc058f2009-09-08 16:26:33 +00002987 models.SpecialTask.objects.create(
2988 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00002989 host=models.Host.objects.get(id=self.host.id),
2990 requested_by=self.job.owner_model())
showardd3dc1992009-04-22 21:01:40 +00002991
2992 self.set_status(Status.ABORTED)
showardd2014822009-10-12 20:26:58 +00002993 self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00002994
showard8cc058f2009-09-08 16:26:33 +00002995
2996 def get_group_name(self):
2997 atomic_group = self.atomic_group
2998 if not atomic_group:
2999 return ''
3000
3001 # Look at any meta_host and dependency labels and pick the first
3002 # one that also specifies this atomic group. Use that label name
3003 # as the group name if possible (it is more specific).
3004 for label in self.get_labels():
3005 if label.atomic_group_id:
3006 assert label.atomic_group_id == atomic_group.id
3007 return label.name
3008 return atomic_group.name
3009
3010
showard170873e2009-01-07 00:22:26 +00003011 def execution_tag(self):
3012 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00003013 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00003014
3015
showarded2afea2009-07-07 20:54:07 +00003016 def execution_path(self):
3017 return self.execution_tag()
3018
3019
mbligh36768f02008-02-22 18:28:33 +00003020class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00003021 _table_name = 'jobs'
3022 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
3023 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00003024 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00003025 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00003026
showard77182562009-06-10 00:16:05 +00003027 # This does not need to be a column in the DB. The delays are likely to
3028 # be configured short. If the scheduler is stopped and restarted in
3029 # the middle of a job's delay cycle, the delay cycle will either be
3030 # repeated or skipped depending on the number of Pending machines found
3031 # when the restarted scheduler recovers to track it. Not a problem.
3032 #
3033 # A reference to the DelayedCallTask that will wake up the job should
3034 # no other HQEs change state in time. Its end_time attribute is used
3035 # by our run_with_ready_delay() method to determine if the wait is over.
3036 _delay_ready_task = None
3037
3038 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
3039 # all status='Pending' atomic group HQEs incase a delay was running when the
3040 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00003041
showarda3c58572009-03-12 20:36:59 +00003042 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00003043 assert id or row
showarda3c58572009-03-12 20:36:59 +00003044 super(Job, self).__init__(id=id, row=row, **kwargs)
showard9bb960b2009-11-19 01:02:11 +00003045 self._owner_model = None # caches model instance of owner
3046
3047
3048 def owner_model(self):
3049 # work around the fact that the Job owner field is a string, not a
3050 # foreign key
3051 if not self._owner_model:
3052 self._owner_model = models.User.objects.get(login=self.owner)
3053 return self._owner_model
mbligh36768f02008-02-22 18:28:33 +00003054
mblighe2586682008-02-29 22:45:46 +00003055
jadmanski0afbb632008-06-06 21:10:57 +00003056 def is_server_job(self):
3057 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00003058
3059
showard170873e2009-01-07 00:22:26 +00003060 def tag(self):
3061 return "%s-%s" % (self.id, self.owner)
3062
3063
jadmanski0afbb632008-06-06 21:10:57 +00003064 def get_host_queue_entries(self):
3065 rows = _db.execute("""
3066 SELECT * FROM host_queue_entries
3067 WHERE job_id= %s
3068 """, (self.id,))
3069 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00003070
jadmanski0afbb632008-06-06 21:10:57 +00003071 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00003072
jadmanski0afbb632008-06-06 21:10:57 +00003073 return entries
mbligh36768f02008-02-22 18:28:33 +00003074
3075
jadmanski0afbb632008-06-06 21:10:57 +00003076 def set_status(self, status, update_queues=False):
3077 self.update_field('status',status)
3078
3079 if update_queues:
3080 for queue_entry in self.get_host_queue_entries():
3081 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00003082
3083
showard77182562009-06-10 00:16:05 +00003084 def _atomic_and_has_started(self):
3085 """
3086 @returns True if any of the HostQueueEntries associated with this job
3087 have entered the Status.STARTING state or beyond.
3088 """
3089 atomic_entries = models.HostQueueEntry.objects.filter(
3090 job=self.id, atomic_group__isnull=False)
3091 if atomic_entries.count() <= 0:
3092 return False
3093
showardaf8b4ca2009-06-16 18:47:26 +00003094 # These states may *only* be reached if Job.run() has been called.
3095 started_statuses = (models.HostQueueEntry.Status.STARTING,
3096 models.HostQueueEntry.Status.RUNNING,
3097 models.HostQueueEntry.Status.COMPLETED)
3098
3099 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003100 return started_entries.count() > 0
3101
3102
showard708b3522009-08-20 23:26:15 +00003103 def _hosts_assigned_count(self):
3104 """The number of HostQueueEntries assigned a Host for this job."""
3105 entries = models.HostQueueEntry.objects.filter(job=self.id,
3106 host__isnull=False)
3107 return entries.count()
3108
3109
showard77182562009-06-10 00:16:05 +00003110 def _pending_count(self):
3111 """The number of HostQueueEntries for this job in the Pending state."""
3112 pending_entries = models.HostQueueEntry.objects.filter(
3113 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3114 return pending_entries.count()
3115
3116
showardd07a5f32009-12-07 19:36:20 +00003117 def _max_hosts_needed_to_run(self, atomic_group):
showardd2014822009-10-12 20:26:58 +00003118 """
3119 @param atomic_group: The AtomicGroup associated with this job that we
showardd07a5f32009-12-07 19:36:20 +00003120 are using to set an upper bound on the threshold.
3121 @returns The maximum number of HostQueueEntries assigned a Host before
showardd2014822009-10-12 20:26:58 +00003122 this job can run.
3123 """
3124 return min(self._hosts_assigned_count(),
3125 atomic_group.max_number_of_machines)
3126
3127
showardd07a5f32009-12-07 19:36:20 +00003128 def _min_hosts_needed_to_run(self):
3129 """Return the minumum number of hsots needed to run this job."""
3130 return self.synch_count
3131
3132
jadmanski0afbb632008-06-06 21:10:57 +00003133 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003134 # NOTE: Atomic group jobs stop reporting ready after they have been
3135 # started to avoid launching multiple copies of one atomic job.
3136 # Only possible if synch_count is less than than half the number of
3137 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003138 pending_count = self._pending_count()
3139 atomic_and_has_started = self._atomic_and_has_started()
3140 ready = (pending_count >= self.synch_count
showardd2014822009-10-12 20:26:58 +00003141 and not atomic_and_has_started)
showardb000a8d2009-07-28 20:02:07 +00003142
3143 if not ready:
3144 logging.info(
3145 'Job %s not ready: %s pending, %s required '
3146 '(Atomic and started: %s)',
3147 self, pending_count, self.synch_count,
3148 atomic_and_has_started)
3149
3150 return ready
mbligh36768f02008-02-22 18:28:33 +00003151
3152
jadmanski0afbb632008-06-06 21:10:57 +00003153 def num_machines(self, clause = None):
3154 sql = "job_id=%s" % self.id
3155 if clause:
3156 sql += " AND (%s)" % clause
3157 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003158
3159
jadmanski0afbb632008-06-06 21:10:57 +00003160 def num_queued(self):
3161 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003162
3163
jadmanski0afbb632008-06-06 21:10:57 +00003164 def num_active(self):
3165 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003166
3167
jadmanski0afbb632008-06-06 21:10:57 +00003168 def num_complete(self):
3169 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003170
3171
jadmanski0afbb632008-06-06 21:10:57 +00003172 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003173 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003174
mbligh36768f02008-02-22 18:28:33 +00003175
showard6bb7c292009-01-30 01:44:51 +00003176 def _not_yet_run_entries(self, include_verifying=True):
3177 statuses = [models.HostQueueEntry.Status.QUEUED,
3178 models.HostQueueEntry.Status.PENDING]
3179 if include_verifying:
3180 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3181 return models.HostQueueEntry.objects.filter(job=self.id,
3182 status__in=statuses)
3183
3184
3185 def _stop_all_entries(self):
3186 entries_to_stop = self._not_yet_run_entries(
3187 include_verifying=False)
3188 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003189 assert not child_entry.complete, (
3190 '%s status=%s, active=%s, complete=%s' %
3191 (child_entry.id, child_entry.status, child_entry.active,
3192 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003193 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3194 child_entry.host.status = models.Host.Status.READY
3195 child_entry.host.save()
3196 child_entry.status = models.HostQueueEntry.Status.STOPPED
3197 child_entry.save()
3198
showard2bab8f42008-11-12 18:15:22 +00003199 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003200 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003201 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003202 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003203
3204
jadmanski0afbb632008-06-06 21:10:57 +00003205 def write_to_machines_file(self, queue_entry):
3206 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00003207 file_path = os.path.join(self.tag(), '.machines')
3208 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003209
3210
showardf1ae3542009-05-11 19:26:02 +00003211 def _next_group_name(self, group_name=''):
3212 """@returns a directory name to use for the next host group results."""
3213 if group_name:
3214 # Sanitize for use as a pathname.
3215 group_name = group_name.replace(os.path.sep, '_')
3216 if group_name.startswith('.'):
3217 group_name = '_' + group_name[1:]
3218 # Add a separator between the group name and 'group%d'.
3219 group_name += '.'
3220 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003221 query = models.HostQueueEntry.objects.filter(
3222 job=self.id).values('execution_subdir').distinct()
3223 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003224 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3225 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003226 if ids:
3227 next_id = max(ids) + 1
3228 else:
3229 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003230 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003231
3232
showarddb502762009-09-09 15:31:20 +00003233 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003234 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003235 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003236 return control_path
mbligh36768f02008-02-22 18:28:33 +00003237
showardb2e2c322008-10-14 17:33:55 +00003238
showard2bab8f42008-11-12 18:15:22 +00003239 def get_group_entries(self, queue_entry_from_group):
showard8375ce02009-10-12 20:35:13 +00003240 """
3241 @param queue_entry_from_group: A HostQueueEntry instance to find other
3242 group entries on this job for.
3243
3244 @returns A list of HostQueueEntry objects all executing this job as
3245 part of the same group as the one supplied (having the same
3246 execution_subdir).
3247 """
showard2bab8f42008-11-12 18:15:22 +00003248 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003249 return list(HostQueueEntry.fetch(
3250 where='job_id=%s AND execution_subdir=%s',
3251 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003252
3253
showard8cc058f2009-09-08 16:26:33 +00003254 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003255 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003256 execution_path = queue_entries[0].execution_path()
3257 control_path = self._write_control_file(execution_path)
jadmanski0afbb632008-06-06 21:10:57 +00003258 hostnames = ','.join([entry.get_host().hostname
3259 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003260
showarddb502762009-09-09 15:31:20 +00003261 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003262 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003263 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003264 ['-P', execution_tag, '-n',
3265 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003266 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003267
jadmanski0afbb632008-06-06 21:10:57 +00003268 if not self.is_server_job():
3269 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003270
showardb2e2c322008-10-14 17:33:55 +00003271 return params
mblighe2586682008-02-29 22:45:46 +00003272
mbligh36768f02008-02-22 18:28:33 +00003273
showardc9ae1782009-01-30 01:42:37 +00003274 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003275 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003276 return True
showard0fc38302008-10-23 00:44:07 +00003277 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003278 return queue_entry.get_host().dirty
3279 return False
showard21baa452008-10-21 00:08:39 +00003280
showardc9ae1782009-01-30 01:42:37 +00003281
showard8cc058f2009-09-08 16:26:33 +00003282 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003283 do_not_verify = (queue_entry.host.protection ==
3284 host_protections.Protection.DO_NOT_VERIFY)
3285 if do_not_verify:
3286 return False
3287 return self.run_verify
3288
3289
showard8cc058f2009-09-08 16:26:33 +00003290 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003291 """
3292 Get a list of tasks to perform before the host_queue_entry
3293 may be used to run this Job (such as Cleanup & Verify).
3294
3295 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003296 it should be considered be ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003297 task in the list calls HostQueueEntry.on_pending(), which
3298 continues the flow of the job.
3299 """
showardc9ae1782009-01-30 01:42:37 +00003300 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003301 task = models.SpecialTask.Task.CLEANUP
3302 elif self._should_run_verify(queue_entry):
3303 task = models.SpecialTask.Task.VERIFY
3304 else:
3305 queue_entry.on_pending()
3306 return
3307
showard9bb960b2009-11-19 01:02:11 +00003308 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00003309 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00003310 host=models.Host.objects.get(id=queue_entry.host_id),
3311 queue_entry=queue_entry, task=task)
showard21baa452008-10-21 00:08:39 +00003312
3313
showardf1ae3542009-05-11 19:26:02 +00003314 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003315 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003316 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003317 else:
showardf1ae3542009-05-11 19:26:02 +00003318 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003319 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003320 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003321 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003322
3323 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003324 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003325
3326
    def _choose_group_to_run(self, include_queue_entry):
        """
        Pick the HostQueueEntries that will run this job together, assign
        them a shared results group, and return them.

        @returns A list of HostQueueEntry instances (always containing
                include_queue_entry), or an empty list if fewer than
                synch_count entries could be gathered.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        # Atomic groups fill up to the group's machine limit; plain jobs
        # only need synch_count entries.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check. We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003366
3367
showard77182562009-06-10 00:16:05 +00003368 def run_if_ready(self, queue_entry):
3369 """
showard8375ce02009-10-12 20:35:13 +00003370 Run this job by kicking its HQEs into status='Starting' if enough
3371 hosts are ready for it to run.
3372
3373 Cleans up by kicking HQEs into status='Stopped' if this Job is not
3374 ready to run.
showard77182562009-06-10 00:16:05 +00003375 """
showardb2e2c322008-10-14 17:33:55 +00003376 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003377 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003378 elif queue_entry.atomic_group:
3379 self.run_with_ready_delay(queue_entry)
3380 else:
3381 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003382
3383
    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        Runs the job immediately when the delay is disabled, enough hosts
        are already Pending, or a previously-started delay has expired;
        otherwise the entry is put into Waiting.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run() when ready.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._max_hosts_needed_to_run(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough? Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003408
showard8cc058f2009-09-08 16:26:33 +00003409
showardd07a5f32009-12-07 19:36:20 +00003410 def request_abort(self):
3411 """Request that this Job be aborted on the next scheduler cycle."""
3412 queue_entries = HostQueueEntry.fetch(where="job_id=%s" % self.id)
3413 for hqe in queue_entries:
3414 hqe.update_field('aborted', True)
3415
3416
showard8cc058f2009-09-08 16:26:33 +00003417 def schedule_delayed_callback_task(self, queue_entry):
3418 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3419
showard77182562009-06-10 00:16:05 +00003420 if self._delay_ready_task:
3421 return None
3422
showard8cc058f2009-09-08 16:26:33 +00003423 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3424
showard77182562009-06-10 00:16:05 +00003425 def run_job_after_delay():
showardd2014822009-10-12 20:26:58 +00003426 logging.info('Job %s done waiting for extra hosts.', self)
3427 # Check to see if the job is still relevant. It could have aborted
3428 # while we were waiting or hosts could have disappearred, etc.
showardd07a5f32009-12-07 19:36:20 +00003429 if self._pending_count() < self._min_hosts_needed_to_run():
showardd2014822009-10-12 20:26:58 +00003430 logging.info('Job %s had too few Pending hosts after waiting '
3431 'for extras. Not running.', self)
showardd07a5f32009-12-07 19:36:20 +00003432 self.request_abort()
showardd2014822009-10-12 20:26:58 +00003433 return
showard77182562009-06-10 00:16:05 +00003434 return self.run(queue_entry)
3435
showard708b3522009-08-20 23:26:15 +00003436 logging.info('Job %s waiting up to %s seconds for more hosts.',
3437 self.id, delay)
showard77182562009-06-10 00:16:05 +00003438 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3439 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003440 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003441
3442
3443 def run(self, queue_entry):
3444 """
3445 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003446 """
3447 if queue_entry.atomic_group and self._atomic_and_has_started():
3448 logging.error('Job.run() called on running atomic Job %d '
3449 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003450 return
3451 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003452 if queue_entries:
3453 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003454
3455
showard8cc058f2009-09-08 16:26:33 +00003456 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003457 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003458 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003459 self.abort_delay_ready_task()
3460
3461
3462 def abort_delay_ready_task(self):
3463 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003464 if self._delay_ready_task:
3465 # Cancel any pending callback that would try to run again
3466 # as we are already running.
3467 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003468
showardd2014822009-10-12 20:26:58 +00003469
showardb000a8d2009-07-28 20:02:07 +00003470 def __str__(self):
3471 return '%s-%s' % (self.id, self.owner)
3472
3473
mbligh36768f02008-02-22 18:28:33 +00003474if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003475 main()