blob: 75724f382b83c0849b01ff2ed5194134ab4c0ff1 [file] [log] [blame]
showard170873e2009-01-07 00:22:26 +00001import os, re, shutil, signal, subprocess, errno, time, heapq, traceback
showardb18134f2009-03-20 20:52:18 +00002import common, logging
showardc5afc462009-01-13 00:09:39 +00003from autotest_lib.client.common_lib import error, global_config
showard170873e2009-01-07 00:22:26 +00004from autotest_lib.scheduler import email_manager, drone_utility, drones
showard324bf812009-01-20 23:23:38 +00005from autotest_lib.scheduler import scheduler_config
showard170873e2009-01-07 00:22:26 +00006
showard170873e2009-01-07 00:22:26 +00007
# Results on a drone are placed under the drone installation directory, in a
# subdirectory with this name.
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Placeholder that may appear in a command list; see execute_command().
WORKING_DIRECTORY = object()


# Known pidfile names.
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (
    AUTOSERV_PID_FILE,
    CRASHINFO_PID_FILE,
    PARSER_PID_FILE,
    ARCHIVER_PID_FILE,
)
22
23
class DroneManagerError(Exception):
    """Raised by DroneManager when no drone is available for a request."""
26
27
class CustomEquals(object):
    """
    Base class giving value semantics (==, != and hash) based on _id().

    Subclasses override _id() to return a hashable identity value.
    """
    def _id(self):
        raise NotImplementedError


    def __eq__(self, other):
        # only instances of our own type (or a subclass of it) are comparable
        if isinstance(other, type(self)):
            return self._id() == other._id()
        return NotImplemented


    def __ne__(self, other):
        return not (self == other)


    def __hash__(self):
        identity = self._id()
        return hash(identity)
45
46
class Process(CustomEquals):
    """
    A process on a drone.  Two Process objects are equal when their hostname
    and pid match; ppid is not part of the identity.
    """
    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        identity = (self.hostname, self.pid)
        return identity


    def __str__(self):
        fields = (self.hostname, self.pid)
        return '%s/%s' % fields


    def __repr__(self):
        default_repr = super(Process, self).__repr__()
        return default_repr + '<%s>' % self
63
64
class PidfileId(CustomEquals):
    """Identifies a pidfile by its path; equal paths give equal ids."""
    def __init__(self, path):
        self.path = path


    def _id(self):
        # the path alone is the identity
        return self.path


    def __str__(self):
        path_text = str(self.path)
        return path_text
76
77
class _PidfileInfo(object):
    """Bookkeeping that DroneManager keeps for each registered pidfile."""
    # refresh cycles counted by DroneManager._drop_old_pidfiles(); set back to
    # zero whenever the pidfile is registered or its contents are requested
    age = 0
    # number of processes accounted to this pidfile; None until declared
    num_processes = None
81
82
class PidfileContents(object):
    """
    Parsed contents of a pidfile, as produced by DroneManager._parse_pidfile().

    All attributes default to None, which is what an empty or not-yet-written
    pidfile looks like.
    """
    # Process built from the pid on the first line of the pidfile
    process = None
    # integer from the second line; only set once all three lines are present
    exit_status = None
    # integer from the third line; only set once all three lines are present
    num_tests_failed = None

    def is_invalid(self):
        """@returns: False; corrupt pidfiles are represented by InvalidPidfile."""
        return False


    def is_running(self):
        """
        @returns: True if a process has been recorded and no exit status has
                been recorded for it yet.
        """
        # compare against None explicitly: an exit status of 0 means the
        # process has finished, but 0 is falsy
        return self.process is not None and self.exit_status is None
94
95
class InvalidPidfile(object):
    """
    Stands in for PidfileContents when a pidfile could not be parsed.

    DroneManager stores these objects in the same dictionaries as
    PidfileContents, so the same attributes are provided here (always None).
    Code that reads .process or .exit_status then works for either type.
    """
    process = None
    exit_status = None
    num_tests_failed = None

    def __init__(self, error):
        """@param error: string describing why the pidfile is invalid."""
        self.error = error


    def is_invalid(self):
        return True


    def is_running(self):
        return False


    def __str__(self):
        return self.error
111
112
class _DroneHeapWrapper(object):
    """Orders drones by used_capacity().

    Wrapping drones in these objects lets them be kept in a heap sorted by
    capacity.
    """
    def __init__(self, drone):
        self.drone = drone


    def __cmp__(self, other):
        assert isinstance(other, _DroneHeapWrapper)
        own_capacity = self.drone.used_capacity()
        other_capacity = other.drone.used_capacity()
        return cmp(own_capacity, other_capacity)
125
126
showard170873e2009-01-07 00:22:26 +0000127class DroneManager(object):
128 """
129 This class acts as an interface from the scheduler to drones, whether it be
130 only a single "drone" for localhost or multiple remote drones.
131
132 All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
134 """
showard170873e2009-01-07 00:22:26 +0000135 def __init__(self):
showardd1195652009-12-08 22:21:02 +0000136 # absolute path of base results dir
showard170873e2009-01-07 00:22:26 +0000137 self._results_dir = None
showardd1195652009-12-08 22:21:02 +0000138 # holds Process objects
showard170873e2009-01-07 00:22:26 +0000139 self._process_set = set()
showardd1195652009-12-08 22:21:02 +0000140 # maps PidfileId to PidfileContents
showard170873e2009-01-07 00:22:26 +0000141 self._pidfiles = {}
showardd1195652009-12-08 22:21:02 +0000142 # same as _pidfiles
showard170873e2009-01-07 00:22:26 +0000143 self._pidfiles_second_read = {}
showardd1195652009-12-08 22:21:02 +0000144 # maps PidfileId to _PidfileInfo
145 self._registered_pidfile_info = {}
146 # used to generate unique temporary paths
showard170873e2009-01-07 00:22:26 +0000147 self._temporary_path_counter = 0
showardd1195652009-12-08 22:21:02 +0000148 # maps hostname to Drone object
showard170873e2009-01-07 00:22:26 +0000149 self._drones = {}
150 self._results_drone = None
showardd1195652009-12-08 22:21:02 +0000151 # maps results dir to dict mapping file path to contents
showard170873e2009-01-07 00:22:26 +0000152 self._attached_files = {}
showard418785b2009-11-23 20:19:59 +0000153 # heapq of _DroneHeapWrappers
showard170873e2009-01-07 00:22:26 +0000154 self._drone_queue = []
showard170873e2009-01-07 00:22:26 +0000155
156
157 def initialize(self, base_results_dir, drone_hostnames,
158 results_repository_hostname):
159 self._results_dir = base_results_dir
showard170873e2009-01-07 00:22:26 +0000160
161 for hostname in drone_hostnames:
showard4460ee82009-07-07 20:54:29 +0000162 drone = self._add_drone(hostname)
showard2aafd902009-10-14 16:20:14 +0000163 drone.call('initialize', self.absolute_path(''))
showard170873e2009-01-07 00:22:26 +0000164
165 if not self._drones:
166 # all drones failed to initialize
167 raise DroneManagerError('No valid drones found')
168
showard324bf812009-01-20 23:23:38 +0000169 self.refresh_drone_configs()
showardc5afc462009-01-13 00:09:39 +0000170
showard4460ee82009-07-07 20:54:29 +0000171 logging.info('Using results repository on %s',
showardb18134f2009-03-20 20:52:18 +0000172 results_repository_hostname)
showard170873e2009-01-07 00:22:26 +0000173 self._results_drone = drones.get_drone(results_repository_hostname)
showardac5b0002009-10-19 18:34:00 +0000174 results_installation_dir = global_config.global_config.get_config_value(
175 scheduler_config.CONFIG_SECTION,
176 'results_host_installation_directory', default=None)
177 if results_installation_dir:
178 self._results_drone.set_autotest_install_dir(
179 results_installation_dir)
showard170873e2009-01-07 00:22:26 +0000180 # don't initialize() the results drone - we don't want to clear out any
showardd1195652009-12-08 22:21:02 +0000181 # directories and we don't need to kill any processes
showard170873e2009-01-07 00:22:26 +0000182
183
184 def reinitialize_drones(self):
185 self._call_all_drones('initialize', self._results_dir)
186
187
188 def shutdown(self):
showard324bf812009-01-20 23:23:38 +0000189 for drone in self.get_drones():
showard170873e2009-01-07 00:22:26 +0000190 drone.shutdown()
191
192
showard8d3dbca2009-09-25 20:29:38 +0000193 def _get_max_pidfile_refreshes(self):
194 """
195 Normally refresh() is called on every monitor_db.Dispatcher.tick().
196
197 @returns: The number of refresh() calls before we forget a pidfile.
198 """
199 pidfile_timeout = global_config.global_config.get_config_value(
200 scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
201 type=int, default=2000)
202 return pidfile_timeout
203
204
showard170873e2009-01-07 00:22:26 +0000205 def _add_drone(self, hostname):
showardb18134f2009-03-20 20:52:18 +0000206 logging.info('Adding drone %s' % hostname)
showard170873e2009-01-07 00:22:26 +0000207 drone = drones.get_drone(hostname)
208 self._drones[drone.hostname] = drone
209 return drone
210
211
212 def _remove_drone(self, hostname):
213 self._drones.pop(hostname, None)
214
215
showard324bf812009-01-20 23:23:38 +0000216 def refresh_drone_configs(self):
showardc5afc462009-01-13 00:09:39 +0000217 """
showard324bf812009-01-20 23:23:38 +0000218 Reread global config options for all drones.
showardc5afc462009-01-13 00:09:39 +0000219 """
showard324bf812009-01-20 23:23:38 +0000220 config = global_config.global_config
221 section = scheduler_config.CONFIG_SECTION
222 config.parse_config_file()
showardc5afc462009-01-13 00:09:39 +0000223 for hostname, drone in self._drones.iteritems():
showard324bf812009-01-20 23:23:38 +0000224 disabled = config.get_config_value(
showard9bb960b2009-11-19 01:02:11 +0000225 section, '%s_disabled' % hostname, default='')
showardc5afc462009-01-13 00:09:39 +0000226 drone.enabled = not bool(disabled)
227
showard324bf812009-01-20 23:23:38 +0000228 drone.max_processes = config.get_config_value(
showard9bb960b2009-11-19 01:02:11 +0000229 section, '%s_max_processes' % hostname, type=int,
230 default=scheduler_config.config.max_processes_per_drone)
231
232 allowed_users = config.get_config_value(
233 section, '%s_users' % hostname, default=None)
234 if allowed_users is not None:
showard1b7142d2010-01-15 00:21:37 +0000235 allowed_users = set(allowed_users.split())
236 drone.allowed_users = allowed_users
showardc5afc462009-01-13 00:09:39 +0000237
showard418785b2009-11-23 20:19:59 +0000238 self._reorder_drone_queue() # max_processes may have changed
239
showardc5afc462009-01-13 00:09:39 +0000240
showard324bf812009-01-20 23:23:38 +0000241 def get_drones(self):
242 return self._drones.itervalues()
showardc5afc462009-01-13 00:09:39 +0000243
244
showard170873e2009-01-07 00:22:26 +0000245 def _get_drone_for_process(self, process):
showard170873e2009-01-07 00:22:26 +0000246 return self._drones[process.hostname]
247
248
249 def _get_drone_for_pidfile_id(self, pidfile_id):
250 pidfile_contents = self.get_pidfile_contents(pidfile_id)
251 assert pidfile_contents.process is not None
252 return self._get_drone_for_process(pidfile_contents.process)
253
254
255 def _drop_old_pidfiles(self):
showardd3496242009-12-10 21:41:43 +0000256 # use items() since the dict is modified in unregister_pidfile()
257 for pidfile_id, info in self._registered_pidfile_info.items():
showardd1195652009-12-08 22:21:02 +0000258 if info.age > self._get_max_pidfile_refreshes():
showardf85a0b72009-10-07 20:48:45 +0000259 logging.warning('dropping leaked pidfile %s', pidfile_id)
260 self.unregister_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000261 else:
showardd1195652009-12-08 22:21:02 +0000262 info.age += 1
showard170873e2009-01-07 00:22:26 +0000263
264
265 def _reset(self):
showard170873e2009-01-07 00:22:26 +0000266 self._process_set = set()
267 self._pidfiles = {}
268 self._pidfiles_second_read = {}
269 self._drone_queue = []
270
271
272 def _call_all_drones(self, method, *args, **kwargs):
273 all_results = {}
showard324bf812009-01-20 23:23:38 +0000274 for drone in self.get_drones():
showard170873e2009-01-07 00:22:26 +0000275 all_results[drone] = drone.call(method, *args, **kwargs)
276 return all_results
277
278
279 def _parse_pidfile(self, drone, raw_contents):
280 contents = PidfileContents()
281 if not raw_contents:
282 return contents
283 lines = raw_contents.splitlines()
284 if len(lines) > 3:
285 return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
286 (len(lines), lines))
287 try:
288 pid = int(lines[0])
289 contents.process = Process(drone.hostname, pid)
290 # if len(lines) == 2, assume we caught Autoserv between writing
291 # exit_status and num_failed_tests, so just ignore it and wait for
292 # the next cycle
293 if len(lines) == 3:
294 contents.exit_status = int(lines[1])
295 contents.num_tests_failed = int(lines[2])
296 except ValueError, exc:
297 return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
298
299 return contents
300
301
302 def _process_pidfiles(self, drone, pidfiles, store_in_dict):
303 for pidfile_path, contents in pidfiles.iteritems():
304 pidfile_id = PidfileId(pidfile_path)
305 contents = self._parse_pidfile(drone, contents)
306 store_in_dict[pidfile_id] = contents
307
308
showard0205a3e2009-01-16 03:03:50 +0000309 def _add_process(self, drone, process_info):
310 process = Process(drone.hostname, int(process_info['pid']),
311 int(process_info['ppid']))
312 self._process_set.add(process)
showard0205a3e2009-01-16 03:03:50 +0000313
314
315 def _add_autoserv_process(self, drone, process_info):
316 assert process_info['comm'] == 'autoserv'
317 # only root autoserv processes have pgid == pid
318 if process_info['pgid'] != process_info['pid']:
319 return
showardd1195652009-12-08 22:21:02 +0000320 self._add_process(drone, process_info)
showard0205a3e2009-01-16 03:03:50 +0000321
322
showard324bf812009-01-20 23:23:38 +0000323 def _enqueue_drone(self, drone):
showard418785b2009-11-23 20:19:59 +0000324 heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))
325
326
327 def _reorder_drone_queue(self):
328 heapq.heapify(self._drone_queue)
showard324bf812009-01-20 23:23:38 +0000329
330
showardd1195652009-12-08 22:21:02 +0000331 def _compute_active_processes(self, drone):
332 drone.active_processes = 0
333 for pidfile_id, contents in self._pidfiles.iteritems():
334 is_running = contents.exit_status is None
335 on_this_drone = (contents.process
336 and contents.process.hostname == drone.hostname)
337 if is_running and on_this_drone:
338 info = self._registered_pidfile_info[pidfile_id]
339 if info.num_processes is not None:
340 drone.active_processes += info.num_processes
341
342
showard170873e2009-01-07 00:22:26 +0000343 def refresh(self):
344 """
345 Called at the beginning of a scheduler cycle to refresh all process
346 information.
347 """
348 self._reset()
showardbf9695d2009-07-06 20:22:24 +0000349 self._drop_old_pidfiles()
showardd1195652009-12-08 22:21:02 +0000350 pidfile_paths = [pidfile_id.path
351 for pidfile_id in self._registered_pidfile_info]
showard170873e2009-01-07 00:22:26 +0000352 all_results = self._call_all_drones('refresh', pidfile_paths)
353
354 for drone, results_list in all_results.iteritems():
355 results = results_list[0]
showard0205a3e2009-01-16 03:03:50 +0000356
357 for process_info in results['autoserv_processes']:
358 self._add_autoserv_process(drone, process_info)
359 for process_info in results['parse_processes']:
360 self._add_process(drone, process_info)
showard170873e2009-01-07 00:22:26 +0000361
362 self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
363 self._process_pidfiles(drone, results['pidfiles_second_read'],
364 self._pidfiles_second_read)
365
showardd1195652009-12-08 22:21:02 +0000366 self._compute_active_processes(drone)
367 if drone.enabled:
368 self._enqueue_drone(drone)
showard170873e2009-01-07 00:22:26 +0000369
370
371 def execute_actions(self):
372 """
373 Called at the end of a scheduler cycle to execute all queued actions
374 on drones.
375 """
376 for drone in self._drones.values():
377 drone.execute_queued_calls()
378
379 try:
mbligh1ef218d2009-08-03 16:57:56 +0000380 self._results_drone.execute_queued_calls()
showard170873e2009-01-07 00:22:26 +0000381 except error.AutoservError:
382 warning = ('Results repository failed to execute calls:\n' +
383 traceback.format_exc())
showard170873e2009-01-07 00:22:26 +0000384 email_manager.manager.enqueue_notify_email(
385 'Results repository error', warning)
386 self._results_drone.clear_call_queue()
387
388
389 def get_orphaned_autoserv_processes(self):
390 """
showardd3dc1992009-04-22 21:01:40 +0000391 Returns a set of Process objects for orphaned processes only.
showard170873e2009-01-07 00:22:26 +0000392 """
showardd3dc1992009-04-22 21:01:40 +0000393 return set(process for process in self._process_set
394 if process.ppid == 1)
showard170873e2009-01-07 00:22:26 +0000395
396
showard170873e2009-01-07 00:22:26 +0000397 def kill_process(self, process):
398 """
399 Kill the given process.
400 """
showardd3dc1992009-04-22 21:01:40 +0000401 logging.info('killing %s', process)
showard170873e2009-01-07 00:22:26 +0000402 drone = self._get_drone_for_process(process)
403 drone.queue_call('kill_process', process)
404
405
406 def _ensure_directory_exists(self, path):
407 if not os.path.exists(path):
408 os.makedirs(path)
409
410
showard324bf812009-01-20 23:23:38 +0000411 def total_running_processes(self):
412 return sum(drone.active_processes for drone in self.get_drones())
413
414
jamesren76fcf192010-04-21 20:39:50 +0000415 def max_runnable_processes(self, username, drone_hostnames_allowed):
showard324bf812009-01-20 23:23:38 +0000416 """
417 Return the maximum number of processes that can be run (in a single
418 execution) given the current load on drones.
showard9bb960b2009-11-19 01:02:11 +0000419 @param username: login of user to run a process. may be None.
jamesren76fcf192010-04-21 20:39:50 +0000420 @param drone_hostnames_allowed: list of drones that can be used. May be
421 None
showard324bf812009-01-20 23:23:38 +0000422 """
showard1b7142d2010-01-15 00:21:37 +0000423 usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
jamesren76fcf192010-04-21 20:39:50 +0000424 if wrapper.drone.usable_by(username) and
425 (drone_hostnames_allowed is None or
426 wrapper.drone.hostname in
427 drone_hostnames_allowed)]
showard1b7142d2010-01-15 00:21:37 +0000428 if not usable_drone_wrappers:
429 # all drones disabled or inaccessible
showardde700d32009-02-25 00:12:42 +0000430 return 0
jamesren37b50452010-03-25 20:38:56 +0000431 runnable_processes = [
432 wrapper.drone.max_processes - wrapper.drone.active_processes
433 for wrapper in usable_drone_wrappers]
434 return max([0] + runnable_processes)
showard324bf812009-01-20 23:23:38 +0000435
436
showarde39ebe92009-06-18 23:14:48 +0000437 def _least_loaded_drone(self, drones):
438 drone_to_use = drones[0]
439 for drone in drones[1:]:
440 if drone.used_capacity() < drone_to_use.used_capacity():
441 drone_to_use = drone
442 return drone_to_use
443
444
jamesren76fcf192010-04-21 20:39:50 +0000445 def _choose_drone_for_execution(self, num_processes, username,
446 drone_hostnames_allowed):
showard324bf812009-01-20 23:23:38 +0000447 # cycle through drones is order of increasing used capacity until
448 # we find one that can handle these processes
449 checked_drones = []
jamesren37b50452010-03-25 20:38:56 +0000450 usable_drones = []
showard324bf812009-01-20 23:23:38 +0000451 drone_to_use = None
452 while self._drone_queue:
showard418785b2009-11-23 20:19:59 +0000453 drone = heapq.heappop(self._drone_queue).drone
showard324bf812009-01-20 23:23:38 +0000454 checked_drones.append(drone)
Eric Lie0493a42010-11-15 13:05:43 -0800455 logging.info('Checking drone %s', drone.hostname)
showard9bb960b2009-11-19 01:02:11 +0000456 if not drone.usable_by(username):
457 continue
jamesren76fcf192010-04-21 20:39:50 +0000458
459 drone_allowed = (drone_hostnames_allowed is None
460 or drone.hostname in drone_hostnames_allowed)
461 if not drone_allowed:
Eric Lie0493a42010-11-15 13:05:43 -0800462 logging.debug('Drone %s not allowed: ', drone.hostname)
jamesren76fcf192010-04-21 20:39:50 +0000463 continue
464
jamesren37b50452010-03-25 20:38:56 +0000465 usable_drones.append(drone)
jamesren76fcf192010-04-21 20:39:50 +0000466
showard324bf812009-01-20 23:23:38 +0000467 if drone.active_processes + num_processes <= drone.max_processes:
468 drone_to_use = drone
469 break
Eric Lie0493a42010-11-15 13:05:43 -0800470 logging.info('Drone %s has %d active + %s requested > %s max',
471 drone.hostname, drone.active_processes, num_processes,
472 drone.max_processes)
showard324bf812009-01-20 23:23:38 +0000473
jamesren76fcf192010-04-21 20:39:50 +0000474 if not drone_to_use and usable_drones:
showard324bf812009-01-20 23:23:38 +0000475 drone_summary = ','.join('%s %s/%s' % (drone.hostname,
476 drone.active_processes,
477 drone.max_processes)
jamesren37b50452010-03-25 20:38:56 +0000478 for drone in usable_drones)
479 logging.error('No drone has capacity to handle %d processes (%s) '
480 'for user %s', num_processes, drone_summary, username)
481 drone_to_use = self._least_loaded_drone(usable_drones)
showarde39ebe92009-06-18 23:14:48 +0000482
showard324bf812009-01-20 23:23:38 +0000483 # refill _drone_queue
484 for drone in checked_drones:
485 self._enqueue_drone(drone)
486
showard170873e2009-01-07 00:22:26 +0000487 return drone_to_use
488
489
showarded2afea2009-07-07 20:54:07 +0000490 def _substitute_working_directory_into_command(self, command,
491 working_directory):
492 for i, item in enumerate(command):
493 if item is WORKING_DIRECTORY:
494 command[i] = working_directory
495
496
showardd3dc1992009-04-22 21:01:40 +0000497 def execute_command(self, command, working_directory, pidfile_name,
showard418785b2009-11-23 20:19:59 +0000498 num_processes, log_file=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +0000499 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +0000500 """
501 Execute the given command, taken as an argv list.
502
showarded2afea2009-07-07 20:54:07 +0000503 @param command: command to execute as a list. if any item is
504 WORKING_DIRECTORY, the absolute path to the working directory
505 will be substituted for it.
506 @param working_directory: directory in which the pidfile will be written
507 @param pidfile_name: name of the pidfile this process will write
showardd1195652009-12-08 22:21:02 +0000508 @param num_processes: number of processes to account for from this
509 execution
showarded2afea2009-07-07 20:54:07 +0000510 @param log_file (optional): path (in the results repository) to hold
511 command output.
512 @param paired_with_pidfile (optional): a PidfileId for an
513 already-executed process; the new process will execute on the
514 same drone as the previous process.
showard9bb960b2009-11-19 01:02:11 +0000515 @param username (optional): login of the user responsible for this
516 process.
jamesren76fcf192010-04-21 20:39:50 +0000517 @param drone_hostnames_allowed (optional): hostnames of the drones that
518 this command is allowed to
519 execute on
showard170873e2009-01-07 00:22:26 +0000520 """
showarddb502762009-09-09 15:31:20 +0000521 abs_working_directory = self.absolute_path(working_directory)
showard170873e2009-01-07 00:22:26 +0000522 if not log_file:
523 log_file = self.get_temporary_path('execute')
524 log_file = self.absolute_path(log_file)
showard170873e2009-01-07 00:22:26 +0000525
showarded2afea2009-07-07 20:54:07 +0000526 self._substitute_working_directory_into_command(command,
showarddb502762009-09-09 15:31:20 +0000527 abs_working_directory)
showarded2afea2009-07-07 20:54:07 +0000528
showard170873e2009-01-07 00:22:26 +0000529 if paired_with_pidfile:
530 drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
531 else:
jamesren76fcf192010-04-21 20:39:50 +0000532 drone = self._choose_drone_for_execution(num_processes, username,
533 drone_hostnames_allowed)
534
535 if not drone:
536 raise DroneManagerError('command failed; no drones available: %s'
537 % command)
538
showardb18134f2009-03-20 20:52:18 +0000539 logging.info("command = %s" % command)
540 logging.info('log file = %s:%s' % (drone.hostname, log_file))
showarddb502762009-09-09 15:31:20 +0000541 self._write_attached_files(working_directory, drone)
542 drone.queue_call('execute_command', command, abs_working_directory,
showard170873e2009-01-07 00:22:26 +0000543 log_file, pidfile_name)
showard418785b2009-11-23 20:19:59 +0000544 drone.active_processes += num_processes
545 self._reorder_drone_queue()
showard170873e2009-01-07 00:22:26 +0000546
showard42d44982009-10-12 20:34:03 +0000547 pidfile_path = os.path.join(abs_working_directory, pidfile_name)
showard170873e2009-01-07 00:22:26 +0000548 pidfile_id = PidfileId(pidfile_path)
549 self.register_pidfile(pidfile_id)
showardd1195652009-12-08 22:21:02 +0000550 self._registered_pidfile_info[pidfile_id].num_processes = num_processes
showard170873e2009-01-07 00:22:26 +0000551 return pidfile_id
552
553
showardd3dc1992009-04-22 21:01:40 +0000554 def get_pidfile_id_from(self, execution_tag, pidfile_name):
555 path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
showard170873e2009-01-07 00:22:26 +0000556 return PidfileId(path)
557
558
559 def register_pidfile(self, pidfile_id):
560 """
561 Indicate that the DroneManager should look for the given pidfile when
562 refreshing.
563 """
showardd1195652009-12-08 22:21:02 +0000564 if pidfile_id not in self._registered_pidfile_info:
showard37399782009-08-20 23:32:20 +0000565 logging.info('monitoring pidfile %s', pidfile_id)
showardd1195652009-12-08 22:21:02 +0000566 self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
showardc6fb6042010-01-25 21:48:20 +0000567 self._reset_pidfile_age(pidfile_id)
568
569
570 def _reset_pidfile_age(self, pidfile_id):
571 if pidfile_id in self._registered_pidfile_info:
572 self._registered_pidfile_info[pidfile_id].age = 0
showard170873e2009-01-07 00:22:26 +0000573
574
showardf85a0b72009-10-07 20:48:45 +0000575 def unregister_pidfile(self, pidfile_id):
showardd1195652009-12-08 22:21:02 +0000576 if pidfile_id in self._registered_pidfile_info:
showardf85a0b72009-10-07 20:48:45 +0000577 logging.info('forgetting pidfile %s', pidfile_id)
showardd1195652009-12-08 22:21:02 +0000578 del self._registered_pidfile_info[pidfile_id]
579
580
581 def declare_process_count(self, pidfile_id, num_processes):
582 self._registered_pidfile_info[pidfile_id].num_processes = num_processes
showardf85a0b72009-10-07 20:48:45 +0000583
584
showard170873e2009-01-07 00:22:26 +0000585 def get_pidfile_contents(self, pidfile_id, use_second_read=False):
586 """
587 Retrieve a PidfileContents object for the given pidfile_id. If
588 use_second_read is True, use results that were read after the processes
589 were checked, instead of before.
590 """
showardc6fb6042010-01-25 21:48:20 +0000591 self._reset_pidfile_age(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000592 if use_second_read:
593 pidfile_map = self._pidfiles_second_read
594 else:
595 pidfile_map = self._pidfiles
596 return pidfile_map.get(pidfile_id, PidfileContents())
597
598
599 def is_process_running(self, process):
600 """
601 Check if the given process is in the running process list.
602 """
603 return process in self._process_set
604
605
606 def get_temporary_path(self, base_name):
607 """
608 Get a new temporary path guaranteed to be unique across all drones
609 for this scheduler execution.
610 """
611 self._temporary_path_counter += 1
612 return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
613 '%s.%s' % (base_name, self._temporary_path_counter))
614
615
showard42d44982009-10-12 20:34:03 +0000616 def absolute_path(self, path, on_results_repository=False):
617 if on_results_repository:
618 base_dir = self._results_dir
619 else:
showardc75fded2009-10-14 16:20:02 +0000620 base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
621 _DRONE_RESULTS_DIR_SUFFIX)
showard42d44982009-10-12 20:34:03 +0000622 return os.path.join(base_dir, path)
showard170873e2009-01-07 00:22:26 +0000623
624
showard678df4f2009-02-04 21:36:39 +0000625 def _copy_results_helper(self, process, source_path, destination_path,
626 to_results_repository=False):
627 full_source = self.absolute_path(source_path)
showard42d44982009-10-12 20:34:03 +0000628 full_destination = self.absolute_path(
629 destination_path, on_results_repository=to_results_repository)
showard678df4f2009-02-04 21:36:39 +0000630 source_drone = self._get_drone_for_process(process)
631 if to_results_repository:
632 source_drone.send_file_to(self._results_drone, full_source,
633 full_destination, can_fail=True)
634 else:
635 source_drone.queue_call('copy_file_or_directory', full_source,
636 full_destination)
637
638
showard170873e2009-01-07 00:22:26 +0000639 def copy_to_results_repository(self, process, source_path,
640 destination_path=None):
641 """
642 Copy results from the given process at source_path to destination_path
643 in the results repository.
644 """
645 if destination_path is None:
646 destination_path = source_path
showard678df4f2009-02-04 21:36:39 +0000647 self._copy_results_helper(process, source_path, destination_path,
648 to_results_repository=True)
649
650
651 def copy_results_on_drone(self, process, source_path, destination_path):
652 """
653 Copy a results directory from one place to another on the drone.
654 """
655 self._copy_results_helper(process, source_path, destination_path)
showard170873e2009-01-07 00:22:26 +0000656
657
showarddb502762009-09-09 15:31:20 +0000658 def _write_attached_files(self, results_dir, drone):
659 attached_files = self._attached_files.pop(results_dir, {})
showard73ec0442009-02-07 02:05:20 +0000660 for file_path, contents in attached_files.iteritems():
showard170873e2009-01-07 00:22:26 +0000661 drone.queue_call('write_to_file', self.absolute_path(file_path),
662 contents)
663
664
showarddb502762009-09-09 15:31:20 +0000665 def attach_file_to_execution(self, results_dir, file_contents,
showard170873e2009-01-07 00:22:26 +0000666 file_path=None):
667 """
showarddb502762009-09-09 15:31:20 +0000668 When the process for the results directory is executed, the given file
669 contents will be placed in a file on the drone. Returns the path at
670 which the file will be placed.
showard170873e2009-01-07 00:22:26 +0000671 """
672 if not file_path:
673 file_path = self.get_temporary_path('attach')
showarddb502762009-09-09 15:31:20 +0000674 files_for_execution = self._attached_files.setdefault(results_dir, {})
showard73ec0442009-02-07 02:05:20 +0000675 assert file_path not in files_for_execution
676 files_for_execution[file_path] = file_contents
showard170873e2009-01-07 00:22:26 +0000677 return file_path
678
679
showard35162b02009-03-03 02:17:30 +0000680 def write_lines_to_file(self, file_path, lines, paired_with_process=None):
showard170873e2009-01-07 00:22:26 +0000681 """
682 Write the given lines (as a list of strings) to a file. If
showard35162b02009-03-03 02:17:30 +0000683 paired_with_process is given, the file will be written on the drone
684 running the given Process. Otherwise, the file will be written to the
showard170873e2009-01-07 00:22:26 +0000685 results repository.
686 """
showard170873e2009-01-07 00:22:26 +0000687 file_contents = '\n'.join(lines) + '\n'
showard35162b02009-03-03 02:17:30 +0000688 if paired_with_process:
689 drone = self._get_drone_for_process(paired_with_process)
showard42d44982009-10-12 20:34:03 +0000690 on_results_repository = False
showard170873e2009-01-07 00:22:26 +0000691 else:
692 drone = self._results_drone
showard42d44982009-10-12 20:34:03 +0000693 on_results_repository = True
694 full_path = self.absolute_path(
695 file_path, on_results_repository=on_results_repository)
showard170873e2009-01-07 00:22:26 +0000696 drone.queue_call('write_to_file', full_path, file_contents)
jamesrenc44ae992010-02-19 00:12:54 +0000697
698
# the shared DroneManager; created lazily by instance()
_the_instance = None

def instance():
    """Return the shared DroneManager, creating it on first use."""
    if _the_instance is not None:
        return _the_instance
    _set_instance(DroneManager())
    return _the_instance
705
706
def _set_instance(instance): # usable for testing
    """Make `instance` the object that the module-level instance() returns."""
    globals()['_the_instance'] = instance