blob: 2f6111782d1314ec44595007e2e42c7d0dd0e9e4 [file] [log] [blame]
showard170873e2009-01-07 00:22:26 +00001import os, re, shutil, signal, subprocess, errno, time, heapq, traceback
showardb18134f2009-03-20 20:52:18 +00002import common, logging
showardc5afc462009-01-13 00:09:39 +00003from autotest_lib.client.common_lib import error, global_config
showard170873e2009-01-07 00:22:26 +00004from autotest_lib.scheduler import email_manager, drone_utility, drones
showard324bf812009-01-20 23:23:38 +00005from autotest_lib.scheduler import scheduler_config
showard170873e2009-01-07 00:22:26 +00006
# Default pidfile name, used when a caller does not supply its own
# pidfile_name and when deriving a pidfile path from an execution tag.
_AUTOSERV_PID_FILE = '.autoserv_execute'
8
9
class DroneManagerError(Exception):
    """Raised when the drone manager cannot operate (e.g. no usable drones)."""
12
13
class CustomEquals(object):
    """
    Base class for objects whose equality and hash derive from a key.

    Subclasses implement _id() to return that (hashable) key.
    """

    def _id(self):
        """Return the key identifying this object; subclasses must override."""
        raise NotImplementedError


    def __eq__(self, other):
        # Only instances of our own type (or a subclass of it) are comparable;
        # anything else is deferred to the other operand.
        if isinstance(other, type(self)):
            return self._id() == other._id()
        return NotImplemented


    def __ne__(self, other):
        return not (self == other)


    def __hash__(self):
        key = self._id()
        return hash(key)
31
32
class Process(CustomEquals):
    """
    A process on a drone, identified by its (hostname, pid) pair.

    The parent pid is stored but takes no part in equality or hashing.
    """

    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        return self.hostname, self.pid


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        # default object repr followed by the readable hostname/pid form
        default_repr = super(Process, self).__repr__()
        return default_repr + '<%s>' % self
49
50
class PidfileId(CustomEquals):
    """Identifies a pidfile by its path; equal paths give equal ids."""

    def __init__(self, path):
        self.path = path


    def _id(self):
        # the path alone is the identity key
        return self.path


    def __str__(self):
        return '%s' % (self.path,)
62
63
class PidfileContents(object):
    """
    Parsed contents of a pidfile.

    All fields default to None at class level; they are overwritten on the
    instance once the corresponding pidfile line has been read.
    """
    # Process object built from the pid line, or None if none was read
    process = None
    # integer exit status and failed-test count, or None if not yet written
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        """Return False: these contents are not marked as corrupt."""
        return False
71
72
class InvalidPidfile(object):
    """
    Stand-in for pidfile contents that could not be parsed.

    error: description of what was wrong with the pidfile.
    """

    def __init__(self, error):
        self.error = error


    def is_invalid(self):
        """Return True: this object always represents a corrupt pidfile."""
        return True


    def __str__(self):
        # __str__ must return a string; coerce so that a non-string error
        # (e.g. an exception instance) does not raise TypeError here.
        return str(self.error)
84
85
class DroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """
    # number of refresh() cycles a registered pidfile may go without being
    # re-registered before it is dropped from the refresh list
    _MAX_PIDFILE_AGE = 1000

    def __init__(self):
        self._results_dir = None
        # execution tag -> Process, for root autoserv processes
        self._processes = {}
        # every Process seen during the last refresh()
        self._process_set = set()
        # PidfileId -> parsed contents, from the first and second reads
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        # PidfileId -> number of refreshes since it was last registered
        self._pidfile_age = {}
        self._temporary_path_counter = 0
        # hostname -> drone object
        self._drones = {}
        self._results_drone = None
        # execution tag -> {file path: file contents}
        self._attached_files = {}
        # heap of (used capacity, hostname, drone) for enabled drones
        self._drone_queue = []


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        """
        Set up all drones and the results repository drone.

        Drones that raise error.AutoservError while being added or
        initialized are reported (log + notification email) and dropped.

        * base_results_dir: results directory that all relative paths are
          resolved against.
        * drone_hostnames: hostnames of the drones to use.
        * results_repository_hostname: host holding the results repository.

        Raises DroneManagerError if no drone could be initialized.
        """
        self._results_dir = base_results_dir
        drones.set_temporary_directory(os.path.join(
                base_results_dir, drone_utility._TEMPORARY_DIRECTORY))

        for hostname in drone_hostnames:
            try:
                drone = self._add_drone(hostname)
                drone.call('initialize', base_results_dir)
            except error.AutoservError:
                warning = 'Drone %s failed to initialize:\n%s' % (
                        hostname, traceback.format_exc())
                logging.warning(warning)
                email_manager.manager.enqueue_notify_email(
                        'Drone failed to initialize', warning)
                self._remove_drone(hostname)

        if not self._drones:
            # all drones failed to initialize
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        """Call 'initialize' with the results directory on every drone."""
        self._call_all_drones('initialize', self._results_dir)


    def shutdown(self):
        """Shut down every drone."""
        for drone in self.get_drones():
            drone.shutdown()


    def _add_drone(self, hostname):
        """Create the drone for hostname, record it and return it."""
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        self._drones[drone.hostname] = drone
        return drone


    def _remove_drone(self, hostname):
        """Forget the drone stored under hostname, if there is one."""
        self._drones.pop(hostname, None)


    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.

        A drone is disabled when its '<hostname>_disabled' option has any
        truthy value; '<hostname>_max_processes' overrides the scheduler-wide
        per-drone process limit.
        """
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        for hostname, drone in self._drones.items():
            disabled = config.get_config_value(
                    section, '%s_disabled' % hostname, default='')
            drone.enabled = not bool(disabled)

            drone.max_processes = config.get_config_value(
                    section, '%s_max_processes' % hostname, type=int,
                    default=scheduler_config.config.max_processes_per_drone)


    def get_drones(self):
        """Return an iterator over all known drone objects."""
        return iter(self._drones.values())


    def _get_drone_for_process(self, process):
        """Return the drone whose hostname matches process.hostname."""
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        """Return the drone running the process recorded in the pidfile."""
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        assert pidfile_contents.process is not None
        return self._get_drone_for_process(pidfile_contents.process)


    def _drop_old_pidfiles(self):
        """
        Age every registered pidfile by one cycle, dropping those that have
        already exceeded _MAX_PIDFILE_AGE.
        """
        # iterate over a snapshot since entries are deleted along the way
        for pidfile_id, age in list(self._pidfile_age.items()):
            if age > self._MAX_PIDFILE_AGE:
                del self._pidfile_age[pidfile_id]
            else:
                self._pidfile_age[pidfile_id] = age + 1


    def _reset(self):
        """Discard all per-cycle state gathered by the previous refresh()."""
        self._processes = {}
        self._process_set = set()
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _call_all_drones(self, method, *args, **kwargs):
        """
        Invoke method on every drone immediately.

        Returns a dict mapping each drone to whatever its call() returned.
        """
        all_results = {}
        for drone in self.get_drones():
            all_results[drone] = drone.call(method, *args, **kwargs)
        return all_results


    def _parse_pidfile(self, drone, raw_contents):
        """
        Parse the raw text of a pidfile read from drone.

        Returns a PidfileContents (left empty when raw_contents is empty), or
        an InvalidPidfile when there are more than three lines or a line is
        not an integer.
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError as exc:
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        """
        Parse a {pidfile path: raw contents} dict read from drone and store
        the results in store_in_dict, keyed by PidfileId.
        """
        for pidfile_path, raw_contents in pidfiles.items():
            pidfile_id = PidfileId(pidfile_path)
            store_in_dict[pidfile_id] = self._parse_pidfile(drone,
                                                            raw_contents)


    def _add_process(self, drone, process_info):
        """Record a running process from its info dict and return it."""
        process = Process(drone.hostname, int(process_info['pid']),
                          int(process_info['ppid']))
        self._process_set.add(process)
        return process


    def _add_autoserv_process(self, drone, process_info):
        """Record a root autoserv process under its execution tag."""
        assert process_info['comm'] == 'autoserv'
        # only root autoserv processes have pgid == pid
        if process_info['pgid'] != process_info['pid']:
            return
        process = self._add_process(drone, process_info)
        execution_tag = self._execution_tag_for_process(drone, process_info)
        self._processes[execution_tag] = process


    def _enqueue_drone(self, drone):
        """Add drone to the heap used to pick the least-loaded drone."""
        # The hostname breaks ties between equally loaded drones so that the
        # drone objects themselves never have to be compared.
        heapq.heappush(self._drone_queue,
                       (drone.used_capacity(), drone.hostname, drone))


    def refresh(self):
        """
        Called at the beginning of a scheduler cycle to refresh all process
        information.
        """
        self._reset()
        # age out pidfiles nobody has asked about for a long time; without
        # this the set of pidfiles to read would only ever grow
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path for pidfile_id in self._pidfile_age]
        all_results = self._call_all_drones('refresh', pidfile_paths)

        for drone, results_list in all_results.items():
            # the drone call returns a list; the refresh data is its first item
            results = results_list[0]
            drone.active_processes = len(results['autoserv_processes'])
            if drone.enabled:
                self._enqueue_drone(drone)

            for process_info in results['autoserv_processes']:
                self._add_autoserv_process(drone, process_info)
            for process_info in results['parse_processes']:
                self._add_process(drone, process_info)

            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
            self._process_pidfiles(drone, results['pidfiles_second_read'],
                                   self._pidfiles_second_read)


    def _execution_tag_for_process(self, drone, process_info):
        """Return the execution tag from the process args, or a made-up one."""
        execution_tag = self._extract_execution_tag(process_info['args'])
        if not execution_tag:
            # this process has no execution tag - just make up something unique
            return '%s.%s' % (drone, process_info['pid'])
        return execution_tag


    def _extract_execution_tag(self, command):
        """
        Return the argument following ' -P ' in a command line string, or
        None if there is none.
        """
        # NOTE: the pattern requires a space after the tag, so a tag that is
        # the very last thing on the command line is not matched.
        match = re.match(r'.* -P (\S+) ', command)
        if not match:
            return None
        return match.group(1)


    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.
        """
        for drone in self._drones.values():
            drone.execute_queued_calls()

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            warning = ('Results repository failed to execute calls:\n' +
                       traceback.format_exc())
            logging.warning(warning)
            email_manager.manager.enqueue_notify_email(
                    'Results repository error', warning)
            self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a dict mapping execution tags to Process objects for
        orphaned processes only (those whose parent pid is 1).
        """
        return dict((execution_tag, process)
                    for execution_tag, process in self._processes.items()
                    if process.ppid == 1)


    def get_process_for(self, execution_tag):
        """
        Return the process object for the given execution tag, or None if no
        such process was seen in the last refresh.
        """
        return self._processes.get(execution_tag, None)


    def kill_process(self, process):
        """
        Kill the given process.
        """
        # process is a Process object, not a number, so it must be formatted
        # with %s (which uses its hostname/pid string form)
        logging.info('killing %s', process)
        drone = self._get_drone_for_process(process)
        drone.queue_call('kill_process', process)


    def _ensure_directory_exists(self, path):
        """Create path (and missing parents) unless it already exists."""
        import errno
        try:
            os.makedirs(path)
        except OSError as exc:
            # something already being at path is fine, including the case
            # where it appeared between a check and the creation
            if exc.errno != errno.EEXIST:
                raise


    def _extract_num_processes(self, command):
        """
        Return the number of machines in the comma-separated list following
        '-m' in an argv list, or 1 if there is no '-m' argument.
        """
        try:
            machine_list_index = command.index('-m') + 1
        except ValueError:
            return 1
        assert machine_list_index < len(command)
        machine_list = command[machine_list_index].split(',')
        return len(machine_list)


    def total_running_processes(self):
        """Return the sum of active process counts over all drones."""
        return sum(drone.active_processes for drone in self.get_drones())


    def max_runnable_processes(self):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.
        """
        if not self._drone_queue:
            # all drones disabled
            return 0
        return max(entry[-1].max_processes - entry[-1].active_processes
                   for entry in self._drone_queue)


    def _choose_drone_for_execution(self, num_processes):
        """
        Pick the least-loaded enabled drone able to take num_processes more
        processes and charge the processes to it.

        Raises ValueError if no queued drone has enough spare capacity.
        """
        # cycle through drones in order of increasing used capacity until
        # we find one that can handle these processes
        checked_drones = []
        drone_to_use = None
        while self._drone_queue:
            drone = heapq.heappop(self._drone_queue)[-1]
            checked_drones.append(drone)
            if drone.active_processes + num_processes <= drone.max_processes:
                drone_to_use = drone
                break

        if drone_to_use is not None:
            drone_to_use.active_processes += num_processes

        # refill _drone_queue - also when nothing fit, so that one oversized
        # request does not leave the queue empty for the rest of the cycle
        for drone in checked_drones:
            self._enqueue_drone(drone)

        if drone_to_use is None:
            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
                                                   drone.active_processes,
                                                   drone.max_processes)
                                     for drone in checked_drones)
            raise ValueError('No drone has capacity to handle %d processes (%s)'
                             % (num_processes, drone_summary))

        return drone_to_use


    def execute_command(self, command, working_directory, log_file=None,
                        pidfile_name=None, paired_with_pidfile=None):
        """
        Execute the given command, taken as an argv list.

        * working_directory: directory in which the pidfile will be written
        * log_file (optional): specifies a path (in the results repository) to
          hold command output.
        * pidfile_name (optional): gives the name of the pidfile this process
          will write
        * paired_with_pidfile (optional): a PidfileId for an already-executed
          process; the new process will execute on the same drone as the
          previous process.

        The command is only queued on the chosen drone here. Returns the
        PidfileId of the pidfile, which is also registered for refreshing.
        """
        working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)
        if not pidfile_name:
            pidfile_name = _AUTOSERV_PID_FILE

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            num_processes = self._extract_num_processes(command)
            drone = self._choose_drone_for_execution(num_processes)
        logging.info('command = %s', command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(command, drone)
        drone.queue_call('execute_command', command, working_directory,
                         log_file, pidfile_name)

        pidfile_path = self.absolute_path(os.path.join(working_directory,
                                                       pidfile_name))
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag):
        """Return the PidfileId of the default pidfile for execution_tag."""
        path = os.path.join(self.absolute_path(execution_tag),
                            _AUTOSERV_PID_FILE)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.
        """
        self._pidfile_age[pidfile_id] = 0


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id.  If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before.

        Asking for a pidfile also (re-)registers it.  An empty PidfileContents
        is returned when the pidfile was not part of the last refresh.
        """
        self.register_pidfile(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        return pidfile_map.get(pidfile_id, PidfileContents())


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        return process in self._process_set


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path):
        """Return path joined onto the results directory."""
        return os.path.join(self._results_dir, path)


    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        """
        Copy source_path to destination_path from the drone running process,
        either to the results repository drone or within the same drone.
        """
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(destination_path)
        source_drone = self._get_drone_for_process(process)
        if to_results_repository:
            source_drone.send_file_to(self._results_drone, full_source,
                                      full_destination, can_fail=True)
        else:
            source_drone.queue_call('copy_file_or_directory', full_source,
                                    full_destination)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.  destination_path defaults to source_path.
        """
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)


    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.
        """
        self._copy_results_helper(process, source_path, destination_path)


    def _write_attached_files(self, command, drone):
        """
        Queue writes on drone for all files attached to the execution tag
        found in command (an argv list); the attachments are then forgotten.
        """
        execution_tag = self._extract_execution_tag(' '.join(command))
        attached_files = self._attached_files.pop(execution_tag, {})
        for file_path, contents in attached_files.items():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, execution_tag, file_contents,
                                 file_path=None):
        """
        When the process for execution_tag is executed, the given file contents
        will be placed in a file on the drone.  Returns the path at which the
        file will be placed.  A temporary path is used if file_path is not
        given.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        files_for_execution = self._attached_files.setdefault(execution_tag, {})
        assert file_path not in files_for_execution
        files_for_execution[file_path] = file_contents
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file.  If
        paired_with_process is given, the file will be written on the drone
        running the given Process.  Otherwise, the file will be written to the
        results repository.
        """
        full_path = self.absolute_path(file_path)
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_process:
            drone = self._get_drone_for_process(paired_with_process)
        else:
            drone = self._results_drone
        drone.queue_call('write_to_file', full_path, file_contents)