blob: b2dace4debf4b05031bf48865f26e67961b6734a [file] [log] [blame]
showard170873e2009-01-07 00:22:26 +00001import os, re, shutil, signal, subprocess, errno, time, heapq, traceback
showardb18134f2009-03-20 20:52:18 +00002import common, logging
showardc5afc462009-01-13 00:09:39 +00003from autotest_lib.client.common_lib import error, global_config
showard170873e2009-01-07 00:22:26 +00004from autotest_lib.scheduler import email_manager, drone_utility, drones
showard324bf812009-01-20 23:23:38 +00005from autotest_lib.scheduler import scheduler_config
showard170873e2009-01-07 00:22:26 +00006
showard170873e2009-01-07 00:22:26 +00007
# Name of the directory, under each drone's autotest installation directory,
# in which results are placed on that drone.
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Placeholder object for argv items; execute_command() swaps every occurrence
# for the absolute path of the working directory.
WORKING_DIRECTORY = object()


# Pidfile names written by the different kinds of processes that get executed.
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (
    AUTOSERV_PID_FILE,
    CRASHINFO_PID_FILE,
    PARSER_PID_FILE,
    ARCHIVER_PID_FILE,
)
22
23
class DroneManagerError(Exception):
    """Error raised by DroneManager, e.g. when no valid drones are found."""
26
27
class CustomEquals(object):
    """
    Mixin giving value semantics: two instances of the same type compare
    equal, and hash identically, when their _id() values are equal.
    Subclasses must override _id().
    """
    def _id(self):
        raise NotImplementedError


    def __eq__(self, other):
        if isinstance(other, type(self)):
            return self._id() == other._id()
        return NotImplemented


    def __ne__(self, other):
        equal = (self == other)
        return not equal


    def __hash__(self):
        identity = self._id()
        return hash(identity)
45
46
class Process(CustomEquals):
    """
    A process on a particular drone host. Identity (equality and hashing) is
    the (hostname, pid) pair; ppid is informational only.
    """
    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        return self.hostname, self.pid


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        base = super(Process, self).__repr__()
        return base + '<%s>' % self
63
64
class PidfileId(CustomEquals):
    """Identifies a pidfile by its path; ids with equal paths are equal."""
    def __init__(self, path):
        self.path = path


    def _id(self):
        identity = self.path
        return identity


    def __str__(self):
        text = str(self.path)
        return text
76
77
class _PidfileInfo(object):
    """Per-pidfile bookkeeping kept by DroneManager for registered pidfiles."""
    # refresh cycles since this pidfile was last registered or looked up
    age = 0
    # processes to count against the drone while it runs; None = unknown
    num_processes = None
81
82
class PidfileContents(object):
    """
    Contents of a pidfile that parsed cleanly (possibly an empty one).

    process stays None until a pid has been read; exit_status and
    num_tests_failed stay None until the exit information has been read.
    """
    process = None
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        return False


    def is_running(self):
        """
        @returns: True if a process has been recorded and no exit status has
                been read yet. An exit status of 0 still means the process
                has finished, so the check is "is None" rather than a
                falsiness test.
        """
        return self.process is not None and self.exit_status is None
94
95
class InvalidPidfile(object):
    """
    Stand-in for PidfileContents when a pidfile could not be parsed.

    It exposes the same data attributes as PidfileContents, all None.
    Code that reads .process or .exit_status without first checking
    is_invalid() therefore sees "no process" instead of raising
    AttributeError.
    """
    process = None
    exit_status = None
    num_tests_failed = None

    def __init__(self, error):
        # human-readable description of what was wrong with the pidfile
        self.error = error


    def is_invalid(self):
        return True


    def is_running(self):
        return False


    def __str__(self):
        return self.error
111
112
class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity.
    """
    def __init__(self, drone):
        self.drone = drone


    def __cmp__(self, other):
        # Python 2 three-way comparison, also used for <=/>= fallbacks there
        assert isinstance(other, _DroneHeapWrapper)
        return cmp(self.drone.used_capacity(), other.drone.used_capacity())


    def __lt__(self, other):
        # heapq orders items with "<". Python 3 never consults __cmp__, so
        # without this every heap operation with 2+ items raises TypeError.
        assert isinstance(other, _DroneHeapWrapper)
        return self.drone.used_capacity() < other.drone.used_capacity()
125
126
class DroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """
    def __init__(self):
        # absolute path of base results dir
        self._results_dir = None
        # holds Process objects
        self._process_set = set()
        # maps PidfileId to PidfileContents
        self._pidfiles = {}
        # same as _pidfiles, but read after the process list was gathered
        self._pidfiles_second_read = {}
        # maps PidfileId to _PidfileInfo
        self._registered_pidfile_info = {}
        # used to generate unique temporary paths
        self._temporary_path_counter = 0
        # maps hostname to Drone object
        self._drones = {}
        self._results_drone = None
        # maps results dir to dict mapping file path to contents
        self._attached_files = {}
        # heapq of _DroneHeapWrappers
        self._drone_queue = []


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        """
        Add and initialize drones, then set up the results repository drone.

        @param base_results_dir: base results directory on the results
                repository.
        @param drone_hostnames: hostnames of the drones to use.
        @param results_repository_hostname: hostname of the results
                repository.
        @raises DroneManagerError: if no drones were added.
        """
        self._results_dir = base_results_dir

        for hostname in drone_hostnames:
            drone = self._add_drone(hostname)
            drone.call('initialize', self.absolute_path(''))

        if not self._drones:
            # no drones were added (e.g. drone_hostnames was empty)
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)
        results_installation_dir = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', default=None)
        if results_installation_dir:
            self._results_drone.set_autotest_install_dir(
                    results_installation_dir)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        """
        Re-run 'initialize' on every drone.

        Passes the same drone-side results directory that initialize() uses,
        not the results repository's directory.
        """
        self._call_all_drones('initialize', self.absolute_path(''))


    def shutdown(self):
        for drone in self.get_drones():
            drone.shutdown()


    def _get_max_pidfile_refreshes(self):
        """
        Normally refresh() is called on every monitor_db.Dispatcher.tick().

        @returns: The number of refresh() calls before we forget a pidfile.
        """
        pidfile_timeout = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
                type=int, default=2000)
        return pidfile_timeout


    def _add_drone(self, hostname):
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        self._drones[drone.hostname] = drone
        return drone


    def _remove_drone(self, hostname):
        self._drones.pop(hostname, None)


    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.
        """
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        for hostname, drone in self._drones.iteritems():
            disabled = config.get_config_value(
                    section, '%s_disabled' % hostname, default='')
            drone.enabled = not bool(disabled)

            drone.max_processes = config.get_config_value(
                    section, '%s_max_processes' % hostname, type=int,
                    default=scheduler_config.config.max_processes_per_drone)

            allowed_users = config.get_config_value(
                    section, '%s_users' % hostname, default=None)
            if allowed_users is not None:
                allowed_users = set(allowed_users.split())
            drone.allowed_users = allowed_users

        self._reorder_drone_queue() # max_processes may have changed


    def get_drones(self):
        return self._drones.itervalues()


    def _get_drone_for_process(self, process):
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        assert pidfile_contents.process is not None
        return self._get_drone_for_process(pidfile_contents.process)


    def _drop_old_pidfiles(self):
        """
        Age every registered pidfile by one refresh. Unregister any pidfile
        whose age exceeds the configured maximum number of refreshes.
        """
        # the limit comes from the global config; read it once per call
        max_refreshes = self._get_max_pidfile_refreshes()
        # iterate over a snapshot since unregister_pidfile() mutates the dict
        for pidfile_id, info in list(self._registered_pidfile_info.items()):
            if info.age > max_refreshes:
                logging.warning('dropping leaked pidfile %s', pidfile_id)
                self.unregister_pidfile(pidfile_id)
            else:
                info.age += 1


    def _reset(self):
        self._process_set = set()
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _call_all_drones(self, method, *args, **kwargs):
        all_results = {}
        for drone in self.get_drones():
            all_results[drone] = drone.call(method, *args, **kwargs)
        return all_results


    def _parse_pidfile(self, drone, raw_contents):
        """
        Parse raw pidfile text read from the given drone.

        @returns: a PidfileContents (empty if raw_contents is empty), or an
                InvalidPidfile if the text has more than 3 lines or a
                non-integer field.
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError as exc:
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        for pidfile_path, contents in pidfiles.iteritems():
            pidfile_id = PidfileId(pidfile_path)
            contents = self._parse_pidfile(drone, contents)
            store_in_dict[pidfile_id] = contents


    def _add_process(self, drone, process_info):
        process = Process(drone.hostname, int(process_info['pid']),
                          int(process_info['ppid']))
        self._process_set.add(process)


    def _add_autoserv_process(self, drone, process_info):
        assert process_info['comm'] == 'autoserv'
        # only root autoserv processes have pgid == pid
        if process_info['pgid'] != process_info['pid']:
            return
        self._add_process(drone, process_info)


    def _enqueue_drone(self, drone):
        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))


    def _reorder_drone_queue(self):
        heapq.heapify(self._drone_queue)


    def _compute_active_processes(self, drone):
        """
        Recompute drone.active_processes by summing the declared process
        counts of registered pidfiles whose process runs on this drone and
        has no exit status yet.
        """
        drone.active_processes = 0
        for pidfile_id, contents in self._pidfiles.iteritems():
            if contents.is_invalid():
                # corrupt pidfiles carry no process/exit information
                continue
            is_running = contents.exit_status is None
            on_this_drone = (contents.process
                             and contents.process.hostname == drone.hostname)
            if is_running and on_this_drone:
                info = self._registered_pidfile_info.get(pidfile_id)
                if info is not None and info.num_processes is not None:
                    drone.active_processes += info.num_processes


    def refresh(self):
        """
        Called at the beginning of a scheduler cycle to refresh all process
        information.
        """
        self._reset()
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path
                         for pidfile_id in self._registered_pidfile_info]
        all_results = self._call_all_drones('refresh', pidfile_paths)

        for drone, results_list in all_results.iteritems():
            results = results_list[0]

            for process_info in results['autoserv_processes']:
                self._add_autoserv_process(drone, process_info)
            for process_info in results['parse_processes']:
                self._add_process(drone, process_info)

            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
            self._process_pidfiles(drone, results['pidfiles_second_read'],
                                   self._pidfiles_second_read)

            self._compute_active_processes(drone)
            if drone.enabled:
                self._enqueue_drone(drone)


    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.
        """
        for drone in self._drones.values():
            drone.execute_queued_calls()

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            warning = ('Results repository failed to execute calls:\n' +
                       traceback.format_exc())
            email_manager.manager.enqueue_notify_email(
                    'Results repository error', warning)
        self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a set of Process objects for orphaned processes only, i.e.
        those whose ppid is 1. The process set also holds parse processes
        found by refresh(), so those are included when orphaned.
        """
        return set(process for process in self._process_set
                   if process.ppid == 1)


    def kill_process(self, process):
        """
        Kill the given process.
        """
        logging.info('killing %s', process)
        drone = self._get_drone_for_process(process)
        drone.queue_call('kill_process', process)


    def _ensure_directory_exists(self, path):
        # create directly instead of check-then-create, which races with
        # anything else creating the same directory
        try:
            os.makedirs(path)
        except OSError as exc:
            if exc.errno != errno.EEXIST:
                raise


    def total_running_processes(self):
        return sum(drone.active_processes for drone in self.get_drones())


    def max_runnable_processes(self, username):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.

        @param username: login of user to run a process. may be None.
        @returns: the largest free capacity among queued drones usable by
                username, or 0 if there are none.
        """
        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
                                 if wrapper.drone.usable_by(username)]
        if not usable_drone_wrappers:
            # all drones disabled or inaccessible
            return 0
        runnable_processes = [
                wrapper.drone.max_processes - wrapper.drone.active_processes
                for wrapper in usable_drone_wrappers]
        return max([0] + runnable_processes)


    def _least_loaded_drone(self, candidates):
        """
        @param candidates: non-empty sequence of drones.
        @returns: the drone with the smallest used_capacity(); ties go to the
                earliest one in the sequence.
        """
        return min(candidates, key=lambda drone: drone.used_capacity())


    def _choose_drone_for_execution(self, num_processes, username):
        """
        Pick a drone to run num_processes new processes for username.

        Drones are tried in order of increasing used capacity. If none has
        room, the least loaded usable drone is returned anyway (and an error
        is logged).

        @raises DroneManagerError: if no enabled drone is usable by username.
        """
        checked_drones = []
        usable_drones = []
        drone_to_use = None
        while self._drone_queue:
            drone = heapq.heappop(self._drone_queue).drone
            checked_drones.append(drone)
            if not drone.usable_by(username):
                continue
            usable_drones.append(drone)
            if drone.active_processes + num_processes <= drone.max_processes:
                drone_to_use = drone
                break

        # refill _drone_queue before anything below can raise, so a failure
        # does not leave the queue emptied for the rest of the cycle
        for drone in checked_drones:
            self._enqueue_drone(drone)

        if drone_to_use is None:
            if not usable_drones:
                raise DroneManagerError(
                        'No enabled drone is usable by user %s' % username)
            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
                                                   drone.active_processes,
                                                   drone.max_processes)
                                     for drone in usable_drones)
            logging.error('No drone has capacity to handle %d processes (%s) '
                          'for user %s', num_processes, drone_summary, username)
            drone_to_use = self._least_loaded_drone(usable_drones)

        return drone_to_use


    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        for i, item in enumerate(command):
            if item is WORKING_DIRECTORY:
                command[i] = working_directory


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None):
        """
        Execute the given command, taken as an argv list.

        @param command: command to execute as a list. if any item is
                WORKING_DIRECTORY, the absolute path to the working directory
                will be substituted for it. The caller's list is not modified.
        @param working_directory: directory in which the pidfile will be written
        @param pidfile_name: name of the pidfile this process will write
        @param num_processes: number of processes to account for from this
                execution
        @param log_file (optional): path (in the results repository) to hold
                command output.
        @param paired_with_pidfile (optional): a PidfileId for an
                already-executed process; the new process will execute on the
                same drone as the previous process.
        @param username (optional): login of the user responsible for this
                process.
        @returns: the PidfileId of the pidfile the new process will write.
        """
        abs_working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)

        # substitute into a copy so the caller's list keeps its
        # WORKING_DIRECTORY placeholders and can be reused
        command = list(command)
        self._substitute_working_directory_into_command(command,
                                                        abs_working_directory)

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            drone = self._choose_drone_for_execution(num_processes, username)
        logging.info('command = %s', command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_working_directory,
                         log_file, pidfile_name)
        drone.active_processes += num_processes
        self._reorder_drone_queue()

        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.
        """
        if pidfile_id not in self._registered_pidfile_info:
            logging.info('monitoring pidfile %s', pidfile_id)
            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
        self._reset_pidfile_age(pidfile_id)


    def _reset_pidfile_age(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            self._registered_pidfile_info[pidfile_id].age = 0


    def unregister_pidfile(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            logging.info('forgetting pidfile %s', pidfile_id)
            del self._registered_pidfile_info[pidfile_id]


    def declare_process_count(self, pidfile_id, num_processes):
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id. If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before.

        @returns: the stored contents, or an empty PidfileContents if none
                were read for this pidfile.
        """
        self._reset_pidfile_age(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        contents = pidfile_map.get(pidfile_id)
        if contents is None:
            # build the empty default only when it is actually needed
            contents = PidfileContents()
        return contents


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        return process in self._process_set


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path, on_results_repository=False):
        """
        @param path: path relative to a results directory.
        @param on_results_repository: if True, resolve against the results
                repository's base results dir; otherwise against the results
                dir under the drone autotest installation directory.
        """
        if on_results_repository:
            base_dir = self._results_dir
        else:
            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                    _DRONE_RESULTS_DIR_SUFFIX)
        return os.path.join(base_dir, path)


    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(
                destination_path, on_results_repository=to_results_repository)
        source_drone = self._get_drone_for_process(process)
        if to_results_repository:
            source_drone.send_file_to(self._results_drone, full_source,
                                      full_destination, can_fail=True)
        else:
            source_drone.queue_call('copy_file_or_directory', full_source,
                                    full_destination)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.
        """
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)


    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.
        """
        self._copy_results_helper(process, source_path, destination_path)


    def _write_attached_files(self, results_dir, drone):
        attached_files = self._attached_files.pop(results_dir, {})
        for file_path, contents in attached_files.iteritems():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        When the process for the results directory is executed, the given file
        contents will be placed in a file on the drone. Returns the path at
        which the file will be placed.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        files_for_execution = self._attached_files.setdefault(results_dir, {})
        assert file_path not in files_for_execution
        files_for_execution[file_path] = file_contents
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file. If
        paired_with_process is given, the file will be written on the drone
        running the given Process. Otherwise, the file will be written to the
        results repository.
        """
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_process:
            drone = self._get_drone_for_process(paired_with_process)
            on_results_repository = False
        else:
            drone = self._results_drone
            on_results_repository = True
        full_path = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        drone.queue_call('write_to_file', full_path, file_contents)
jamesrenc44ae992010-02-19 00:12:54 +0000670
671
# module-wide DroneManager singleton; created lazily by instance()
_the_instance = None

def instance():
    """Return the shared DroneManager, constructing it on first use."""
    if _the_instance is not None:
        return _the_instance
    _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    """Replace the shared DroneManager (tests use this to inject one)."""
    global _the_instance
    _the_instance = instance
682 _the_instance = instance