#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections, utils, debug
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, drones, email_manager


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
CONFIG_SECTION = 'SCHEDULER'
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
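# The scheduler reads its settings from the [SCHEDULER] and [AUTOTEST_WEB]
# sections of the global config.  An illustrative (not authoritative) snippet
# covering the keys consumed in this module:
#
#   [SCHEDULER]
#   notify_email_statuses: Failed,Aborted
#   tick_pause_sec: 5
#   max_running_jobs: 100
#   max_jobs_started_per_cycle: 10
#   clean_interval_minutes: 60
#   synch_job_start_timeout_minutes: 120
#   drones: localhost
#   results_host: localhost
#
# The values shown are examples only; site configs will differ.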

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()


def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(CONFIG_SECTION,
                                              "notify_email_statuses")
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    tick_pause = c.get_config_value(CONFIG_SECTION, 'tick_pause_sec',
                                    type=int)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            print 'Error: [SERVER] hostname missing from the config file.'
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(tick_pause)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    _drone_manager.shutdown()
    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    debug.configure('scheduler', format_string='%(message)s')
    debug.get_logger().setLevel(logging.WARNING)

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(CONFIG_SECTION,
                                                           'drones')
    if drones:
        drone_list = [hostname.strip() for hostname in drones.split(',')]
    else:
        drone_list = ['localhost']
    results_host = global_config.global_config.get_config_value(
        CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


def queue_entries_to_abort():
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)

    qe = [HostQueueEntry(row=i) for i in rows]
    return qe


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


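    # _process_many2many_dict turns (left_id, right_id) result rows into a
    # mapping of left_id -> set of right_ids.  For example (illustrative ids),
    # rows [(1, 10), (1, 11), (2, 10)] become {1: set([10, 11]), 2: set([10])};
    # with flip=True the roles are swapped: {10: set([1, 2]), 11: set([1])}.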
    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = long(row[0]), long(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.acl_group_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, acl_group_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        acl = self._is_acl_accessible(host_id, queue_entry)
        deps = self._check_job_dependencies(job_dependencies, host_labels)
        only_if = self._check_only_if_needed_labels(job_dependencies,
                                                    host_labels, queue_entry)
        return acl and deps and only_if


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            return self._schedule_non_metahost(queue_entry)
        return self._schedule_metahost(queue_entry)


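# The Dispatcher drives the scheduler's main loop.  Each tick() refreshes the
# drone manager's view of running processes, runs periodic cleanup when the
# clean interval has elapsed, aborts entries marked Abort, assigns pending
# queue entries to ready hosts, advances all active Agents, flushes queued
# drone actions, and sends any queued notification emails.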
class Dispatcher(object):
    max_running_processes = global_config.global_config.get_config_value(
        CONFIG_SECTION, 'max_running_jobs', type=int)
    max_processes_started_per_cycle = (
        global_config.global_config.get_config_value(
            CONFIG_SECTION, 'max_jobs_started_per_cycle', type=int))
    clean_interval = (
        global_config.global_config.get_config_value(
            CONFIG_SECTION, 'clean_interval_minutes', type=int))
    synch_job_start_timeout_minutes = (
        global_config.global_config.get_config_value(
            CONFIG_SECTION, 'synch_job_start_timeout_minutes',
            type=int))

    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        self._host_agents = {}
        self._queue_entry_agents = {}


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        _drone_manager.refresh()
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()


    def _run_cleanup_maybe(self):
        if self._last_clean_time + self.clean_interval * 60 < time.time():
            print 'Running cleanup'
            self._abort_timed_out_jobs()
            self._abort_jobs_past_synch_start_timeout()
            self._clear_inactive_blocks()
            self._check_for_db_inconsistencies()
            self._last_clean_time = time.time()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return self._queue_entry_agents.get(queue_entry.id, set())


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def num_running_processes(self):
        return sum(agent.num_processes for agent in self._agents
                   if agent.is_running())


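    # Recovery helpers.  The autoserv command line carries its execution tag
    # after the -P flag; _extract_execution_tag pulls that value back out of an
    # orphaned process's command line.  (The tag itself is built elsewhere from
    # the queue entry's job and host, e.g. "<job id>-<owner>/<hostname>" -- an
    # illustrative format, not something guaranteed by this module.)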
    def _extract_execution_tag(self, command_line):
        match = re.match(r'.* -P (\S+) ', command_line)
        if not match:
            return None
        return match.group(1)


    def _recover_queue_entries(self, queue_entries, run_monitor):
        assert len(queue_entries) > 0
        queue_task = RecoveryQueueTask(job=queue_entries[0].job,
                                       queue_entries=queue_entries,
                                       run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             num_processes=len(queue_entries)))


    def _recover_processes(self):
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_running_entries()
        self._recover_aborting_entries()
        self._requeue_other_active_entries()
        self._recover_parsing_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Parsing')")
        for queue_entry in queue_entries:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                queue_entry.execution_tag())
            _drone_manager.register_pidfile(pidfile_id)


    def _recover_running_entries(self):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
        requeue_entries = []
        for queue_entry in queue_entries:
            if self.get_agents_for_entry(queue_entry):
                # synchronous job we've already recovered
                continue
            execution_tag = queue_entry.execution_tag()
            run_monitor = PidfileRunMonitor()
            run_monitor.attach_to_existing_process(execution_tag)
            if not run_monitor.has_process():
                # autoserv apparently never got run, so let it get requeued
                continue
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            print 'Recovering %s (process %s)' % (
                ', '.join(str(entry) for entry in queue_entries),
                run_monitor.get_process())
            self._recover_queue_entries(queue_entries, run_monitor)
            orphans.pop(execution_tag, None)

        # now kill any remaining autoserv processes
        for process in orphans.itervalues():
            print 'Killing orphan %s' % process
            _drone_manager.kill_process(process)


    def _recover_aborting_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='status IN ("Abort", "Aborting")')
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %s' % queue_entry
            agent = queue_entry.abort(self)


    def _requeue_other_active_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='active AND NOT complete AND status != "Pending"')
        for queue_entry in queue_entries:
            if self.get_agents_for_entry(queue_entry):
                # entry has already been recovered
                continue
            print 'Requeuing active QE %s (status=%s)' % (queue_entry,
                                                          queue_entry.status)
            if queue_entry.host:
                tasks = queue_entry.host.reverify_tasks()
                self.add_agent(Agent(tasks))
            agent = queue_entry.requeue()


    def _reverify_remaining_hosts(self):
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if print_message:
                print print_message % host.hostname
            tasks = host.reverify_tasks()
            self.add_agent(Agent(tasks))


    def _recover_parsing_entries(self):
        recovered_entry_ids = set()
        for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
            if entry.id in recovered_entry_ids:
                continue
            queue_entries = entry.job.get_group_entries(entry)
            recovered_entry_ids = recovered_entry_ids.union(
                entry.id for entry in queue_entries)
            print 'Recovering parsing entries %s' % (
                ', '.join(str(entry) for entry in queue_entries))

            reparse_task = FinalReparseTask(queue_entries)
            self.add_agent(Agent([reparse_task], num_processes=0))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _abort_timed_out_jobs(self):
        """
        Aborts all jobs that have timed out and not completed
        """
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            print 'Aborting job %d due to job timeout' % job.id
            job.abort(None)


    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are still holding a machine in the Everyone ACL group.
        """
        timeout_delta = datetime.timedelta(
            minutes=self.synch_job_start_timeout_minutes)
        timeout_start = datetime.datetime.now() - timeout_delta
        query = models.Job.objects.filter(
            created_on__lt=timeout_start,
            hostqueueentry__status='Pending',
            hostqueueentry__host__acl_group__name='Everyone')
        for job in query.distinct():
            print 'Aborting job %d due to start timeout' % job.id
            entries_to_abort = job.hostqueueentry_set.exclude(
                status=models.HostQueueEntry.Status.RUNNING)
            for queue_entry in entries_to_abort:
                queue_entry.abort(None)


    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='priority DESC, meta_host, job_id'))


    def _schedule_new_jobs(self):
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return

        self._host_scheduler.refresh(queue_entries)

        for queue_entry in queue_entries:
            assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
            if not assigned_host:
                continue
            self._run_queue_entry(queue_entry, assigned_host)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run(assigned_host=host)
        # in some cases (synchronous jobs with run_verify=False), agent may be
        # None
        if agent:
            self.add_agent(agent)


    def _find_aborting(self):
        for entry in queue_entries_to_abort():
            agents_to_abort = list(self.get_agents_for_entry(entry))
            for agent in agents_to_abort:
                self.remove_agent(agent)

            entry.abort(self, agents_to_abort)


    def _can_start_agent(self, agent, num_running_processes,
                         num_started_this_cycle, have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if (num_running_processes + agent.num_processes >
            self.max_running_processes):
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            self.max_processes_started_per_cycle):
            return False
        return True


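    # A worked example of the throttling above (values are illustrative, not
    # taken from any real config): with max_running_jobs=100 and
    # max_jobs_started_per_cycle=10, a cycle that already has 95 autoserv
    # processes running rejects any agent needing more than 5 processes, and
    # once 10 processes have been started in a cycle only zero-process agents
    # may still start.  An oversized agent is still allowed through when it is
    # the first one started in a cycle, so it cannot be starved forever.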
    def _handle_agents(self):
        num_running_processes = self.num_running_processes()
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                print "agent finished"
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_running_processes,
                                             num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_running_processes += agent.num_processes
                num_started_this_cycle += agent.num_processes
            agent.tick()
        print num_running_processes, 'running processes'


    def _check_for_db_inconsistencies(self):
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        if query.count() != 0:
            subject = ('%d queue entries found with active=complete=1'
                       % query.count())
            message = '\n'.join(str(entry.get_object_dict())
                                for entry in query[:50])
            if len(query) > 50:
                message += '\n(truncated)\n'

            print subject
            email_manager.manager.enqueue_notify_email(subject, message)


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self._lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self.has_process()
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        print message
        email_manager.manager.enqueue_notify_email(error, message)
        if self._state.process is not None:
            process = self._state.process
        else:
            process = _drone_manager.get_dummy_process()
        self.on_lost_process(process)


    def _get_pidfile_info_helper(self):
        if self._lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        print message
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process(_drone_manager.get_dummy_process())


    def on_lost_process(self, process):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self._lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed


class Agent(object):
    def __init__(self, tasks, num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()


class AgentTask(object):
    def __init__(self, cmd, working_directory=None, failure_tasks=[]):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code is None:
            return
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def run(self):
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)


class RepairTask(AgentTask):
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path, '-p', '-R', '-m', host.hostname,
               '-r', _drone_manager.absolute_path(self.temp_results_dir),
               '--host-protection', protection]
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('repair', self.host)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')
        if self.queue_entry:
            self.queue_entry.requeue()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry and not self.queue_entry.meta_host:
                self.queue_entry.handle_host_failure()


class PreJobTask(AgentTask):
    def epilog(self):
        super(PreJobTask, self).epilog()
        should_copy_results = (self.queue_entry and not self.success
                               and not self.queue_entry.meta_host)
        if should_copy_results:
            self.queue_entry.set_execution_subdir()
            destination = os.path.join(self.queue_entry.execution_tag(),
                                       os.path.basename(self.log_file))
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file,
                destination_path=destination)


class VerifyTask(PreJobTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname, '-r',
               _drone_manager.absolute_path(self.temp_results_dir)]
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
            if self.queue_entry:
                agent = self.queue_entry.on_pending()
                if agent:
                    self.agent.dispatcher.add_agent(agent)


class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        self.job = job
        self.queue_entries = queue_entries
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


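    # Keyvals are written as simple "key=value" lines into a file named
    # 'keyval' under the execution tag's directory (and into
    # 'host_keyvals/<hostname>' for per-host data), for example
    # "job_queued=1231900000" -- the sample timestamp is illustrative only.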
showard170873e2009-01-07 00:22:26 +00001196 def _write_keyval(self, field, value):
1197 keyval_path = os.path.join(self._execution_tag(), 'keyval')
1198 assert self.monitor and self.monitor.has_process()
1199 paired_with_pidfile = self.monitor.pidfile_id
1200 _drone_manager.write_lines_to_file(
1201 keyval_path, [self._format_keyval(field, value)],
1202 paired_with_pidfile=paired_with_pidfile)
showardd8e548a2008-09-09 03:04:57 +00001203
1204
showard170873e2009-01-07 00:22:26 +00001205 def _write_host_keyvals(self, host):
1206 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1207 host.hostname)
1208 platform, all_labels = host.platform_and_labels()
1209 keyvals = dict(platform=platform, labels=','.join(all_labels))
1210 keyval_content = '\n'.join(self._format_keyval(key, value)
1211 for key, value in keyvals.iteritems())
1212 _drone_manager.attach_file_to_execution(self._execution_tag(),
1213 keyval_content,
1214 file_path=keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001215
1216
showard170873e2009-01-07 00:22:26 +00001217 def _execution_tag(self):
1218 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001219
1220
jadmanski0afbb632008-06-06 21:10:57 +00001221 def prolog(self):
jadmanski0afbb632008-06-06 21:10:57 +00001222 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001223 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001224 queue_entry.set_status('Running')
1225 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001226 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001227 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001228 assert len(self.queue_entries) == 1
1229 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001230
1231
showard97aed502008-11-04 02:01:24 +00001232 def _finish_task(self, success):
showard170873e2009-01-07 00:22:26 +00001233 queued = time.mktime(self.job.created_on.timetuple())
jadmanski0afbb632008-06-06 21:10:57 +00001234 finished = time.time()
showard170873e2009-01-07 00:22:26 +00001235 self._write_keyval("job_queued", int(queued))
1236 self._write_keyval("job_finished", int(finished))
1237
1238 _drone_manager.copy_to_results_repository(self.monitor.get_process(),
1239 self._execution_tag() + '/')
jadmanskic2ac77f2008-05-16 21:44:04 +00001240
jadmanski0afbb632008-06-06 21:10:57 +00001241 # parse the results of the job
showard97aed502008-11-04 02:01:24 +00001242 reparse_task = FinalReparseTask(self.queue_entries)
showard170873e2009-01-07 00:22:26 +00001243 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
jadmanskif7fa2cc2008-10-01 14:13:23 +00001244
1245
showardcbd74612008-11-19 21:42:02 +00001246 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001247 _drone_manager.write_lines_to_file(
1248 os.path.join(self._execution_tag(), 'status.log'),
1249 ['INFO\t----\t----\t' + comment],
1250 paired_with_pidfile=self.monitor.pidfile_id)
showardcbd74612008-11-19 21:42:02 +00001251
1252
jadmanskif7fa2cc2008-10-01 14:13:23 +00001253 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001254 if not self.monitor or not self.monitor.has_process():
1255 return
1256
jadmanskif7fa2cc2008-10-01 14:13:23 +00001257 # build up sets of all the aborted_by and aborted_on values
1258 aborted_by, aborted_on = set(), set()
1259 for queue_entry in self.queue_entries:
1260 if queue_entry.aborted_by:
1261 aborted_by.add(queue_entry.aborted_by)
1262 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1263 aborted_on.add(t)
1264
        # extract the unique aborted_by value (there is at most one) and the
        # latest abort time, and write them out as keyvals
1266 assert len(aborted_by) <= 1
1267 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001268 aborted_by_value = aborted_by.pop()
1269 aborted_on_value = max(aborted_on)
1270 else:
1271 aborted_by_value = 'autotest_system'
1272 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001273
1274 self._write_keyval("aborted_by", aborted_by_value)
1275 self._write_keyval("aborted_on", aborted_on_value)
1276
showardcbd74612008-11-19 21:42:02 +00001277 aborted_on_string = str(datetime.datetime.fromtimestamp(
1278 aborted_on_value))
1279 self._write_status_comment('Job aborted by %s on %s' %
1280 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001281
1282
jadmanski0afbb632008-06-06 21:10:57 +00001283 def abort(self):
1284 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001285 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001286 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001287
1288
showard21baa452008-10-21 00:08:39 +00001289 def _reboot_hosts(self):
1290 reboot_after = self.job.reboot_after
1291 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001292 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001293 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001294 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001295 num_tests_failed = self.monitor.num_tests_failed()
1296 do_reboot = (self.success and num_tests_failed == 0)
1297
showard8ebca792008-11-04 21:54:22 +00001298 for queue_entry in self.queue_entries:
1299 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00001300 # don't pass the queue entry to the CleanupTask. if the cleanup
showardfa8629c2008-11-04 16:51:23 +00001301 # fails, the job doesn't care -- it's over.
showard45ae8192008-11-05 19:32:53 +00001302 cleanup_task = CleanupTask(host=queue_entry.get_host())
1303 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
showard8ebca792008-11-04 21:54:22 +00001304 else:
1305 queue_entry.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001306
1307
jadmanski0afbb632008-06-06 21:10:57 +00001308 def epilog(self):
1309 super(QueueTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001310 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001311 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001312
        print "queue_task finished with success=%s" % self.success
mbligh36768f02008-02-22 18:28:33 +00001314
1315
mblighbb421852008-03-11 22:36:16 +00001316class RecoveryQueueTask(QueueTask):
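    """
    QueueTask variant for an autoserv process that is already running (for
    example, one found when the scheduler restarts): run() simply adopts the
    existing run_monitor instead of launching a new process, and prolog() is
    a no-op.
    """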
jadmanski0afbb632008-06-06 21:10:57 +00001317 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001318 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001319 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001320
1321
jadmanski0afbb632008-06-06 21:10:57 +00001322 def run(self):
1323 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001324
1325
jadmanski0afbb632008-06-06 21:10:57 +00001326 def prolog(self):
1327 # recovering an existing process - don't do prolog
1328 pass
mblighbb421852008-03-11 22:36:16 +00001329
1330
showard8fe93b52008-11-18 17:53:22 +00001331class CleanupTask(PreJobTask):
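    """
    Runs "autoserv --cleanup" against a host to bring it back to a clean
    state. A failed cleanup falls through to the RepairTask passed as a
    failure task; a successful one marks the host Ready and clears its dirty
    flag.
    """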
showardfa8629c2008-11-04 16:51:23 +00001332 def __init__(self, host=None, queue_entry=None):
1333 assert bool(host) ^ bool(queue_entry)
1334 if queue_entry:
1335 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001336 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001337 self.host = host
showard170873e2009-01-07 00:22:26 +00001338
1339 self.create_temp_resultsdir('.cleanup')
1340 self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
1341 '-r', _drone_manager.absolute_path(self.temp_results_dir)]
showarde788ea62008-11-17 21:02:47 +00001342 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001343 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1344 failure_tasks=[repair_task])
1345
1346 self._set_ids(host=host, queue_entries=[queue_entry])
1347 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001348
mblighd5c95802008-03-05 00:33:46 +00001349
jadmanski0afbb632008-06-06 21:10:57 +00001350 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001351 super(CleanupTask, self).prolog()
showard45ae8192008-11-05 19:32:53 +00001352 print "starting cleanup task for host: %s" % self.host.hostname
1353 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001354
mblighd5c95802008-03-05 00:33:46 +00001355
showard21baa452008-10-21 00:08:39 +00001356 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001357 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001358 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001359 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001360 self.host.update_field('dirty', 0)
1361
1362
mblighd5c95802008-03-05 00:33:46 +00001363class AbortTask(AgentTask):
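    """
    Aborts the active task of each agent working on a single queue entry,
    then marks that entry Aborted.
    """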
jadmanski0afbb632008-06-06 21:10:57 +00001364 def __init__(self, queue_entry, agents_to_abort):
jadmanski0afbb632008-06-06 21:10:57 +00001365 super(AbortTask, self).__init__('')
showard170873e2009-01-07 00:22:26 +00001366 self.queue_entry = queue_entry
1367 # don't use _set_ids, since we don't want to set the host_ids
1368 self.queue_entry_ids = [queue_entry.id]
1369 self.agents_to_abort = agents_to_abort
mbligh36768f02008-02-22 18:28:33 +00001370
1371
jadmanski0afbb632008-06-06 21:10:57 +00001372 def prolog(self):
1373 print "starting abort on host %s, job %s" % (
1374 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001375
mblighd64e5702008-04-04 21:39:28 +00001376
jadmanski0afbb632008-06-06 21:10:57 +00001377 def epilog(self):
1378 super(AbortTask, self).epilog()
1379 self.queue_entry.set_status('Aborted')
1380 self.success = True
1381
1382
1383 def run(self):
1384 for agent in self.agents_to_abort:
            if agent.active_task:
1386 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001387
1388
showard97aed502008-11-04 02:01:24 +00001389class FinalReparseTask(AgentTask):
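    """
    Runs the TKO parser over a finished execution's results and then moves
    the queue entries from Parsing to their final status. The class-level
    counter caps how many parses run at once across the scheduler; run() and
    poll() simply keep retrying until a slot is free.
    """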
showard170873e2009-01-07 00:22:26 +00001390 MAX_PARSE_PROCESSES = global_config.global_config.get_config_value(
1391 CONFIG_SECTION, 'max_parse_processes', type=int)
showard97aed502008-11-04 02:01:24 +00001392 _num_running_parses = 0
1393
1394 def __init__(self, queue_entries):
1395 self._queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001396 # don't use _set_ids, since we don't want to set the host_ids
1397 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001398 self._parse_started = False
1399
1400 assert len(queue_entries) > 0
1401 queue_entry = queue_entries[0]
showard97aed502008-11-04 02:01:24 +00001402
showard170873e2009-01-07 00:22:26 +00001403 self._execution_tag = queue_entry.execution_tag()
1404 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1405 self._autoserv_monitor = PidfileRunMonitor()
1406 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1407 self._final_status = self._determine_final_status()
1408
showard97aed502008-11-04 02:01:24 +00001409 if _testing_mode:
1410 self.cmd = 'true'
showard170873e2009-01-07 00:22:26 +00001411 else:
1412 super(FinalReparseTask, self).__init__(
1413 cmd=self._generate_parse_command(),
1414 working_directory=self._execution_tag)
showard97aed502008-11-04 02:01:24 +00001415
showard170873e2009-01-07 00:22:26 +00001416 self.log_file = os.path.join(self._execution_tag, '.parse.log')
showard97aed502008-11-04 02:01:24 +00001417
1418
1419 @classmethod
1420 def _increment_running_parses(cls):
1421 cls._num_running_parses += 1
1422
1423
1424 @classmethod
1425 def _decrement_running_parses(cls):
1426 cls._num_running_parses -= 1
1427
1428
1429 @classmethod
1430 def _can_run_new_parse(cls):
1431 return cls._num_running_parses < cls.MAX_PARSE_PROCESSES
1432
1433
showard170873e2009-01-07 00:22:26 +00001434 def _determine_final_status(self):
1435 # we'll use a PidfileRunMonitor to read the autoserv exit status
1436 if self._autoserv_monitor.exit_code() == 0:
1437 return models.HostQueueEntry.Status.COMPLETED
1438 return models.HostQueueEntry.Status.FAILED
1439
1440
showard97aed502008-11-04 02:01:24 +00001441 def prolog(self):
1442 super(FinalReparseTask, self).prolog()
1443 for queue_entry in self._queue_entries:
1444 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1445
1446
1447 def epilog(self):
1448 super(FinalReparseTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001449 for queue_entry in self._queue_entries:
showard170873e2009-01-07 00:22:26 +00001450 queue_entry.set_status(self._final_status)
showard97aed502008-11-04 02:01:24 +00001451
1452
showard2bab8f42008-11-12 18:15:22 +00001453 def _generate_parse_command(self):
showard170873e2009-01-07 00:22:26 +00001454 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
1455 self._results_dir]
showard97aed502008-11-04 02:01:24 +00001456
1457
1458 def poll(self):
        # Override poll(): until the parse has started, keep trying to start
        # it (subject to the running-parse limit); once it has started, fall
        # back to the default polling behavior.
1461 if self._parse_started:
1462 super(FinalReparseTask, self).poll()
1463 else:
1464 self._try_starting_parse()
1465
1466
1467 def run(self):
1468 # override run() to not actually run unless we can
1469 self._try_starting_parse()
1470
1471
1472 def _try_starting_parse(self):
1473 if not self._can_run_new_parse():
1474 return
showard170873e2009-01-07 00:22:26 +00001475
showard97aed502008-11-04 02:01:24 +00001476 # actually run the parse command
showard170873e2009-01-07 00:22:26 +00001477 self.monitor = PidfileRunMonitor()
1478 self.monitor.run(self.cmd, self._working_directory,
1479 log_file=self.log_file,
1480 pidfile_name='.parser_execute',
1481 paired_with_pidfile=self._autoserv_monitor.pidfile_id)
1482
showard97aed502008-11-04 02:01:24 +00001483 self._increment_running_parses()
1484 self._parse_started = True
1485
1486
1487 def finished(self, success):
1488 super(FinalReparseTask, self).finished(success)
1489 self._decrement_running_parses()
1490
1491
mbligh36768f02008-02-22 18:28:33 +00001492class DBObject(object):
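    """
    Minimal row wrapper for the scheduler's direct-SQL access. Subclasses
    override _get_table() and _fields(); each column is exposed as an
    attribute, and update_field()/save()/delete() write changes back through
    the shared _db connection.
    """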
jadmanski0afbb632008-06-06 21:10:57 +00001493 def __init__(self, id=None, row=None, new_record=False):
1494 assert (bool(id) != bool(row))
mbligh36768f02008-02-22 18:28:33 +00001495
jadmanski0afbb632008-06-06 21:10:57 +00001496 self.__table = self._get_table()
mbligh36768f02008-02-22 18:28:33 +00001497
jadmanski0afbb632008-06-06 21:10:57 +00001498 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001499
jadmanski0afbb632008-06-06 21:10:57 +00001500 if row is None:
1501 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1502 rows = _db.execute(sql, (id,))
1503 if len(rows) == 0:
                raise Exception("row not found (table=%s, id=%s)"
                                % (self.__table, id))
1506 row = rows[0]
mbligh36768f02008-02-22 18:28:33 +00001507
showard2bab8f42008-11-12 18:15:22 +00001508 self._update_fields_from_row(row)
1509
1510
1511 def _update_fields_from_row(self, row):
jadmanski0afbb632008-06-06 21:10:57 +00001512 assert len(row) == self.num_cols(), (
1513 "table = %s, row = %s/%d, fields = %s/%d" % (
showard2bab8f42008-11-12 18:15:22 +00001514 self.__table, row, len(row), self._fields(), self.num_cols()))
mbligh36768f02008-02-22 18:28:33 +00001515
showard2bab8f42008-11-12 18:15:22 +00001516 self._valid_fields = set()
1517 for field, value in zip(self._fields(), row):
1518 setattr(self, field, value)
1519 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001520
showard2bab8f42008-11-12 18:15:22 +00001521 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001522
mblighe2586682008-02-29 22:45:46 +00001523
jadmanski0afbb632008-06-06 21:10:57 +00001524 @classmethod
1525 def _get_table(cls):
1526 raise NotImplementedError('Subclasses must override this')
mblighe2586682008-02-29 22:45:46 +00001527
1528
jadmanski0afbb632008-06-06 21:10:57 +00001529 @classmethod
1530 def _fields(cls):
1531 raise NotImplementedError('Subclasses must override this')
showard04c82c52008-05-29 19:38:12 +00001532
1533
jadmanski0afbb632008-06-06 21:10:57 +00001534 @classmethod
1535 def num_cols(cls):
1536 return len(cls._fields())
showard04c82c52008-05-29 19:38:12 +00001537
1538
    def count(self, where, table=None):
1540 if not table:
1541 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001542
jadmanski0afbb632008-06-06 21:10:57 +00001543 rows = _db.execute("""
1544 SELECT count(*) FROM %s
1545 WHERE %s
1546 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001547
jadmanski0afbb632008-06-06 21:10:57 +00001548 assert len(rows) == 1
1549
1550 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001551
1552
mblighf8c624d2008-07-03 16:58:45 +00001553 def update_field(self, field, value, condition=''):
showard2bab8f42008-11-12 18:15:22 +00001554 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00001555
showard2bab8f42008-11-12 18:15:22 +00001556 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00001557 return
mbligh36768f02008-02-22 18:28:33 +00001558
mblighf8c624d2008-07-03 16:58:45 +00001559 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1560 if condition:
1561 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001562 _db.execute(query, (value, self.id))
1563
showard2bab8f42008-11-12 18:15:22 +00001564 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00001565
1566
jadmanski0afbb632008-06-06 21:10:57 +00001567 def save(self):
1568 if self.__new_record:
1569 keys = self._fields()[1:] # avoid id
1570 columns = ','.join([str(key) for key in keys])
1571 values = ['"%s"' % self.__dict__[key] for key in keys]
1572 values = ','.join(values)
1573 query = """INSERT INTO %s (%s) VALUES (%s)""" % \
1574 (self.__table, columns, values)
1575 _db.execute(query)
mbligh36768f02008-02-22 18:28:33 +00001576
1577
jadmanski0afbb632008-06-06 21:10:57 +00001578 def delete(self):
1579 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1580 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001581
1582
showard63a34772008-08-18 19:32:50 +00001583 @staticmethod
1584 def _prefix_with(string, prefix):
1585 if string:
1586 string = prefix + string
1587 return string
1588
1589
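    # fetch() yields one instance per matching row, e.g. (illustrative):
    #
    #   for entry in HostQueueEntry.fetch(where='job_id = %s',
    #                                     params=(job.id,)):
    #       entry.set_status('Queued')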
jadmanski0afbb632008-06-06 21:10:57 +00001590 @classmethod
showard989f25d2008-10-01 11:38:11 +00001591 def fetch(cls, where='', params=(), joins='', order_by=''):
showard63a34772008-08-18 19:32:50 +00001592 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1593 where = cls._prefix_with(where, 'WHERE ')
1594 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
1595 '%(where)s %(order_by)s' % {'table' : cls._get_table(),
1596 'joins' : joins,
1597 'where' : where,
1598 'order_by' : order_by})
1599 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001600 for row in rows:
1601 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001602
mbligh36768f02008-02-22 18:28:33 +00001603
1604class IneligibleHostQueue(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001605 def __init__(self, id=None, row=None, new_record=None):
1606 super(IneligibleHostQueue, self).__init__(id=id, row=row,
1607 new_record=new_record)
mblighe2586682008-02-29 22:45:46 +00001608
1609
jadmanski0afbb632008-06-06 21:10:57 +00001610 @classmethod
1611 def _get_table(cls):
1612 return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001613
1614
jadmanski0afbb632008-06-06 21:10:57 +00001615 @classmethod
1616 def _fields(cls):
1617 return ['id', 'job_id', 'host_id']
showard04c82c52008-05-29 19:38:12 +00001618
1619
showard989f25d2008-10-01 11:38:11 +00001620class Label(DBObject):
1621 @classmethod
1622 def _get_table(cls):
1623 return 'labels'
1624
1625
1626 @classmethod
1627 def _fields(cls):
1628 return ['id', 'name', 'kernel_config', 'platform', 'invalid',
1629 'only_if_needed']
1630
1631
mbligh36768f02008-02-22 18:28:33 +00001632class Host(DBObject):
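    """
    A row in the hosts table. Provides status updates, lookup of the host's
    platform and labels for the host keyvals, and the cleanup-plus-verify
    task pair used when a host needs to be reverified.
    """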
jadmanski0afbb632008-06-06 21:10:57 +00001633 def __init__(self, id=None, row=None):
1634 super(Host, self).__init__(id=id, row=row)
mblighe2586682008-02-29 22:45:46 +00001635
1636
jadmanski0afbb632008-06-06 21:10:57 +00001637 @classmethod
1638 def _get_table(cls):
1639 return 'hosts'
mbligh36768f02008-02-22 18:28:33 +00001640
1641
jadmanski0afbb632008-06-06 21:10:57 +00001642 @classmethod
1643 def _fields(cls):
        return ['id', 'hostname', 'locked', 'synch_id', 'status',
                'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty']
showard04c82c52008-05-29 19:38:12 +00001646
1647
jadmanski0afbb632008-06-06 21:10:57 +00001648 def current_task(self):
1649 rows = _db.execute("""
1650 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1651 """, (self.id,))
1652
1653 if len(rows) == 0:
1654 return None
1655 else:
1656 assert len(rows) == 1
            results = rows[0]
            return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00001660
1661
jadmanski0afbb632008-06-06 21:10:57 +00001662 def yield_work(self):
1663 print "%s yielding work" % self.hostname
1664 if self.current_task():
1665 self.current_task().requeue()
1666
    def set_status(self, status):
        print '%s -> %s' % (self.hostname, status)
        self.update_field('status', status)
mbligh36768f02008-02-22 18:28:33 +00001670
1671
showard170873e2009-01-07 00:22:26 +00001672 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00001673 """
showard170873e2009-01-07 00:22:26 +00001674 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00001675 """
1676 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00001677 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00001678 FROM labels
1679 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00001680 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00001681 ORDER BY labels.name
1682 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00001683 platform = None
1684 all_labels = []
1685 for label_name, is_platform in rows:
1686 if is_platform:
1687 platform = label_name
1688 all_labels.append(label_name)
1689 return platform, all_labels
1690
1691
1692 def reverify_tasks(self):
1693 cleanup_task = CleanupTask(host=self)
1694 verify_task = VerifyTask(host=self)
        # set the status right away so the scheduler doesn't hand this host
        # to another job before the cleanup/verify tasks start
1696 self.set_status('Cleaning')
1697 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00001698
1699
mbligh36768f02008-02-22 18:28:33 +00001700class HostQueueEntry(DBObject):
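    """
    One host's slot in a job. Tracks the scheduling status ('Queued',
    'Verifying', 'Pending', 'Running', 'Parsing', 'Completed', ...), the
    assigned host and the execution_subdir under the job's results
    directory, and sends the status/completion emails when configured.
    """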
jadmanski0afbb632008-06-06 21:10:57 +00001701 def __init__(self, id=None, row=None):
1702 assert id or row
1703 super(HostQueueEntry, self).__init__(id=id, row=row)
1704 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00001705
jadmanski0afbb632008-06-06 21:10:57 +00001706 if self.host_id:
1707 self.host = Host(self.host_id)
1708 else:
1709 self.host = None
mbligh36768f02008-02-22 18:28:33 +00001710
showard170873e2009-01-07 00:22:26 +00001711 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00001712 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00001713
1714
jadmanski0afbb632008-06-06 21:10:57 +00001715 @classmethod
1716 def _get_table(cls):
1717 return 'host_queue_entries'
mblighe2586682008-02-29 22:45:46 +00001718
1719
jadmanski0afbb632008-06-06 21:10:57 +00001720 @classmethod
1721 def _fields(cls):
showard2bab8f42008-11-12 18:15:22 +00001722 return ['id', 'job_id', 'host_id', 'priority', 'status', 'meta_host',
1723 'active', 'complete', 'deleted', 'execution_subdir']
showard04c82c52008-05-29 19:38:12 +00001724
1725
showardc85c21b2008-11-24 22:17:37 +00001726 def _view_job_url(self):
1727 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
1728
1729
jadmanski0afbb632008-06-06 21:10:57 +00001730 def set_host(self, host):
1731 if host:
1732 self.queue_log_record('Assigning host ' + host.hostname)
1733 self.update_field('host_id', host.id)
1734 self.update_field('active', True)
1735 self.block_host(host.id)
1736 else:
1737 self.queue_log_record('Releasing host')
1738 self.unblock_host(self.host.id)
1739 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00001740
jadmanski0afbb632008-06-06 21:10:57 +00001741 self.host = host
mbligh36768f02008-02-22 18:28:33 +00001742
1743
jadmanski0afbb632008-06-06 21:10:57 +00001744 def get_host(self):
1745 return self.host
mbligh36768f02008-02-22 18:28:33 +00001746
1747
jadmanski0afbb632008-06-06 21:10:57 +00001748 def queue_log_record(self, log_line):
1749 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00001750 _drone_manager.write_lines_to_file(self.queue_log_path,
1751 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00001752
1753
jadmanski0afbb632008-06-06 21:10:57 +00001754 def block_host(self, host_id):
1755 print "creating block %s/%s" % (self.job.id, host_id)
1756 row = [0, self.job.id, host_id]
1757 block = IneligibleHostQueue(row=row, new_record=True)
1758 block.save()
mblighe2586682008-02-29 22:45:46 +00001759
1760
jadmanski0afbb632008-06-06 21:10:57 +00001761 def unblock_host(self, host_id):
1762 print "removing block %s/%s" % (self.job.id, host_id)
1763 blocks = IneligibleHostQueue.fetch(
1764 'job_id=%d and host_id=%d' % (self.job.id, host_id))
1765 for block in blocks:
1766 block.delete()
mblighe2586682008-02-29 22:45:46 +00001767
1768
showard2bab8f42008-11-12 18:15:22 +00001769 def set_execution_subdir(self, subdir=None):
1770 if subdir is None:
1771 assert self.get_host()
1772 subdir = self.get_host().hostname
1773 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00001774
1775
showard6355f6b2008-12-05 18:52:13 +00001776 def _get_hostname(self):
1777 if self.host:
1778 return self.host.hostname
1779 return 'no host'
1780
1781
showard170873e2009-01-07 00:22:26 +00001782 def __str__(self):
1783 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
1784
1785
jadmanski0afbb632008-06-06 21:10:57 +00001786 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00001787 abort_statuses = ['Abort', 'Aborting', 'Aborted']
1788 if status not in abort_statuses:
1789 condition = ' AND '.join(['status <> "%s"' % x
1790 for x in abort_statuses])
1791 else:
1792 condition = ''
1793 self.update_field('status', status, condition=condition)
1794
showard170873e2009-01-07 00:22:26 +00001795 print "%s -> %s" % (self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00001796
showardc85c21b2008-11-24 22:17:37 +00001797 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00001798 self.update_field('complete', False)
1799 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00001800
jadmanski0afbb632008-06-06 21:10:57 +00001801 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showarde58e3f82008-11-20 19:04:59 +00001802 'Aborting']:
jadmanski0afbb632008-06-06 21:10:57 +00001803 self.update_field('complete', False)
1804 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00001805
showardc85c21b2008-11-24 22:17:37 +00001806 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00001807 self.update_field('complete', True)
1808 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00001809
1810 should_email_status = (status.lower() in _notify_email_statuses or
1811 'all' in _notify_email_statuses)
1812 if should_email_status:
1813 self._email_on_status(status)
1814
1815 self._email_on_job_complete()
1816
1817
1818 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00001819 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00001820
1821 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
1822 self.job.id, self.job.name, hostname, status)
1823 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
1824 self.job.id, self.job.name, hostname, status,
1825 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00001826 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00001827
1828
1829 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00001830 if not self.job.is_finished():
1831 return
showard542e8402008-09-19 20:16:18 +00001832
showardc85c21b2008-11-24 22:17:37 +00001833 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00001834 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00001835 for queue_entry in hosts_queue:
1836 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00001837 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00001838 queue_entry.status))
1839
1840 summary_text = "\n".join(summary_text)
1841 status_counts = models.Job.objects.get_status_counts(
1842 [self.job.id])[self.job.id]
1843 status = ', '.join('%d %s' % (count, status) for status, count
1844 in status_counts.iteritems())
1845
1846 subject = 'Autotest: Job ID: %s "%s" %s' % (
1847 self.job.id, self.job.name, status)
1848 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
1849 self.job.id, self.job.name, status, self._view_job_url(),
1850 summary_text)
showard170873e2009-01-07 00:22:26 +00001851 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00001852
1853
    def run(self, assigned_host=None):
1855 if self.meta_host:
1856 assert assigned_host
1857 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00001858 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00001859
jadmanski0afbb632008-06-06 21:10:57 +00001860 print "%s/%s scheduled on %s, status=%s" % (self.job.name,
1861 self.meta_host, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00001862
jadmanski0afbb632008-06-06 21:10:57 +00001863 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00001864
jadmanski0afbb632008-06-06 21:10:57 +00001865 def requeue(self):
1866 self.set_status('Queued')
jadmanski0afbb632008-06-06 21:10:57 +00001867 if self.meta_host:
1868 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00001869
1870
jadmanski0afbb632008-06-06 21:10:57 +00001871 def handle_host_failure(self):
1872 """\
1873 Called when this queue entry's host has failed verification and
1874 repair.
1875 """
1876 assert not self.meta_host
1877 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00001878 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00001879
1880
jadmanskif7fa2cc2008-10-01 14:13:23 +00001881 @property
1882 def aborted_by(self):
1883 self._load_abort_info()
1884 return self._aborted_by
1885
1886
1887 @property
1888 def aborted_on(self):
1889 self._load_abort_info()
1890 return self._aborted_on
1891
1892
1893 def _load_abort_info(self):
1894 """ Fetch info about who aborted the job. """
1895 if hasattr(self, "_aborted_by"):
1896 return
1897 rows = _db.execute("""
1898 SELECT users.login, aborted_host_queue_entries.aborted_on
1899 FROM aborted_host_queue_entries
1900 INNER JOIN users
1901 ON users.id = aborted_host_queue_entries.aborted_by_id
1902 WHERE aborted_host_queue_entries.queue_entry_id = %s
1903 """, (self.id,))
1904 if rows:
1905 self._aborted_by, self._aborted_on = rows[0]
1906 else:
1907 self._aborted_by = self._aborted_on = None
1908
1909
showardb2e2c322008-10-14 17:33:55 +00001910 def on_pending(self):
1911 """
1912 Called when an entry in a synchronous job has passed verify. If the
1913 job is ready to run, returns an agent to run the job. Returns None
1914 otherwise.
1915 """
1916 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00001917 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00001918 if self.job.is_ready():
1919 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00001920 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00001921 return None
1922
1923
showard170873e2009-01-07 00:22:26 +00001924 def abort(self, dispatcher, agents_to_abort=[]):
showard1be97432008-10-17 15:30:45 +00001925 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00001926 if self.active and host:
showard170873e2009-01-07 00:22:26 +00001927 dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
showard1be97432008-10-17 15:30:45 +00001928
showard170873e2009-01-07 00:22:26 +00001929 abort_task = AbortTask(self, agents_to_abort)
showard1be97432008-10-17 15:30:45 +00001930 self.set_status('Aborting')
showard170873e2009-01-07 00:22:26 +00001931 dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
1932
1933 def execution_tag(self):
1934 assert self.execution_subdir
1935 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00001936
1937
mbligh36768f02008-02-22 18:28:33 +00001938class Job(DBObject):
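    """
    A row in the jobs table. Decides when enough entries are Pending to
    satisfy synch_count, groups them under a shared execution subdirectory,
    builds the autoserv command line and chooses the pre-job cleanup/verify
    tasks based on the reboot_before setting.
    """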
jadmanski0afbb632008-06-06 21:10:57 +00001939 def __init__(self, id=None, row=None):
1940 assert id or row
1941 super(Job, self).__init__(id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00001942
mblighe2586682008-02-29 22:45:46 +00001943
jadmanski0afbb632008-06-06 21:10:57 +00001944 @classmethod
1945 def _get_table(cls):
1946 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00001947
1948
jadmanski0afbb632008-06-06 21:10:57 +00001949 @classmethod
1950 def _fields(cls):
1951 return ['id', 'owner', 'name', 'priority', 'control_file',
showard2bab8f42008-11-12 18:15:22 +00001952 'control_type', 'created_on', 'synch_count', 'timeout',
showard21baa452008-10-21 00:08:39 +00001953 'run_verify', 'email_list', 'reboot_before', 'reboot_after']
showard04c82c52008-05-29 19:38:12 +00001954
1955
jadmanski0afbb632008-06-06 21:10:57 +00001956 def is_server_job(self):
1957 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00001958
1959
showard170873e2009-01-07 00:22:26 +00001960 def tag(self):
1961 return "%s-%s" % (self.id, self.owner)
1962
1963
jadmanski0afbb632008-06-06 21:10:57 +00001964 def get_host_queue_entries(self):
1965 rows = _db.execute("""
1966 SELECT * FROM host_queue_entries
1967 WHERE job_id= %s
1968 """, (self.id,))
1969 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00001970
        assert len(entries) > 0
mbligh36768f02008-02-22 18:28:33 +00001972
jadmanski0afbb632008-06-06 21:10:57 +00001973 return entries
mbligh36768f02008-02-22 18:28:33 +00001974
1975
jadmanski0afbb632008-06-06 21:10:57 +00001976 def set_status(self, status, update_queues=False):
        self.update_field('status', status)
1978
1979 if update_queues:
1980 for queue_entry in self.get_host_queue_entries():
1981 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00001982
1983
jadmanski0afbb632008-06-06 21:10:57 +00001984 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00001985 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
1986 status='Pending')
1987 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00001988
1989
    def num_machines(self, clause=None):
1991 sql = "job_id=%s" % self.id
1992 if clause:
1993 sql += " AND (%s)" % clause
1994 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00001995
1996
jadmanski0afbb632008-06-06 21:10:57 +00001997 def num_queued(self):
1998 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00001999
2000
jadmanski0afbb632008-06-06 21:10:57 +00002001 def num_active(self):
2002 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002003
2004
jadmanski0afbb632008-06-06 21:10:57 +00002005 def num_complete(self):
2006 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002007
2008
jadmanski0afbb632008-06-06 21:10:57 +00002009 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002010 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002011
mbligh36768f02008-02-22 18:28:33 +00002012
showard2bab8f42008-11-12 18:15:22 +00002013 def _stop_all_entries(self, entries_to_abort):
2014 """
        entries_to_abort: sequence of models.HostQueueEntry objects to stop
2016 """
2017 for child_entry in entries_to_abort:
2018 assert not child_entry.complete
2019 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2020 child_entry.host.status = models.Host.Status.READY
2021 child_entry.host.save()
2022 child_entry.status = models.HostQueueEntry.Status.STOPPED
2023 child_entry.save()
2024
2025
2026 def stop_if_necessary(self):
2027 not_yet_run = models.HostQueueEntry.objects.filter(
2028 job=self.id, status__in=(models.HostQueueEntry.Status.QUEUED,
2029 models.HostQueueEntry.Status.VERIFYING,
2030 models.HostQueueEntry.Status.PENDING))
2031 if not_yet_run.count() < self.synch_count:
2032 self._stop_all_entries(not_yet_run)
mblighe2586682008-02-29 22:45:46 +00002033
2034
jadmanski0afbb632008-06-06 21:10:57 +00002035 def write_to_machines_file(self, queue_entry):
2036 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002037 file_path = os.path.join(self.tag(), '.machines')
2038 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002039
2040
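    # Synchronous groups get numbered subdirectories. For example (made-up
    # ids): if entries for this job already used 'group0' and 'group2', the
    # next group is named 'group3'.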
showard2bab8f42008-11-12 18:15:22 +00002041 def _next_group_name(self):
2042 query = models.HostQueueEntry.objects.filter(
2043 job=self.id).values('execution_subdir').distinct()
2044 subdirs = (entry['execution_subdir'] for entry in query)
2045 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2046 ids = [int(match.group(1)) for match in groups if match]
2047 if ids:
2048 next_id = max(ids) + 1
2049 else:
2050 next_id = 0
2051 return "group%d" % next_id
2052
2053
showard170873e2009-01-07 00:22:26 +00002054 def _write_control_file(self, execution_tag):
2055 control_path = _drone_manager.attach_file_to_execution(
2056 execution_tag, self.control_file)
2057 return control_path
mbligh36768f02008-02-22 18:28:33 +00002058
showardb2e2c322008-10-14 17:33:55 +00002059
showard2bab8f42008-11-12 18:15:22 +00002060 def get_group_entries(self, queue_entry_from_group):
2061 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002062 return list(HostQueueEntry.fetch(
2063 where='job_id=%s AND execution_subdir=%s',
2064 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002065
2066
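    # For a single-host server job this builds a command line roughly like
    # (hostnames, ids and paths are illustrative; absolute paths come from
    # the drone manager):
    #
    #   autoserv -P 42-showard/host1 -p -n -r <results>/42-showard/host1
    #            -u showard -l my-job -m host1 <path to attached control file>
    #
    # Client-side jobs get an extra '-c' flag.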
showardb2e2c322008-10-14 17:33:55 +00002067 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002068 assert queue_entries
2069 execution_tag = queue_entries[0].execution_tag()
2070 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002071 hostnames = ','.join([entry.get_host().hostname
2072 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002073
showard170873e2009-01-07 00:22:26 +00002074 params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
2075 '-r', _drone_manager.absolute_path(execution_tag),
2076 '-u', self.owner, '-l', self.name, '-m', hostnames,
2077 _drone_manager.absolute_path(control_path)]
mbligh36768f02008-02-22 18:28:33 +00002078
jadmanski0afbb632008-06-06 21:10:57 +00002079 if not self.is_server_job():
2080 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002081
showardb2e2c322008-10-14 17:33:55 +00002082 return params
mblighe2586682008-02-29 22:45:46 +00002083
mbligh36768f02008-02-22 18:28:33 +00002084
showard2bab8f42008-11-12 18:15:22 +00002085 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002086 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00002087 if self.reboot_before == models.RebootBefore.ALWAYS:
showard21baa452008-10-21 00:08:39 +00002088 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00002089 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showard21baa452008-10-21 00:08:39 +00002090 do_reboot = queue_entry.get_host().dirty
2091
2092 tasks = []
2093 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00002094 tasks.append(CleanupTask(queue_entry=queue_entry))
showard2bab8f42008-11-12 18:15:22 +00002095 tasks.append(VerifyTask(queue_entry=queue_entry))
showard21baa452008-10-21 00:08:39 +00002096 return tasks
2097
2098
showard2bab8f42008-11-12 18:15:22 +00002099 def _assign_new_group(self, queue_entries):
2100 if len(queue_entries) == 1:
2101 group_name = queue_entries[0].get_host().hostname
2102 else:
2103 group_name = self._next_group_name()
2104 print 'Running synchronous job %d hosts %s as %s' % (
2105 self.id, [entry.host.hostname for entry in queue_entries],
2106 group_name)
2107
2108 for queue_entry in queue_entries:
2109 queue_entry.set_execution_subdir(group_name)
2110
2111
2112 def _choose_group_to_run(self, include_queue_entry):
2113 chosen_entries = [include_queue_entry]
2114
2115 num_entries_needed = self.synch_count - 1
2116 if num_entries_needed > 0:
2117 pending_entries = HostQueueEntry.fetch(
2118 where='job_id = %s AND status = "Pending" AND id != %s',
2119 params=(self.id, include_queue_entry.id))
2120 chosen_entries += list(pending_entries)[:num_entries_needed]
2121
2122 self._assign_new_group(chosen_entries)
2123 return chosen_entries
2124
2125
2126 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002127 if not self.is_ready():
showard9976ce92008-10-15 20:28:13 +00002128 if self.run_verify:
showarde58e3f82008-11-20 19:04:59 +00002129 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
showard170873e2009-01-07 00:22:26 +00002130 return Agent(self._get_pre_job_tasks(queue_entry))
showard9976ce92008-10-15 20:28:13 +00002131 else:
2132 return queue_entry.on_pending()
mbligh36768f02008-02-22 18:28:33 +00002133
showard2bab8f42008-11-12 18:15:22 +00002134 queue_entries = self._choose_group_to_run(queue_entry)
2135 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002136
2137
2138 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002139 for queue_entry in queue_entries:
2140 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002141 params = self._get_autoserv_params(queue_entries)
2142 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2143 cmd=params)
2144 tasks = initial_tasks + [queue_task]
2146
showard170873e2009-01-07 00:22:26 +00002147 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002148
2149
mbligh36768f02008-02-22 18:28:33 +00002150if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002151 main()