blob: 65551bea0442ba18ed83c8354f4db7c62b0545ff [file] [log] [blame]
showard170873e2009-01-07 00:22:26 +00001import os, re, shutil, signal, subprocess, errno, time, heapq, traceback
showardb18134f2009-03-20 20:52:18 +00002import common, logging
showardc5afc462009-01-13 00:09:39 +00003from autotest_lib.client.common_lib import error, global_config
showard170873e2009-01-07 00:22:26 +00004from autotest_lib.scheduler import email_manager, drone_utility, drones
showard324bf812009-01-20 23:23:38 +00005from autotest_lib.scheduler import scheduler_config
showard170873e2009-01-07 00:22:26 +00006
showard170873e2009-01-07 00:22:26 +00007
# Name of the directory, under the drone installation directory, in which
# results are placed on drones.
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Placeholder for command lists given to DroneManager.execute_command(); each
# occurrence is replaced by the absolute path of the working directory.
WORKING_DIRECTORY = object()


# Pidfile names that can be passed as pidfile_name to execute_command().
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (
    AUTOSERV_PID_FILE,
    CRASHINFO_PID_FILE,
    PARSER_PID_FILE,
    ARCHIVER_PID_FILE,
)
22
23
class DroneManagerError(Exception):
    """Error raised by the drone manager, e.g. when no valid drones exist."""
26
27
class CustomEquals(object):
    """Base class whose equality and hash are derived from _id()."""

    def _id(self):
        """Return the hashable identity value; subclasses must override."""
        raise NotImplementedError


    def __eq__(self, other):
        # only objects of our own type (or a subtype) are comparable; anything
        # else is left to the other operand
        if isinstance(other, type(self)):
            return self._id() == other._id()
        return NotImplemented


    def __ne__(self, other):
        return not (self == other)


    def __hash__(self):
        return hash(self._id())
45
46
class Process(CustomEquals):
    """A process on a host; equality and hash use (hostname, pid) only."""

    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        # ppid is not part of the identity
        return (self.hostname, self.pid)


    def __str__(self):
        return '%s/%s' % self._id()


    def __repr__(self):
        default_repr = super(Process, self).__repr__()
        return '%s<%s>' % (default_repr, self)
63
64
class PidfileId(CustomEquals):
    """Identifier for a pidfile; two ids are equal when their paths are."""

    def __init__(self, path):
        self.path = path


    def _id(self):
        # the path alone is the identity
        return self.path


    def __str__(self):
        return str(self.path)
76
77
class _PidfileInfo(object):
    """Bookkeeping record DroneManager keeps for each registered pidfile."""
    # number of processes accounted to this pidfile; None until declared
    num_processes = None
    # refresh cycles since the pidfile was last registered or read
    age = 0
81
82
class PidfileContents(object):
    """Parsed contents of a pidfile; every field defaults to None."""
    # Process recorded in the pidfile, or None if no pid has been read
    process = None
    # exit status recorded in the pidfile, or None if not written yet
    exit_status = None
    # failed-test count recorded together with exit_status, or None
    num_tests_failed = None

    def is_invalid(self):
        """@returns: always False for successfully parsed contents."""
        return False


    def is_running(self):
        """
        @returns: True if a process has been recorded and no exit status has
                been written for it yet, False otherwise.
        """
        # compare against None explicitly: an exit status of 0 is falsy but
        # still means the process has finished
        return self.process is not None and self.exit_status is None
94
95
class InvalidPidfile(object):
    """Stand-in for pidfile contents that could not be parsed."""
    # Same fields as PidfileContents, so code that scans every stored pidfile
    # entry can read them without special-casing corrupt files.
    process = None
    exit_status = None
    num_tests_failed = None

    def __init__(self, error):
        """@param error: description of why the pidfile is invalid."""
        self.error = error


    def is_invalid(self):
        """@returns: always True."""
        return True


    def is_running(self):
        """@returns: always False."""
        return False


    def __str__(self):
        # __str__ must return a string even if error is some other object
        return str(self.error)
111
112
class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity.
    """
    def __init__(self, drone):
        self.drone = drone


    def __lt__(self, other):
        # heapq orders items with "<"; Python 3 never consults __cmp__, so the
        # ordering has to be available as a rich comparison as well
        if not isinstance(other, _DroneHeapWrapper):
            return NotImplemented
        return self.drone.used_capacity() < other.drone.used_capacity()


    def __cmp__(self, other):
        # still used by Python 2 for the comparisons not defined above
        assert isinstance(other, _DroneHeapWrapper)
        mine = self.drone.used_capacity()
        theirs = other.drone.used_capacity()
        # same value as cmp(mine, theirs), without needing the cmp builtin
        return (mine > theirs) - (mine < theirs)
125
126
class DroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """
    def __init__(self):
        # absolute path of base results dir
        self._results_dir = None
        # holds Process objects
        self._process_set = set()
        # maps PidfileId to PidfileContents
        self._pidfiles = {}
        # same as _pidfiles
        self._pidfiles_second_read = {}
        # maps PidfileId to _PidfileInfo
        self._registered_pidfile_info = {}
        # used to generate unique temporary paths
        self._temporary_path_counter = 0
        # maps hostname to Drone object
        self._drones = {}
        self._results_drone = None
        # maps results dir to dict mapping file path to contents
        self._attached_files = {}
        # heapq of _DroneHeapWrappers
        self._drone_queue = []


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        """
        Add and initialize a drone for every hostname and set up the drone used
        for the results repository.

        @param base_results_dir: base results directory; absolute_path() uses
                it for paths on the results repository.
        @param drone_hostnames: hostnames of the drones to add.
        @param results_repository_hostname: hostname of the results repository.
        @raises DroneManagerError: if no drone ended up being added.
        """
        self._results_dir = base_results_dir

        for hostname in drone_hostnames:
            drone = self._add_drone(hostname)
            drone.call('initialize', self.absolute_path(''))

        if not self._drones:
            # all drones failed to initialize
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)
        config = global_config.global_config
        results_installation_dir = config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', default=None)
        if results_installation_dir:
            self._results_drone.set_autotest_install_dir(
                    results_installation_dir)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        """Call 'initialize' again on every drone."""
        # NOTE(review): initialize() passes self.absolute_path('') to the
        # drones while this passes self._results_dir - confirm which directory
        # is intended here.
        self._call_all_drones('initialize', self._results_dir)


    def shutdown(self):
        """Shut down every drone."""
        for drone in self.get_drones():
            drone.shutdown()


    def _get_max_pidfile_refreshes(self):
        """
        Normally refresh() is called on every monitor_db.Dispatcher.tick().

        @returns: The number of refresh() calls before we forget a pidfile.
        """
        pidfile_timeout = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
                type=int, default=2000)
        return pidfile_timeout


    def _add_drone(self, hostname):
        """Create the drone for hostname, remember it and return it."""
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        # keyed by the hostname the drone itself reports
        self._drones[drone.hostname] = drone
        return drone


    def _remove_drone(self, hostname):
        """Forget the drone for hostname; unknown hostnames are ignored."""
        self._drones.pop(hostname, None)


    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.
        """
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        for hostname, drone in self._drones.items():
            disabled = config.get_config_value(
                    section, '%s_disabled' % hostname, default='')
            # any non-empty value disables the drone
            drone.enabled = not bool(disabled)

            drone.max_processes = config.get_config_value(
                    section, '%s_max_processes' % hostname, type=int,
                    default=scheduler_config.config.max_processes_per_drone)

            allowed_users = config.get_config_value(
                    section, '%s_users' % hostname, default=None)
            if allowed_users is not None:
                # whitespace-separated list of logins
                allowed_users = set(allowed_users.split())
            drone.allowed_users = allowed_users

        self._reorder_drone_queue() # max_processes may have changed


    def get_drones(self):
        """@returns: an iterator over all Drone objects."""
        return iter(self._drones.values())


    def _get_drone_for_process(self, process):
        """@returns: the drone whose hostname matches process.hostname."""
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        """
        @returns: the drone of the process recorded in the given pidfile; the
                pidfile must already contain a process.
        """
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        assert pidfile_contents.process is not None
        return self._get_drone_for_process(pidfile_contents.process)


    def _drop_old_pidfiles(self):
        """
        Unregister pidfiles whose age exceeds the configured maximum and age
        all the others by one refresh.
        """
        if not self._registered_pidfile_info:
            return
        # the limit cannot change while we loop, so look it up only once
        max_refreshes = self._get_max_pidfile_refreshes()
        # iterate over a copy since the dict is modified in
        # unregister_pidfile()
        for pidfile_id, info in list(self._registered_pidfile_info.items()):
            if info.age > max_refreshes:
                logging.warning('dropping leaked pidfile %s', pidfile_id)
                self.unregister_pidfile(pidfile_id)
            else:
                info.age += 1


    def _reset(self):
        """Discard the process, pidfile and drone queue state of a cycle."""
        self._process_set = set()
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _call_all_drones(self, method, *args, **kwargs):
        """
        Call the given method with the given arguments on every drone.

        @returns: dict mapping each drone to the value its call() returned.
        """
        all_results = {}
        for drone in self.get_drones():
            all_results[drone] = drone.call(method, *args, **kwargs)
        return all_results


    def _parse_pidfile(self, drone, raw_contents):
        """
        Parse the raw text of a pidfile read from the given drone.

        @returns: a PidfileContents (left empty for empty input), or an
                InvalidPidfile if the text has more than three lines or holds
                a non-integer value.
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError as exc:
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        """
        Parse each raw pidfile and store the result under its PidfileId.

        @param pidfiles: dict mapping pidfile path to raw contents.
        @param store_in_dict: dict receiving PidfileId -> parsed contents.
        """
        for pidfile_path, contents in pidfiles.items():
            pidfile_id = PidfileId(pidfile_path)
            contents = self._parse_pidfile(drone, contents)
            store_in_dict[pidfile_id] = contents


    def _add_process(self, drone, process_info):
        """Record a process from its info dict ('pid' and 'ppid' entries)."""
        process = Process(drone.hostname, int(process_info['pid']),
                          int(process_info['ppid']))
        self._process_set.add(process)


    def _add_autoserv_process(self, drone, process_info):
        """Record an autoserv process, but only if it is a root process."""
        assert process_info['comm'] == 'autoserv'
        # only root autoserv processes have pgid == pid
        if process_info['pgid'] != process_info['pid']:
            return
        self._add_process(drone, process_info)


    def _enqueue_drone(self, drone):
        """Push the drone onto the capacity-ordered drone queue."""
        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))


    def _reorder_drone_queue(self):
        """Restore heap order after the capacity of queued drones changed."""
        heapq.heapify(self._drone_queue)


    def _compute_active_processes(self, drone):
        """
        Set drone.active_processes to the sum of the declared process counts
        of all unfinished pidfiles whose process is on that drone.
        """
        drone.active_processes = 0
        for pidfile_id, contents in self._pidfiles.items():
            if contents.is_invalid():
                # a corrupt pidfile names no usable process; it must not break
                # the whole refresh
                continue
            is_running = contents.exit_status is None
            on_this_drone = (contents.process
                             and contents.process.hostname == drone.hostname)
            if is_running and on_this_drone:
                info = self._registered_pidfile_info[pidfile_id]
                if info.num_processes is not None:
                    drone.active_processes += info.num_processes


    def refresh(self):
        """
        Called at the beginning of a scheduler cycle to refresh all process
        information.
        """
        self._reset()
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path
                         for pidfile_id in self._registered_pidfile_info]
        all_results = self._call_all_drones('refresh', pidfile_paths)

        for drone, results_list in all_results.items():
            results = results_list[0]

            for process_info in results['autoserv_processes']:
                self._add_autoserv_process(drone, process_info)
            for process_info in results['parse_processes']:
                self._add_process(drone, process_info)

            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
            self._process_pidfiles(drone, results['pidfiles_second_read'],
                                   self._pidfiles_second_read)

            self._compute_active_processes(drone)
            # disabled drones are never offered for new executions
            if drone.enabled:
                self._enqueue_drone(drone)


    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.
        """
        for drone in list(self._drones.values()):
            drone.execute_queued_calls()

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            # a failing results repository is reported by email and its queue
            # is dropped, instead of aborting the scheduler cycle
            warning = ('Results repository failed to execute calls:\n' +
                       traceback.format_exc())
            email_manager.manager.enqueue_notify_email(
                'Results repository error', warning)
            self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a set of Process objects for orphaned processes only.
        """
        # a parent pid of 1 is what marks a process as orphaned here
        return set(process for process in self._process_set
                   if process.ppid == 1)


    def kill_process(self, process):
        """
        Kill the given process.
        """
        logging.info('killing %s', process)
        drone = self._get_drone_for_process(process)
        drone.queue_call('kill_process', process)


    def _ensure_directory_exists(self, path):
        """Create path, including missing parents, unless it exists."""
        if not os.path.exists(path):
            os.makedirs(path)


    def total_running_processes(self):
        """@returns: the sum of active_processes over all drones."""
        return sum(drone.active_processes for drone in self.get_drones())


    def max_runnable_processes(self, username):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.
        @param username: login of user to run a process.  may be None.
        """
        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
                                 if wrapper.drone.usable_by(username)]
        if not usable_drone_wrappers:
            # all drones disabled or inaccessible
            return 0
        return max(wrapper.drone.max_processes - wrapper.drone.active_processes
                   for wrapper in usable_drone_wrappers)


    def _least_loaded_drone(self, drones):
        """
        @param drones: non-empty list of drones (the name shadows the drones
                module inside this method).
        @returns: the drone with the lowest used_capacity(); the earliest one
                in the list wins ties.
        """
        drone_to_use = drones[0]
        for drone in drones[1:]:
            if drone.used_capacity() < drone_to_use.used_capacity():
                drone_to_use = drone
        return drone_to_use


    def _choose_drone_for_execution(self, num_processes, username):
        """
        Pick the drone on which to run num_processes new processes.

        @param num_processes: number of processes the execution accounts for.
        @param username: login of the user running the processes; may be None.
        @returns: the first drone, in order of increasing used capacity, that
                is usable by the user and has room for the processes.  If no
                usable drone has room, the least loaded usable drone.  None if
                no queued drone is usable by the user.
        """
        # cycle through drones in order of increasing used capacity until
        # we find one that can handle these processes
        checked_wrappers = []
        usable_drones = []
        drone_to_use = None
        while self._drone_queue:
            wrapper = heapq.heappop(self._drone_queue)
            checked_wrappers.append(wrapper)
            drone = wrapper.drone
            if not drone.usable_by(username):
                continue
            usable_drones.append(drone)
            if drone.active_processes + num_processes <= drone.max_processes:
                drone_to_use = drone
                break

        if drone_to_use is None and usable_drones:
            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
                                                   drone.active_processes,
                                                   drone.max_processes)
                                     for drone in usable_drones)
            logging.error('No drone has capacity to handle %d processes (%s)',
                          num_processes, drone_summary)
            # overload a drone rather than refuse, but only ever one this user
            # is allowed to use
            drone_to_use = self._least_loaded_drone(usable_drones)

        # refill _drone_queue with everything that was popped
        for wrapper in checked_wrappers:
            heapq.heappush(self._drone_queue, wrapper)

        return drone_to_use


    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        """
        Replace, in place, every WORKING_DIRECTORY item of the command list
        with working_directory.
        """
        for i, item in enumerate(command):
            if item is WORKING_DIRECTORY:
                command[i] = working_directory


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None):
        """
        Execute the given command, taken as an argv list.

        @param command: command to execute as a list.  if any item is
                WORKING_DIRECTORY, the absolute path to the working directory
                will be substituted for it (the list is modified in place).
        @param working_directory: directory in which the pidfile will be written
        @param pidfile_name: name of the pidfile this process will write
        @param num_processes: number of processes to account for from this
                execution
        @param log_file (optional): path (in the results repository) to hold
                command output.
        @param paired_with_pidfile (optional): a PidfileId for an
                already-executed process; the new process will execute on the
                same drone as the previous process.
        @param username (optional): login of the user responsible for this
                process.
        @returns: the PidfileId registered for the new process.
        @raises DroneManagerError: if no drone is available for the user.
        """
        abs_working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)

        self._substitute_working_directory_into_command(command,
                                                        abs_working_directory)

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            drone = self._choose_drone_for_execution(num_processes, username)
        if drone is None:
            raise DroneManagerError(
                    'command failed; no drones available: %s' % (command,))
        logging.info('command = %s', command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_working_directory,
                         log_file, pidfile_name)
        # account for the new load right away so later choices in the same
        # cycle see it
        drone.active_processes += num_processes
        self._reorder_drone_queue()

        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        """
        @returns: the PidfileId for pidfile_name inside the directory
                absolute_path(execution_tag).
        """
        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.
        """
        if pidfile_id not in self._registered_pidfile_info:
            logging.info('monitoring pidfile %s', pidfile_id)
            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
        # registering again counts as a use of the pidfile
        self._reset_pidfile_age(pidfile_id)


    def _reset_pidfile_age(self, pidfile_id):
        """Set the age of a registered pidfile to 0; no-op if unregistered."""
        if pidfile_id in self._registered_pidfile_info:
            self._registered_pidfile_info[pidfile_id].age = 0


    def unregister_pidfile(self, pidfile_id):
        """Stop looking for the given pidfile; no-op if not registered."""
        if pidfile_id in self._registered_pidfile_info:
            logging.info('forgetting pidfile %s', pidfile_id)
            del self._registered_pidfile_info[pidfile_id]


    def declare_process_count(self, pidfile_id, num_processes):
        """
        Set the number of processes accounted to a registered pidfile.

        @raises KeyError: if the pidfile is not registered.
        """
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id.  If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before.

        Reading a pidfile also resets its age.  An empty PidfileContents is
        returned for a pidfile with no stored contents.
        """
        self._reset_pidfile_age(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        return pidfile_map.get(pidfile_id, PidfileContents())


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        return process in self._process_set


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path, on_results_repository=False):
        """
        @param path: path relative to the results directory.
        @param on_results_repository: if True, resolve against the base
                results directory given to initialize(); otherwise against the
                results directory under the drone installation directory.
        @returns: the joined path.
        """
        if on_results_repository:
            base_dir = self._results_dir
        else:
            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                    _DRONE_RESULTS_DIR_SUFFIX)
        return os.path.join(base_dir, path)


    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        """
        Copy source_path to destination_path from the drone of the given
        process, either to the results repository or within that drone.
        """
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(
                destination_path, on_results_repository=to_results_repository)
        source_drone = self._get_drone_for_process(process)
        if to_results_repository:
            source_drone.send_file_to(self._results_drone, full_source,
                                      full_destination, can_fail=True)
        else:
            source_drone.queue_call('copy_file_or_directory', full_source,
                                    full_destination)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.
        """
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)


    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.
        """
        self._copy_results_helper(process, source_path, destination_path)


    def _write_attached_files(self, results_dir, drone):
        """
        Queue, on the given drone, the writing of all files attached to
        results_dir, and forget those attachments.
        """
        attached_files = self._attached_files.pop(results_dir, {})
        for file_path, contents in attached_files.items():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        When the process for the results directory is executed, the given file
        contents will be placed in a file on the drone.  Returns the path at
        which the file will be placed.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        files_for_execution = self._attached_files.setdefault(results_dir, {})
        assert file_path not in files_for_execution
        files_for_execution[file_path] = file_contents
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file.  If
        paired_with_process is given, the file will be written on the drone
        running the given Process.  Otherwise, the file will be written to the
        results repository.
        """
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_process:
            drone = self._get_drone_for_process(paired_with_process)
            on_results_repository = False
        else:
            drone = self._results_drone
            on_results_repository = True
        full_path = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        drone.queue_call('write_to_file', full_path, file_contents)
jamesrenc44ae992010-02-19 00:12:54 +0000666
667
# the process-wide DroneManager; created lazily by instance()
_the_instance = None


def instance():
    """Return the shared DroneManager, creating it on first use."""
    manager = _the_instance
    if manager is None:
        manager = DroneManager()
        _set_instance(manager)
    return manager


def _set_instance(instance): # usable for testing
    """Replace the object that instance() returns."""
    global _the_instance
    _the_instance = instance