#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging
import common
import MySQLdb
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections, utils, debug
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import status_server, scheduler_config


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]
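    # Illustrative example (hypothetical config value): a line such as
    #   notify_email_statuses: Failed, Aborted
    # would yield ['failed', 'aborted'] here, since the value is lowercased
    # and split on whitespace, commas, semicolons and colons.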

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            print 'Error: [SERVER] hostname missing from the config file.'
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    debug.configure('scheduler', format_string='%(message)s')
    debug.get_logger().setLevel(logging.INFO)

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


def queue_entries_to_abort():
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)

    qe = [HostQueueEntry(row=i) for i in rows]
    return qe


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = long(row[0]), long(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
            SELECT jobs.id, acl_groups_users.aclgroup_id
            FROM jobs
            INNER JOIN users ON users.login = jobs.owner
            INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
            WHERE jobs.id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
            SELECT job_id, host_id
            FROM ineligible_host_queues
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
            SELECT job_id, label_id
            FROM jobs_dependency_labels
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
            SELECT host_id, aclgroup_id
            FROM acl_groups_hosts
            WHERE host_id IN (%s)
            """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
            SELECT label_id, host_id
            FROM hosts_labels
            WHERE host_id IN (%s)
            """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True
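        # Illustrative example (hypothetical label name): if a host carries an
        # only_if_needed label such as 'scarce-hardware', a queue entry may use
        # the host only when its job either depends on that label or requested
        # it as the metahost; otherwise the host is rejected for this entry.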


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        acl = self._is_acl_accessible(host_id, queue_entry)
        deps = self._check_job_dependencies(job_dependencies, host_labels)
        only_if = self._check_only_if_needed_labels(job_dependencies,
                                                    host_labels, queue_entry)
        return acl and deps and only_if


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            return self._schedule_non_metahost(queue_entry)
        return self._schedule_metahost(queue_entry)


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        self._host_agents = {}
        self._queue_entry_agents = {}


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        _drone_manager.refresh()
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()


    def _run_cleanup_maybe(self):
        should_cleanup = (self._last_clean_time +
                          scheduler_config.config.clean_interval * 60 <
                          time.time())
        if should_cleanup:
            print 'Running cleanup'
            self._abort_timed_out_jobs()
            self._abort_jobs_past_synch_start_timeout()
            self._clear_inactive_blocks()
            self._check_for_db_inconsistencies()
            self._last_clean_time = time.time()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return self._queue_entry_agents.get(queue_entry.id, set())


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def num_running_processes(self):
        return sum(agent.num_processes for agent in self._agents
                   if agent.is_running())


    def _extract_execution_tag(self, command_line):
        match = re.match(r'.* -P (\S+) ', command_line)
        if not match:
            return None
        return match.group(1)


    def _recover_queue_entries(self, queue_entries, run_monitor):
        assert len(queue_entries) > 0
        queue_task = RecoveryQueueTask(job=queue_entries[0].job,
                                       queue_entries=queue_entries,
                                       run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             num_processes=len(queue_entries)))


    def _recover_processes(self):
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_running_entries()
        self._recover_aborting_entries()
        self._requeue_other_active_entries()
        self._recover_parsing_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Parsing')")
        for queue_entry in queue_entries:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                queue_entry.execution_tag())
            _drone_manager.register_pidfile(pidfile_id)


    def _recover_running_entries(self):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
        requeue_entries = []
        for queue_entry in queue_entries:
            if self.get_agents_for_entry(queue_entry):
                # synchronous job we've already recovered
                continue
            execution_tag = queue_entry.execution_tag()
            run_monitor = PidfileRunMonitor()
            run_monitor.attach_to_existing_process(execution_tag)
            if not run_monitor.has_process():
                # autoserv apparently never got run, so let it get requeued
                continue
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            print 'Recovering %s (process %s)' % (
                ', '.join(str(entry) for entry in queue_entries),
                run_monitor.get_process())
            self._recover_queue_entries(queue_entries, run_monitor)
            orphans.pop(execution_tag, None)

        # now kill any remaining autoserv processes
        for process in orphans.itervalues():
            print 'Killing orphan %s' % process
            _drone_manager.kill_process(process)


    def _recover_aborting_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='status IN ("Abort", "Aborting")')
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %s' % queue_entry
            agent = queue_entry.abort(self)


    def _requeue_other_active_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='active AND NOT complete AND status != "Pending"')
        for queue_entry in queue_entries:
            if self.get_agents_for_entry(queue_entry):
                # entry has already been recovered
                continue
            print 'Requeuing active QE %s (status=%s)' % (queue_entry,
                                                          queue_entry.status)
            if queue_entry.host:
                tasks = queue_entry.host.reverify_tasks()
                self.add_agent(Agent(tasks))
            agent = queue_entry.requeue()


    def _reverify_remaining_hosts(self):
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if print_message:
                print print_message % host.hostname
            tasks = host.reverify_tasks()
            self.add_agent(Agent(tasks))


    def _recover_parsing_entries(self):
        recovered_entry_ids = set()
        for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
            if entry.id in recovered_entry_ids:
                continue
            queue_entries = entry.job.get_group_entries(entry)
            recovered_entry_ids = recovered_entry_ids.union(
                entry.id for entry in queue_entries)
            print 'Recovering parsing entries %s' % (
                ', '.join(str(entry) for entry in queue_entries))

            reparse_task = FinalReparseTask(queue_entries)
            self.add_agent(Agent([reparse_task], num_processes=0))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)

    def _abort_timed_out_jobs(self):
        """
        Aborts all jobs that have timed out and not completed
        """
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            print 'Aborting job %d due to job timeout' % job.id
            job.abort(None)


    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are holding a machine that's in everyone.
        """
        timeout_delta = datetime.timedelta(
            minutes=scheduler_config.config.synch_job_start_timeout_minutes)
        timeout_start = datetime.datetime.now() - timeout_delta
        query = models.Job.objects.filter(
            created_on__lt=timeout_start,
            hostqueueentry__status='Pending',
            hostqueueentry__host__aclgroup__name='Everyone')
        for job in query.distinct():
            print 'Aborting job %d due to start timeout' % job.id
            entries_to_abort = job.hostqueueentry_set.exclude(
                status=models.HostQueueEntry.Status.RUNNING)
            for queue_entry in entries_to_abort:
                queue_entry.abort(None)


    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='priority DESC, meta_host, job_id'))


    def _schedule_new_jobs(self):
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return

        self._host_scheduler.refresh(queue_entries)

        for queue_entry in queue_entries:
            assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
            if not assigned_host:
                continue
            self._run_queue_entry(queue_entry, assigned_host)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run(assigned_host=host)
        # in some cases (synchronous jobs with run_verify=False), agent may be
        # None
        if agent:
            self.add_agent(agent)


    def _find_aborting(self):
        for entry in queue_entries_to_abort():
            agents_to_abort = list(self.get_agents_for_entry(entry))
            for agent in agents_to_abort:
                self.remove_agent(agent)

            entry.abort(self, agents_to_abort)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
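        # Illustrative walk-through (hypothetical numbers): with
        # max_processes_started_per_cycle = 50, an agent needing 10 processes
        # is allowed while num_started_this_cycle <= 40 and deferred after
        # that, except that the first agent started in a cycle is always
        # allowed even if it alone exceeds the per-cycle limit.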


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                print "agent finished"
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        print _drone_manager.total_running_processes(), 'running processes'


    def _check_for_db_inconsistencies(self):
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        if query.count() != 0:
            subject = ('%d queue entries found with active=complete=1'
                       % query.count())
            message = '\n'.join(str(entry.get_object_dict())
                                for entry in query[:50])
            if len(query) > 50:
                message += '\n(truncated)\n'

            print subject
            email_manager.manager.enqueue_notify_email(subject, message)

class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self._lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self.has_process()
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        print message
        email_manager.manager.enqueue_notify_email(error, message)
        if self._state.process is not None:
            process = self._state.process
        else:
            process = _drone_manager.get_dummy_process()
        self.on_lost_process(process)


    def _get_pidfile_info_helper(self):
        if self._lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        print message
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process(_drone_manager.get_dummy_process())


    def on_lost_process(self, process):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        # record the loss on the attribute that _get_pidfile_info_helper() checks
        self._lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed

class Agent(object):
    def __init__(self, tasks, num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()

class AgentTask(object):
    def __init__(self, cmd, working_directory=None, failure_tasks=[]):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code is None:
            return
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self):
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)

class RepairTask(AgentTask):
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry = queue_entry
        self._set_ids(host=host, queue_entries=[queue_entry])

        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path, '-p', '-R', '-m', host.hostname,
               '-r', _drone_manager.absolute_path(self.temp_results_dir),
               '--host-protection', protection]
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('repair', self.host)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')
        if self.queue_entry:
            self.queue_entry.requeue()


    def _fail_queue_entry(self):
        assert self.queue_entry
        self.queue_entry.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry])
        self.queue_entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry and not self.queue_entry.meta_host:
                self._fail_queue_entry()

class PreJobTask(AgentTask):
    def epilog(self):
        super(PreJobTask, self).epilog()
        should_copy_results = (self.queue_entry and not self.success
                               and not self.queue_entry.meta_host)
        if should_copy_results:
            self.queue_entry.set_execution_subdir()
            destination = os.path.join(self.queue_entry.execution_tag(),
                                       os.path.basename(self.log_file))
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file,
                destination_path=destination)


class VerifyTask(PreJobTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname, '-r',
               _drone_manager.absolute_path(self.temp_results_dir)]
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')

mbligh36768f02008-02-22 18:28:33 +00001206class QueueTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001207 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001208 self.job = job
1209 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001210 super(QueueTask, self).__init__(cmd, self._execution_tag())
1211 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001212
1213
showard170873e2009-01-07 00:22:26 +00001214 def _format_keyval(self, key, value):
1215 return '%s=%s' % (key, value)
mbligh36768f02008-02-22 18:28:33 +00001216
1217
showard73ec0442009-02-07 02:05:20 +00001218 def _keyval_path(self):
1219 return os.path.join(self._execution_tag(), 'keyval')
1220
1221
1222 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1223 keyval_contents = '\n'.join(self._format_keyval(key, value)
1224 for key, value in keyval_dict.iteritems())
1225 # always end with a newline to allow additional keyvals to be written
1226 keyval_contents += '\n'
1227 _drone_manager.attach_file_to_execution(self._execution_tag(),
1228 keyval_contents,
1229 file_path=keyval_path)
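    # The helper above writes one 'key=value' line per dict entry; for example
    # the prolog below produces a line such as "job_queued=1203500000"
    # (illustrative timestamp only).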
1230
1231
1232 def _write_keyvals_before_job(self, keyval_dict):
1233 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1234
1235
1236 def _write_keyval_after_job(self, field, value):
showard170873e2009-01-07 00:22:26 +00001237 assert self.monitor and self.monitor.has_process()
1238 paired_with_pidfile = self.monitor.pidfile_id
1239 _drone_manager.write_lines_to_file(
showard73ec0442009-02-07 02:05:20 +00001240 self._keyval_path(), [self._format_keyval(field, value)],
showard170873e2009-01-07 00:22:26 +00001241 paired_with_pidfile=paired_with_pidfile)
showardd8e548a2008-09-09 03:04:57 +00001242
1243
showard170873e2009-01-07 00:22:26 +00001244 def _write_host_keyvals(self, host):
1245 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1246 host.hostname)
1247 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001248 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1249 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001250
1251
showard170873e2009-01-07 00:22:26 +00001252 def _execution_tag(self):
1253 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001254
1255
jadmanski0afbb632008-06-06 21:10:57 +00001256 def prolog(self):
showard73ec0442009-02-07 02:05:20 +00001257 queued = int(time.mktime(self.job.created_on.timetuple()))
1258 self._write_keyvals_before_job({'job_queued': queued})
jadmanski0afbb632008-06-06 21:10:57 +00001259 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001260 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001261 queue_entry.set_status('Running')
1262 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001263 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001264 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001265 assert len(self.queue_entries) == 1
1266 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001267
1268
showard97aed502008-11-04 02:01:24 +00001269 def _finish_task(self, success):
showard73ec0442009-02-07 02:05:20 +00001270 finished = int(time.time())
1271 self._write_keyval_after_job("job_finished", finished)
showard678df4f2009-02-04 21:36:39 +00001272 self._copy_and_parse_results(self.queue_entries)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001273
1274
showardcbd74612008-11-19 21:42:02 +00001275 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001276 _drone_manager.write_lines_to_file(
1277 os.path.join(self._execution_tag(), 'status.log'),
1278 ['INFO\t----\t----\t' + comment],
1279 paired_with_pidfile=self.monitor.pidfile_id)
showardcbd74612008-11-19 21:42:02 +00001280
1281
jadmanskif7fa2cc2008-10-01 14:13:23 +00001282 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001283 if not self.monitor or not self.monitor.has_process():
1284 return
1285
jadmanskif7fa2cc2008-10-01 14:13:23 +00001286 # build up sets of all the aborted_by and aborted_on values
1287 aborted_by, aborted_on = set(), set()
1288 for queue_entry in self.queue_entries:
1289 if queue_entry.aborted_by:
1290 aborted_by.add(queue_entry.aborted_by)
1291 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1292 aborted_on.add(t)
1293
1294 # extract some actual, unique aborted by value and write it out
1295 assert len(aborted_by) <= 1
1296 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001297 aborted_by_value = aborted_by.pop()
1298 aborted_on_value = max(aborted_on)
1299 else:
1300 aborted_by_value = 'autotest_system'
1301 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001302
showarda0382352009-02-11 23:36:43 +00001303 self._write_keyval_after_job("aborted_by", aborted_by_value)
1304 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001305
showardcbd74612008-11-19 21:42:02 +00001306 aborted_on_string = str(datetime.datetime.fromtimestamp(
1307 aborted_on_value))
1308 self._write_status_comment('Job aborted by %s on %s' %
1309 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001310
1311
jadmanski0afbb632008-06-06 21:10:57 +00001312 def abort(self):
1313 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001314 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001315 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001316
1317
showard21baa452008-10-21 00:08:39 +00001318 def _reboot_hosts(self):
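        # Reboot policy: RebootAfter.ALWAYS always reboots;
        # IF_ALL_TESTS_PASSED reboots only when the run succeeded and no tests
        # failed. Hosts that are not rebooted go straight back to Ready.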
1319 reboot_after = self.job.reboot_after
1320 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001321 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001322 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001323 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001324 num_tests_failed = self.monitor.num_tests_failed()
1325 do_reboot = (self.success and num_tests_failed == 0)
1326
showard8ebca792008-11-04 21:54:22 +00001327 for queue_entry in self.queue_entries:
1328 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00001329                # don't pass the queue entry to the CleanupTask. If the cleanup
showardfa8629c2008-11-04 16:51:23 +00001330 # fails, the job doesn't care -- it's over.
showard45ae8192008-11-05 19:32:53 +00001331 cleanup_task = CleanupTask(host=queue_entry.get_host())
1332 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
showard8ebca792008-11-04 21:54:22 +00001333 else:
1334 queue_entry.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001335
1336
jadmanski0afbb632008-06-06 21:10:57 +00001337 def epilog(self):
1338 super(QueueTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001339 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001340 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001341
showard97aed502008-11-04 02:01:24 +00001342    print "queue_task finished with success=%s" % self.success
mbligh36768f02008-02-22 18:28:33 +00001343
1344
mblighbb421852008-03-11 22:36:16 +00001345class RecoveryQueueTask(QueueTask):
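    # Re-attaches to an autoserv process that was already running when the
    # scheduler came back up: run() reuses the existing run monitor instead of
    # launching a new process, and the prolog is skipped.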
jadmanski0afbb632008-06-06 21:10:57 +00001346 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001347 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001348 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001349
1350
jadmanski0afbb632008-06-06 21:10:57 +00001351 def run(self):
1352 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001353
1354
jadmanski0afbb632008-06-06 21:10:57 +00001355 def prolog(self):
1356 # recovering an existing process - don't do prolog
1357 pass
mblighbb421852008-03-11 22:36:16 +00001358
1359
showard8fe93b52008-11-18 17:53:22 +00001360class CleanupTask(PreJobTask):
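    # Runs 'autoserv --cleanup' against the host; on failure a RepairTask is
    # queued as the fallback. Success marks the host Ready and clears its
    # dirty flag.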
showardfa8629c2008-11-04 16:51:23 +00001361 def __init__(self, host=None, queue_entry=None):
1362 assert bool(host) ^ bool(queue_entry)
1363 if queue_entry:
1364 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001365 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001366 self.host = host
showard170873e2009-01-07 00:22:26 +00001367
1368 self.create_temp_resultsdir('.cleanup')
1369 self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
1370 '-r', _drone_manager.absolute_path(self.temp_results_dir)]
showarde788ea62008-11-17 21:02:47 +00001371 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001372 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1373 failure_tasks=[repair_task])
1374
1375 self._set_ids(host=host, queue_entries=[queue_entry])
1376 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001377
mblighd5c95802008-03-05 00:33:46 +00001378
jadmanski0afbb632008-06-06 21:10:57 +00001379 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001380 super(CleanupTask, self).prolog()
showard45ae8192008-11-05 19:32:53 +00001381 print "starting cleanup task for host: %s" % self.host.hostname
1382 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001383
mblighd5c95802008-03-05 00:33:46 +00001384
showard21baa452008-10-21 00:08:39 +00001385 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001386 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001387 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001388 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001389 self.host.update_field('dirty', 0)
1390
1391
mblighd5c95802008-03-05 00:33:46 +00001392class AbortTask(AgentTask):
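    # Aborts the active task of each given agent and then marks the queue
    # entry Aborted; host_ids are intentionally left unset (see below).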
jadmanski0afbb632008-06-06 21:10:57 +00001393 def __init__(self, queue_entry, agents_to_abort):
jadmanski0afbb632008-06-06 21:10:57 +00001394 super(AbortTask, self).__init__('')
showard170873e2009-01-07 00:22:26 +00001395 self.queue_entry = queue_entry
1396 # don't use _set_ids, since we don't want to set the host_ids
1397 self.queue_entry_ids = [queue_entry.id]
1398 self.agents_to_abort = agents_to_abort
mbligh36768f02008-02-22 18:28:33 +00001399
1400
jadmanski0afbb632008-06-06 21:10:57 +00001401 def prolog(self):
1402 print "starting abort on host %s, job %s" % (
1403 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001404
mblighd64e5702008-04-04 21:39:28 +00001405
jadmanski0afbb632008-06-06 21:10:57 +00001406 def epilog(self):
1407 super(AbortTask, self).epilog()
1408 self.queue_entry.set_status('Aborted')
1409 self.success = True
1410
1411
1412 def run(self):
1413 for agent in self.agents_to_abort:
1414            if agent.active_task:
1415 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001416
1417
showard97aed502008-11-04 02:01:24 +00001418class FinalReparseTask(AgentTask):
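    # Runs the TKO parser over a finished execution and then marks the queue
    # entries Completed or Failed based on autoserv's exit status. The number
    # of concurrent parses is throttled by
    # scheduler_config.config.max_parse_processes via _num_running_parses.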
showard97aed502008-11-04 02:01:24 +00001419 _num_running_parses = 0
1420
1421 def __init__(self, queue_entries):
1422 self._queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001423 # don't use _set_ids, since we don't want to set the host_ids
1424 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001425 self._parse_started = False
1426
1427 assert len(queue_entries) > 0
1428 queue_entry = queue_entries[0]
showard97aed502008-11-04 02:01:24 +00001429
showard170873e2009-01-07 00:22:26 +00001430 self._execution_tag = queue_entry.execution_tag()
1431 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1432 self._autoserv_monitor = PidfileRunMonitor()
1433 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1434 self._final_status = self._determine_final_status()
1435
showard97aed502008-11-04 02:01:24 +00001436 if _testing_mode:
1437 self.cmd = 'true'
showard170873e2009-01-07 00:22:26 +00001438 else:
1439 super(FinalReparseTask, self).__init__(
1440 cmd=self._generate_parse_command(),
1441 working_directory=self._execution_tag)
showard97aed502008-11-04 02:01:24 +00001442
showard170873e2009-01-07 00:22:26 +00001443 self.log_file = os.path.join(self._execution_tag, '.parse.log')
showard97aed502008-11-04 02:01:24 +00001444
1445
1446 @classmethod
1447 def _increment_running_parses(cls):
1448 cls._num_running_parses += 1
1449
1450
1451 @classmethod
1452 def _decrement_running_parses(cls):
1453 cls._num_running_parses -= 1
1454
1455
1456 @classmethod
1457 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001458 return (cls._num_running_parses <
1459 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001460
1461
showard170873e2009-01-07 00:22:26 +00001462 def _determine_final_status(self):
1463 # we'll use a PidfileRunMonitor to read the autoserv exit status
1464 if self._autoserv_monitor.exit_code() == 0:
1465 return models.HostQueueEntry.Status.COMPLETED
1466 return models.HostQueueEntry.Status.FAILED
1467
1468
showard97aed502008-11-04 02:01:24 +00001469 def prolog(self):
1470 super(FinalReparseTask, self).prolog()
1471 for queue_entry in self._queue_entries:
1472 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1473
1474
1475 def epilog(self):
1476 super(FinalReparseTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001477 for queue_entry in self._queue_entries:
showard170873e2009-01-07 00:22:26 +00001478 queue_entry.set_status(self._final_status)
showard97aed502008-11-04 02:01:24 +00001479
1480
showard2bab8f42008-11-12 18:15:22 +00001481 def _generate_parse_command(self):
showard170873e2009-01-07 00:22:26 +00001482 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
1483 self._results_dir]
showard97aed502008-11-04 02:01:24 +00001484
1485
1486 def poll(self):
1487 # override poll to keep trying to start until the parse count goes down
1488 # and we can, at which point we revert to default behavior
1489 if self._parse_started:
1490 super(FinalReparseTask, self).poll()
1491 else:
1492 self._try_starting_parse()
1493
1494
1495 def run(self):
1496 # override run() to not actually run unless we can
1497 self._try_starting_parse()
1498
1499
1500 def _try_starting_parse(self):
1501 if not self._can_run_new_parse():
1502 return
showard170873e2009-01-07 00:22:26 +00001503
showard678df4f2009-02-04 21:36:39 +00001504 # make sure we actually have results to parse
1505 if not self._autoserv_monitor.has_process():
1506 email_manager.manager.enqueue_notify_email(
1507 'No results to parse',
1508 'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
1509 self.finished(False)
1510 return
1511
showard97aed502008-11-04 02:01:24 +00001512 # actually run the parse command
showard170873e2009-01-07 00:22:26 +00001513 self.monitor = PidfileRunMonitor()
1514 self.monitor.run(self.cmd, self._working_directory,
1515 log_file=self.log_file,
1516 pidfile_name='.parser_execute',
1517 paired_with_pidfile=self._autoserv_monitor.pidfile_id)
1518
showard97aed502008-11-04 02:01:24 +00001519 self._increment_running_parses()
1520 self._parse_started = True
1521
1522
1523 def finished(self, success):
1524 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001525 if self._parse_started:
1526 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001527
1528
showardc9ae1782009-01-30 01:42:37 +00001529class SetEntryPendingTask(AgentTask):
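    # Marks its queue entry Pending via on_pending(); if that returns an agent
    # (the job is ready to run), it is handed to the dispatcher.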
1530 def __init__(self, queue_entry):
1531 super(SetEntryPendingTask, self).__init__(cmd='')
1532 self._queue_entry = queue_entry
1533 self._set_ids(queue_entries=[queue_entry])
1534
1535
1536 def run(self):
1537 agent = self._queue_entry.on_pending()
1538 if agent:
1539 self.agent.dispatcher.add_agent(agent)
1540 self.finished(True)
1541
1542
mbligh36768f02008-02-22 18:28:33 +00001543class DBObject(object):
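    # Minimal wrapper around a database row. Subclasses provide _get_table()
    # and _fields(); constructing with id= loads the row from the database,
    # while row= wraps an already-fetched row (as the subclasses below do).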
jadmanski0afbb632008-06-06 21:10:57 +00001544 def __init__(self, id=None, row=None, new_record=False):
1545 assert (bool(id) != bool(row))
mbligh36768f02008-02-22 18:28:33 +00001546
jadmanski0afbb632008-06-06 21:10:57 +00001547 self.__table = self._get_table()
mbligh36768f02008-02-22 18:28:33 +00001548
jadmanski0afbb632008-06-06 21:10:57 +00001549 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001550
jadmanski0afbb632008-06-06 21:10:57 +00001551 if row is None:
1552 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1553 rows = _db.execute(sql, (id,))
1554 if len(rows) == 0:
1555                raise Exception("row not found (table=%s, id=%s)" %
1556                                (self.__table, id))
1557 row = rows[0]
mbligh36768f02008-02-22 18:28:33 +00001558
showard2bab8f42008-11-12 18:15:22 +00001559 self._update_fields_from_row(row)
1560
1561
1562 def _update_fields_from_row(self, row):
jadmanski0afbb632008-06-06 21:10:57 +00001563 assert len(row) == self.num_cols(), (
1564 "table = %s, row = %s/%d, fields = %s/%d" % (
showard2bab8f42008-11-12 18:15:22 +00001565 self.__table, row, len(row), self._fields(), self.num_cols()))
mbligh36768f02008-02-22 18:28:33 +00001566
showard2bab8f42008-11-12 18:15:22 +00001567 self._valid_fields = set()
1568 for field, value in zip(self._fields(), row):
1569 setattr(self, field, value)
1570 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001571
showard2bab8f42008-11-12 18:15:22 +00001572 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001573
mblighe2586682008-02-29 22:45:46 +00001574
jadmanski0afbb632008-06-06 21:10:57 +00001575 @classmethod
1576 def _get_table(cls):
1577 raise NotImplementedError('Subclasses must override this')
mblighe2586682008-02-29 22:45:46 +00001578
1579
jadmanski0afbb632008-06-06 21:10:57 +00001580 @classmethod
1581 def _fields(cls):
1582 raise NotImplementedError('Subclasses must override this')
showard04c82c52008-05-29 19:38:12 +00001583
1584
jadmanski0afbb632008-06-06 21:10:57 +00001585 @classmethod
1586 def num_cols(cls):
1587 return len(cls._fields())
showard04c82c52008-05-29 19:38:12 +00001588
1589
jadmanski0afbb632008-06-06 21:10:57 +00001590 def count(self, where, table = None):
1591 if not table:
1592 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001593
jadmanski0afbb632008-06-06 21:10:57 +00001594 rows = _db.execute("""
1595 SELECT count(*) FROM %s
1596 WHERE %s
1597 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001598
jadmanski0afbb632008-06-06 21:10:57 +00001599 assert len(rows) == 1
1600
1601 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001602
1603
mblighf8c624d2008-07-03 16:58:45 +00001604 def update_field(self, field, value, condition=''):
showard2bab8f42008-11-12 18:15:22 +00001605 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00001606
showard2bab8f42008-11-12 18:15:22 +00001607 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00001608 return
mbligh36768f02008-02-22 18:28:33 +00001609
mblighf8c624d2008-07-03 16:58:45 +00001610 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1611 if condition:
1612 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001613 _db.execute(query, (value, self.id))
1614
showard2bab8f42008-11-12 18:15:22 +00001615 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00001616
1617
jadmanski0afbb632008-06-06 21:10:57 +00001618 def save(self):
1619 if self.__new_record:
1620 keys = self._fields()[1:] # avoid id
1621 columns = ','.join([str(key) for key in keys])
1622 values = ['"%s"' % self.__dict__[key] for key in keys]
1623 values = ','.join(values)
1624 query = """INSERT INTO %s (%s) VALUES (%s)""" % \
1625 (self.__table, columns, values)
1626 _db.execute(query)
mbligh36768f02008-02-22 18:28:33 +00001627
1628
jadmanski0afbb632008-06-06 21:10:57 +00001629 def delete(self):
1630 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1631 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001632
1633
showard63a34772008-08-18 19:32:50 +00001634 @staticmethod
1635 def _prefix_with(string, prefix):
1636 if string:
1637 string = prefix + string
1638 return string
1639
1640
jadmanski0afbb632008-06-06 21:10:57 +00001641 @classmethod
showard989f25d2008-10-01 11:38:11 +00001642 def fetch(cls, where='', params=(), joins='', order_by=''):
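        # Yields one instance per matching row. Typical calls elsewhere in
        # this file look like:
        #   HostQueueEntry.fetch('job_id = %s' % job_id)
        #   IneligibleHostQueue.fetch('job_id=%d and host_id=%d' % (job_id, host_id))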
showard63a34772008-08-18 19:32:50 +00001643 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1644 where = cls._prefix_with(where, 'WHERE ')
1645 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
1646 '%(where)s %(order_by)s' % {'table' : cls._get_table(),
1647 'joins' : joins,
1648 'where' : where,
1649 'order_by' : order_by})
1650 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001651 for row in rows:
1652 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001653
mbligh36768f02008-02-22 18:28:33 +00001654
1655class IneligibleHostQueue(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001656 def __init__(self, id=None, row=None, new_record=None):
1657 super(IneligibleHostQueue, self).__init__(id=id, row=row,
1658 new_record=new_record)
mblighe2586682008-02-29 22:45:46 +00001659
1660
jadmanski0afbb632008-06-06 21:10:57 +00001661 @classmethod
1662 def _get_table(cls):
1663 return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001664
1665
jadmanski0afbb632008-06-06 21:10:57 +00001666 @classmethod
1667 def _fields(cls):
1668 return ['id', 'job_id', 'host_id']
showard04c82c52008-05-29 19:38:12 +00001669
1670
showard989f25d2008-10-01 11:38:11 +00001671class Label(DBObject):
1672 @classmethod
1673 def _get_table(cls):
1674 return 'labels'
1675
1676
1677 @classmethod
1678 def _fields(cls):
1679 return ['id', 'name', 'kernel_config', 'platform', 'invalid',
1680 'only_if_needed']
1681
1682
mbligh36768f02008-02-22 18:28:33 +00001683class Host(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001684 def __init__(self, id=None, row=None):
1685 super(Host, self).__init__(id=id, row=row)
mblighe2586682008-02-29 22:45:46 +00001686
1687
jadmanski0afbb632008-06-06 21:10:57 +00001688 @classmethod
1689 def _get_table(cls):
1690 return 'hosts'
mbligh36768f02008-02-22 18:28:33 +00001691
1692
jadmanski0afbb632008-06-06 21:10:57 +00001693 @classmethod
1694 def _fields(cls):
1695        return ['id', 'hostname', 'locked', 'synch_id', 'status',
showard21baa452008-10-21 00:08:39 +00001696 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty']
showard04c82c52008-05-29 19:38:12 +00001697
1698
jadmanski0afbb632008-06-06 21:10:57 +00001699 def current_task(self):
1700 rows = _db.execute("""
1701 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1702 """, (self.id,))
1703
1704 if len(rows) == 0:
1705 return None
1706 else:
1707 assert len(rows) == 1
1708            results = rows[0]
mblighf8c624d2008-07-03 16:58:45 +00001709# print "current = %s" % results
jadmanski0afbb632008-06-06 21:10:57 +00001710 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00001711
1712
jadmanski0afbb632008-06-06 21:10:57 +00001713 def yield_work(self):
1714 print "%s yielding work" % self.hostname
1715 if self.current_task():
1716 self.current_task().requeue()
1717
1718    def set_status(self, status):
1719        print '%s -> %s' % (self.hostname, status)
1720        self.update_field('status', status)
mbligh36768f02008-02-22 18:28:33 +00001721
1722
showard170873e2009-01-07 00:22:26 +00001723 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00001724 """
showard170873e2009-01-07 00:22:26 +00001725 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00001726 """
1727 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00001728 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00001729 FROM labels
1730 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00001731 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00001732 ORDER BY labels.name
1733 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00001734 platform = None
1735 all_labels = []
1736 for label_name, is_platform in rows:
1737 if is_platform:
1738 platform = label_name
1739 all_labels.append(label_name)
1740 return platform, all_labels
1741
1742
1743 def reverify_tasks(self):
1744 cleanup_task = CleanupTask(host=self)
1745 verify_task = VerifyTask(host=self)
1746 # just to make sure this host does not get taken away
1747 self.set_status('Cleaning')
1748 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00001749
1750
mbligh36768f02008-02-22 18:28:33 +00001751class HostQueueEntry(DBObject):
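    # One row of host_queue_entries: ties a Job to a Host (or a meta_host),
    # tracks its status, writes the per-entry queue log, and sends the
    # optional status/completion emails.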
jadmanski0afbb632008-06-06 21:10:57 +00001752 def __init__(self, id=None, row=None):
1753 assert id or row
1754 super(HostQueueEntry, self).__init__(id=id, row=row)
1755 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00001756
jadmanski0afbb632008-06-06 21:10:57 +00001757 if self.host_id:
1758 self.host = Host(self.host_id)
1759 else:
1760 self.host = None
mbligh36768f02008-02-22 18:28:33 +00001761
showard170873e2009-01-07 00:22:26 +00001762 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00001763 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00001764
1765
jadmanski0afbb632008-06-06 21:10:57 +00001766 @classmethod
1767 def _get_table(cls):
1768 return 'host_queue_entries'
mblighe2586682008-02-29 22:45:46 +00001769
1770
jadmanski0afbb632008-06-06 21:10:57 +00001771 @classmethod
1772 def _fields(cls):
showard2bab8f42008-11-12 18:15:22 +00001773 return ['id', 'job_id', 'host_id', 'priority', 'status', 'meta_host',
1774 'active', 'complete', 'deleted', 'execution_subdir']
showard04c82c52008-05-29 19:38:12 +00001775
1776
showardc85c21b2008-11-24 22:17:37 +00001777 def _view_job_url(self):
1778 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
1779
1780
jadmanski0afbb632008-06-06 21:10:57 +00001781 def set_host(self, host):
1782 if host:
1783 self.queue_log_record('Assigning host ' + host.hostname)
1784 self.update_field('host_id', host.id)
1785 self.update_field('active', True)
1786 self.block_host(host.id)
1787 else:
1788 self.queue_log_record('Releasing host')
1789 self.unblock_host(self.host.id)
1790 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00001791
jadmanski0afbb632008-06-06 21:10:57 +00001792 self.host = host
mbligh36768f02008-02-22 18:28:33 +00001793
1794
jadmanski0afbb632008-06-06 21:10:57 +00001795 def get_host(self):
1796 return self.host
mbligh36768f02008-02-22 18:28:33 +00001797
1798
jadmanski0afbb632008-06-06 21:10:57 +00001799 def queue_log_record(self, log_line):
1800 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00001801 _drone_manager.write_lines_to_file(self.queue_log_path,
1802 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00001803
1804
jadmanski0afbb632008-06-06 21:10:57 +00001805 def block_host(self, host_id):
1806 print "creating block %s/%s" % (self.job.id, host_id)
1807 row = [0, self.job.id, host_id]
1808 block = IneligibleHostQueue(row=row, new_record=True)
1809 block.save()
mblighe2586682008-02-29 22:45:46 +00001810
1811
jadmanski0afbb632008-06-06 21:10:57 +00001812 def unblock_host(self, host_id):
1813 print "removing block %s/%s" % (self.job.id, host_id)
1814 blocks = IneligibleHostQueue.fetch(
1815 'job_id=%d and host_id=%d' % (self.job.id, host_id))
1816 for block in blocks:
1817 block.delete()
mblighe2586682008-02-29 22:45:46 +00001818
1819
showard2bab8f42008-11-12 18:15:22 +00001820 def set_execution_subdir(self, subdir=None):
1821 if subdir is None:
1822 assert self.get_host()
1823 subdir = self.get_host().hostname
1824 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00001825
1826
showard6355f6b2008-12-05 18:52:13 +00001827 def _get_hostname(self):
1828 if self.host:
1829 return self.host.hostname
1830 return 'no host'
1831
1832
showard170873e2009-01-07 00:22:26 +00001833 def __str__(self):
1834 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
1835
1836
jadmanski0afbb632008-06-06 21:10:57 +00001837 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00001838 abort_statuses = ['Abort', 'Aborting', 'Aborted']
1839 if status not in abort_statuses:
1840 condition = ' AND '.join(['status <> "%s"' % x
1841 for x in abort_statuses])
1842 else:
1843 condition = ''
1844 self.update_field('status', status, condition=condition)
1845
showard170873e2009-01-07 00:22:26 +00001846 print "%s -> %s" % (self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00001847
showardc85c21b2008-11-24 22:17:37 +00001848 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00001849 self.update_field('complete', False)
1850 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00001851
jadmanski0afbb632008-06-06 21:10:57 +00001852 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showarde58e3f82008-11-20 19:04:59 +00001853 'Aborting']:
jadmanski0afbb632008-06-06 21:10:57 +00001854 self.update_field('complete', False)
1855 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00001856
showardc85c21b2008-11-24 22:17:37 +00001857 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00001858 self.update_field('complete', True)
1859 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00001860
1861 should_email_status = (status.lower() in _notify_email_statuses or
1862 'all' in _notify_email_statuses)
1863 if should_email_status:
1864 self._email_on_status(status)
1865
1866 self._email_on_job_complete()
1867
1868
1869 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00001870 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00001871
1872 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
1873 self.job.id, self.job.name, hostname, status)
1874 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
1875 self.job.id, self.job.name, hostname, status,
1876 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00001877 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00001878
1879
1880 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00001881 if not self.job.is_finished():
1882 return
showard542e8402008-09-19 20:16:18 +00001883
showardc85c21b2008-11-24 22:17:37 +00001884 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00001885 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00001886 for queue_entry in hosts_queue:
1887 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00001888 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00001889 queue_entry.status))
1890
1891 summary_text = "\n".join(summary_text)
1892 status_counts = models.Job.objects.get_status_counts(
1893 [self.job.id])[self.job.id]
1894 status = ', '.join('%d %s' % (count, status) for status, count
1895 in status_counts.iteritems())
1896
1897 subject = 'Autotest: Job ID: %s "%s" %s' % (
1898 self.job.id, self.job.name, status)
1899 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
1900 self.job.id, self.job.name, status, self._view_job_url(),
1901 summary_text)
showard170873e2009-01-07 00:22:26 +00001902 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00001903
1904
jadmanski0afbb632008-06-06 21:10:57 +00001905    def run(self, assigned_host=None):
1906 if self.meta_host:
1907 assert assigned_host
1908 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00001909 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00001910
jadmanski0afbb632008-06-06 21:10:57 +00001911 print "%s/%s scheduled on %s, status=%s" % (self.job.name,
1912 self.meta_host, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00001913
jadmanski0afbb632008-06-06 21:10:57 +00001914 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00001915
jadmanski0afbb632008-06-06 21:10:57 +00001916 def requeue(self):
1917 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00001918 # verify/cleanup failure sets the execution subdir, so reset it here
1919 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00001920 if self.meta_host:
1921 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00001922
1923
jadmanski0afbb632008-06-06 21:10:57 +00001924 def handle_host_failure(self):
1925 """\
1926 Called when this queue entry's host has failed verification and
1927 repair.
1928 """
1929 assert not self.meta_host
1930 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00001931 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00001932
1933
jadmanskif7fa2cc2008-10-01 14:13:23 +00001934 @property
1935 def aborted_by(self):
1936 self._load_abort_info()
1937 return self._aborted_by
1938
1939
1940 @property
1941 def aborted_on(self):
1942 self._load_abort_info()
1943 return self._aborted_on
1944
1945
1946 def _load_abort_info(self):
1947 """ Fetch info about who aborted the job. """
1948 if hasattr(self, "_aborted_by"):
1949 return
1950 rows = _db.execute("""
1951 SELECT users.login, aborted_host_queue_entries.aborted_on
1952 FROM aborted_host_queue_entries
1953 INNER JOIN users
1954 ON users.id = aborted_host_queue_entries.aborted_by_id
1955 WHERE aborted_host_queue_entries.queue_entry_id = %s
1956 """, (self.id,))
1957 if rows:
1958 self._aborted_by, self._aborted_on = rows[0]
1959 else:
1960 self._aborted_by = self._aborted_on = None
1961
1962
showardb2e2c322008-10-14 17:33:55 +00001963 def on_pending(self):
1964 """
1965 Called when an entry in a synchronous job has passed verify. If the
1966 job is ready to run, returns an agent to run the job. Returns None
1967 otherwise.
1968 """
1969 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00001970 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00001971 if self.job.is_ready():
1972 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00001973 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00001974 return None
1975
1976
showard170873e2009-01-07 00:22:26 +00001977 def abort(self, dispatcher, agents_to_abort=[]):
showard1be97432008-10-17 15:30:45 +00001978 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00001979 if self.active and host:
showard170873e2009-01-07 00:22:26 +00001980 dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
showard1be97432008-10-17 15:30:45 +00001981
showard170873e2009-01-07 00:22:26 +00001982 abort_task = AbortTask(self, agents_to_abort)
showard1be97432008-10-17 15:30:45 +00001983 self.set_status('Aborting')
showard170873e2009-01-07 00:22:26 +00001984 dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
1985
1986 def execution_tag(self):
1987 assert self.execution_subdir
1988 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00001989
1990
mbligh36768f02008-02-22 18:28:33 +00001991class Job(DBObject):
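    # One row of the jobs table. run() drives a queue entry through the
    # optional cleanup/verify pre-job tasks and, once enough entries are
    # Pending to satisfy synch_count, groups them and launches a QueueTask.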
jadmanski0afbb632008-06-06 21:10:57 +00001992 def __init__(self, id=None, row=None):
1993 assert id or row
1994 super(Job, self).__init__(id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00001995
mblighe2586682008-02-29 22:45:46 +00001996
jadmanski0afbb632008-06-06 21:10:57 +00001997 @classmethod
1998 def _get_table(cls):
1999 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00002000
2001
jadmanski0afbb632008-06-06 21:10:57 +00002002 @classmethod
2003 def _fields(cls):
2004 return ['id', 'owner', 'name', 'priority', 'control_file',
showard2bab8f42008-11-12 18:15:22 +00002005 'control_type', 'created_on', 'synch_count', 'timeout',
showard21baa452008-10-21 00:08:39 +00002006 'run_verify', 'email_list', 'reboot_before', 'reboot_after']
showard04c82c52008-05-29 19:38:12 +00002007
2008
jadmanski0afbb632008-06-06 21:10:57 +00002009 def is_server_job(self):
2010 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002011
2012
showard170873e2009-01-07 00:22:26 +00002013 def tag(self):
2014 return "%s-%s" % (self.id, self.owner)
2015
2016
jadmanski0afbb632008-06-06 21:10:57 +00002017 def get_host_queue_entries(self):
2018 rows = _db.execute("""
2019 SELECT * FROM host_queue_entries
2020 WHERE job_id= %s
2021 """, (self.id,))
2022 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002023
jadmanski0afbb632008-06-06 21:10:57 +00002024 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002025
jadmanski0afbb632008-06-06 21:10:57 +00002026 return entries
mbligh36768f02008-02-22 18:28:33 +00002027
2028
jadmanski0afbb632008-06-06 21:10:57 +00002029 def set_status(self, status, update_queues=False):
2030        self.update_field('status', status)
2031
2032 if update_queues:
2033 for queue_entry in self.get_host_queue_entries():
2034 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002035
2036
jadmanski0afbb632008-06-06 21:10:57 +00002037 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002038 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2039 status='Pending')
2040 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002041
2042
jadmanski0afbb632008-06-06 21:10:57 +00002043 def num_machines(self, clause = None):
2044 sql = "job_id=%s" % self.id
2045 if clause:
2046 sql += " AND (%s)" % clause
2047 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002048
2049
jadmanski0afbb632008-06-06 21:10:57 +00002050 def num_queued(self):
2051 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002052
2053
jadmanski0afbb632008-06-06 21:10:57 +00002054 def num_active(self):
2055 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002056
2057
jadmanski0afbb632008-06-06 21:10:57 +00002058 def num_complete(self):
2059 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002060
2061
jadmanski0afbb632008-06-06 21:10:57 +00002062 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002063 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002064
mbligh36768f02008-02-22 18:28:33 +00002065
showard6bb7c292009-01-30 01:44:51 +00002066 def _not_yet_run_entries(self, include_verifying=True):
2067 statuses = [models.HostQueueEntry.Status.QUEUED,
2068 models.HostQueueEntry.Status.PENDING]
2069 if include_verifying:
2070 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2071 return models.HostQueueEntry.objects.filter(job=self.id,
2072 status__in=statuses)
2073
2074
2075 def _stop_all_entries(self):
2076 entries_to_stop = self._not_yet_run_entries(
2077 include_verifying=False)
2078 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002079 assert not child_entry.complete, (
2080 '%s status=%s, active=%s, complete=%s' %
2081 (child_entry.id, child_entry.status, child_entry.active,
2082 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002083 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2084 child_entry.host.status = models.Host.Status.READY
2085 child_entry.host.save()
2086 child_entry.status = models.HostQueueEntry.Status.STOPPED
2087 child_entry.save()
2088
showard2bab8f42008-11-12 18:15:22 +00002089 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002090 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002091 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002092 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002093
2094
jadmanski0afbb632008-06-06 21:10:57 +00002095 def write_to_machines_file(self, queue_entry):
2096 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002097 file_path = os.path.join(self.tag(), '.machines')
2098 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002099
2100
showard2bab8f42008-11-12 18:15:22 +00002101 def _next_group_name(self):
2102 query = models.HostQueueEntry.objects.filter(
2103 job=self.id).values('execution_subdir').distinct()
2104 subdirs = (entry['execution_subdir'] for entry in query)
2105 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2106 ids = [int(match.group(1)) for match in groups if match]
2107 if ids:
2108 next_id = max(ids) + 1
2109 else:
2110 next_id = 0
2111 return "group%d" % next_id
2112
2113
showard170873e2009-01-07 00:22:26 +00002114 def _write_control_file(self, execution_tag):
2115 control_path = _drone_manager.attach_file_to_execution(
2116 execution_tag, self.control_file)
2117 return control_path
mbligh36768f02008-02-22 18:28:33 +00002118
showardb2e2c322008-10-14 17:33:55 +00002119
showard2bab8f42008-11-12 18:15:22 +00002120 def get_group_entries(self, queue_entry_from_group):
2121 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002122 return list(HostQueueEntry.fetch(
2123 where='job_id=%s AND execution_subdir=%s',
2124 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002125
2126
showardb2e2c322008-10-14 17:33:55 +00002127 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002128 assert queue_entries
2129 execution_tag = queue_entries[0].execution_tag()
2130 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002131 hostnames = ','.join([entry.get_host().hostname
2132 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002133
showard170873e2009-01-07 00:22:26 +00002134 params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
2135 '-r', _drone_manager.absolute_path(execution_tag),
2136 '-u', self.owner, '-l', self.name, '-m', hostnames,
2137 _drone_manager.absolute_path(control_path)]
mbligh36768f02008-02-22 18:28:33 +00002138
jadmanski0afbb632008-06-06 21:10:57 +00002139 if not self.is_server_job():
2140 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002141
showardb2e2c322008-10-14 17:33:55 +00002142 return params
mblighe2586682008-02-29 22:45:46 +00002143
mbligh36768f02008-02-22 18:28:33 +00002144
showardc9ae1782009-01-30 01:42:37 +00002145 def _should_run_cleanup(self, queue_entry):
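        # Reboot-before policy: ALWAYS forces a cleanup; IF_DIRTY cleans only
        # when the host is marked dirty.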
showard0fc38302008-10-23 00:44:07 +00002146 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002147 return True
showard0fc38302008-10-23 00:44:07 +00002148 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002149 return queue_entry.get_host().dirty
2150 return False
showard21baa452008-10-21 00:08:39 +00002151
showardc9ae1782009-01-30 01:42:37 +00002152
2153 def _should_run_verify(self, queue_entry):
2154 do_not_verify = (queue_entry.host.protection ==
2155 host_protections.Protection.DO_NOT_VERIFY)
2156 if do_not_verify:
2157 return False
2158 return self.run_verify
2159
2160
2161 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002162 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002163 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002164 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002165 if self._should_run_verify(queue_entry):
2166 tasks.append(VerifyTask(queue_entry=queue_entry))
2167 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002168 return tasks
2169
2170
showard2bab8f42008-11-12 18:15:22 +00002171 def _assign_new_group(self, queue_entries):
2172 if len(queue_entries) == 1:
2173 group_name = queue_entries[0].get_host().hostname
2174 else:
2175 group_name = self._next_group_name()
2176 print 'Running synchronous job %d hosts %s as %s' % (
2177 self.id, [entry.host.hostname for entry in queue_entries],
2178 group_name)
2179
2180 for queue_entry in queue_entries:
2181 queue_entry.set_execution_subdir(group_name)
2182
2183
2184 def _choose_group_to_run(self, include_queue_entry):
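        # Build the synch group around the given entry, pulling additional
        # Pending entries from the same job until synch_count is reached, then
        # assign the group a shared execution subdir.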
2185 chosen_entries = [include_queue_entry]
2186
2187 num_entries_needed = self.synch_count - 1
2188 if num_entries_needed > 0:
2189 pending_entries = HostQueueEntry.fetch(
2190 where='job_id = %s AND status = "Pending" AND id != %s',
2191 params=(self.id, include_queue_entry.id))
2192 chosen_entries += list(pending_entries)[:num_entries_needed]
2193
2194 self._assign_new_group(chosen_entries)
2195 return chosen_entries
2196
2197
2198 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002199 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002200 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2201 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002202
showard2bab8f42008-11-12 18:15:22 +00002203 queue_entries = self._choose_group_to_run(queue_entry)
2204 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002205
2206
2207 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002208 for queue_entry in queue_entries:
2209 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002210 params = self._get_autoserv_params(queue_entries)
2211 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2212 cmd=params)
2213 tasks = initial_tasks + [queue_task]
2214 entry_ids = [entry.id for entry in queue_entries]
2215
showard170873e2009-01-07 00:22:26 +00002216 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002217
2218
mbligh36768f02008-02-22 18:28:33 +00002219if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002220 main()