blob: 826a62c580c871d8615f9b5a3734425c02f15a98 [file] [log] [blame]
showard170873e2009-01-07 00:22:26 +00001import os, re, shutil, signal, subprocess, errno, time, heapq, traceback
2import common
3from autotest_lib.client.common_lib import error
4from autotest_lib.scheduler import email_manager, drone_utility, drones
5
6_AUTOSERV_PID_FILE = '.autoserv_execute'
7
8
class DroneManagerError(Exception):
    """Error raised by the drone manager itself."""
11
12
class CustomEquals(object):
    """
    Base class providing equality and hashing derived from an identity key.

    Subclasses implement _id() to return a hashable key.  Two objects compare
    equal when the right-hand one is an instance of the left-hand one's type
    and both produce the same key; the hash is the hash of that key.
    """

    def _id(self):
        """Return the key used for comparison and hashing."""
        raise NotImplementedError


    def __eq__(self, other):
        if isinstance(other, type(self)):
            return self._id() == other._id()
        # unrelated type: let Python try the reflected comparison
        return NotImplemented


    def __ne__(self, other):
        if self == other:
            return False
        return True


    def __hash__(self):
        key = self._id()
        return hash(key)
30
31
class Process(CustomEquals):
    """
    A process identified by the host it runs on and its pid.

    ppid is stored but is not part of the identity: equality and hashing use
    only (hostname, pid).
    """

    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        """Return the (hostname, pid) identity key."""
        return (self.hostname, self.pid)


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        # default object repr followed by the readable "<hostname/pid>" form
        default_repr = super(Process, self).__repr__()
        return default_repr + '<%s>' % self
48
49
class PidfileId(CustomEquals):
    """
    Identifier for a pidfile; identity (equality and hash) is its path.
    """

    def __init__(self, path):
        self.path = path


    def _id(self):
        """Return the path, which serves as the identity key."""
        return self.path


    def __str__(self):
        return str(self.path)
61
62
class PidfileContents(object):
    """
    Parsed contents of a pidfile.  Every field defaults to None, meaning the
    corresponding line has not been read.
    """
    # process named by the first line of the pidfile
    process = None
    # integer from the second line of the pidfile
    exit_status = None
    # integer from the third line of the pidfile
    num_tests_failed = None

    def is_invalid(self):
        """Always False; InvalidPidfile is the invalid counterpart."""
        return False
70
71
class InvalidPidfile(object):
    """
    Stand-in for pidfile contents that could not be parsed.  Carries the error
    message describing what was wrong.
    """

    def __init__(self, error):
        self.error = error


    def is_invalid(self):
        """Always True; PidfileContents is the valid counterpart."""
        return True


    def __str__(self):
        # the stored message is returned as-is
        return self.error
83
84
class DroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """
    # number of refresh() cycles a pidfile may go without being registered
    # again before it is dropped from the set of pidfiles being read
    _MAX_PIDFILE_AGE = 1000
    # hostname carried by the placeholder Process from get_dummy_process()
    _NULL_HOSTNAME = 'null host'

    def __init__(self):
        # base results directory; set by initialize()
        self._results_dir = None
        # execution tag -> Process, rebuilt on every refresh()
        self._processes = {}
        # set of every Process seen in the last refresh()
        self._process_set = set()
        # PidfileId -> parsed contents, from the two reads done by a refresh
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        # PidfileId -> number of refresh() cycles since it was last registered
        self._pidfile_age = {}
        # counter that makes get_temporary_path() results unique
        self._temporary_path_counter = 0
        # drone hostname -> drone object
        self._drones = {}
        self._results_drone = None
        # execution tag -> list of (file_path, contents) awaiting execution
        self._attached_files = {}
        # heap of (process count, drone), rebuilt on every refresh()
        self._drone_queue = []
        self._null_drone = drones.NullDrone()


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        """
        Set up the results directory, the drones and the results repository.

        Each hostname in drone_hostnames is added as a drone and sent an
        'initialize' call.  A drone that raises error.AutoservError during
        this is reported (printed and queued as a notification email) and
        removed again.

        Raises DroneManagerError if no drone is left afterwards.
        """
        self._results_dir = base_results_dir
        drones.set_temporary_directory(os.path.join(
            base_results_dir, drone_utility._TEMPORARY_DIRECTORY))

        for hostname in drone_hostnames:
            try:
                drone = self._add_drone(hostname)
                drone.call('initialize', base_results_dir)
            except error.AutoservError:
                warning = 'Drone %s failed to initialize:\n%s' % (
                    hostname, traceback.format_exc())
                print(warning)
                email_manager.manager.enqueue_notify_email(
                    'Drone failed to initialize', warning)
                self._remove_drone(hostname)

        if not self._drones:
            # all drones failed to initialize
            raise DroneManagerError('No valid drones found')

        print('Using results repository on %s' %
              (results_repository_hostname,))
        self._results_drone = drones.get_drone(results_repository_hostname)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        """Send 'initialize' with the results directory to every drone."""
        self._call_all_drones('initialize', self._results_dir)


    def shutdown(self):
        """Call shutdown() on every drone."""
        for drone in self._drones.values():
            drone.shutdown()


    def _add_drone(self, hostname):
        """Create the drone for hostname, store it under drone.hostname and
        return it."""
        print('Adding drone %s' % (hostname,))
        drone = drones.get_drone(hostname)
        self._drones[drone.hostname] = drone
        return drone


    def _remove_drone(self, hostname):
        """Forget the drone stored under hostname, if there is one."""
        self._drones.pop(hostname, None)


    def _get_drone_for_process(self, process):
        """Return the drone running process; the null drone is returned for
        processes on _NULL_HOSTNAME.  Raises KeyError for unknown hosts."""
        if process.hostname == self._NULL_HOSTNAME:
            return self._null_drone
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        """Return the drone running the process recorded in the given pidfile.
        The pidfile must already have a process recorded."""
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        assert pidfile_contents.process is not None
        return self._get_drone_for_process(pidfile_contents.process)


    def _drop_old_pidfiles(self):
        """Age every registered pidfile by one cycle, dropping those whose age
        already exceeds _MAX_PIDFILE_AGE."""
        # iterate over a snapshot: entries are deleted while looping
        for pidfile_id, age in list(self._pidfile_age.items()):
            if age > self._MAX_PIDFILE_AGE:
                del self._pidfile_age[pidfile_id]
            else:
                self._pidfile_age[pidfile_id] += 1


    def _reset(self):
        """Clear all per-cycle state gathered by refresh()."""
        self._processes = {}
        self._process_set = set()
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _call_all_drones(self, method, *args, **kwargs):
        """Call method on every drone; return a dict of drone -> result."""
        all_results = {}
        for drone in self._drones.values():
            all_results[drone] = drone.call(method, *args, **kwargs)
        return all_results


    def _parse_pidfile(self, drone, raw_contents):
        """
        Turn the raw text of a pidfile into a PidfileContents object.

        Empty contents give a PidfileContents with nothing set.  More than
        three lines, or a line that is not an integer, give an InvalidPidfile.
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError as exc:
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        """Parse a dict of pidfile path -> raw contents and store the results
        in store_in_dict, keyed by PidfileId."""
        for pidfile_path, raw_contents in pidfiles.items():
            pidfile_id = PidfileId(pidfile_path)
            store_in_dict[pidfile_id] = self._parse_pidfile(drone,
                                                            raw_contents)


    def refresh(self):
        """
        Called at the beginning of a scheduler cycle to refresh all process
        information.
        """
        self._reset()
        # age the registered pidfiles so that ones nobody has asked about for
        # more than _MAX_PIDFILE_AGE cycles stop being read on every drone
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path for pidfile_id in self._pidfile_age]
        all_results = self._call_all_drones('refresh', pidfile_paths)

        for drone, results_list in all_results.items():
            results = results_list[0]
            process_count = len(results['processes'])
            heapq.heappush(self._drone_queue, (process_count, drone))
            for process_info in results['processes']:
                # only root autoserv processes have pgid == pid
                if process_info['pgid'] != process_info['pid']:
                    continue
                process = Process(drone.hostname, int(process_info['pid']),
                                  int(process_info['ppid']))
                execution_tag = self._execution_tag_for_process(drone,
                                                                process_info)
                self._processes[execution_tag] = process
                self._process_set.add(process)

            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
            self._process_pidfiles(drone, results['pidfiles_second_read'],
                                   self._pidfiles_second_read)


    def _execution_tag_for_process(self, drone, process_info):
        """Return the execution tag found in the process's arguments, or a
        made-up '<drone>.<pid>' tag when there is none."""
        execution_tag = self._extract_execution_tag(process_info['args'])
        if not execution_tag:
            # this process has no execution tag - just make up something unique
            return '%s.%s' % (drone, process_info['pid'])
        return execution_tag


    def _extract_execution_tag(self, command):
        """Return the value following ' -P ' in the command string, or None if
        the command has no such option."""
        # the value may be followed by another argument or end the command
        match = re.match(r'.* -P (\S+)(?: |$)', command)
        if not match:
            return None
        return match.group(1)


    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.

        An error.AutoservError from the results repository is reported
        (printed and queued as a notification email) and its call queue is
        cleared; it is not propagated.
        """
        for drone in self._drones.values():
            drone.execute_queued_calls()

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            warning = ('Results repository failed to execute calls:\n' +
                       traceback.format_exc())
            print(warning)
            email_manager.manager.enqueue_notify_email(
                'Results repository error', warning)
            self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a dict mapping execution tags to Process objects for
        orphaned processes only, i.e. those whose ppid is 1.
        """
        return dict((execution_tag, process)
                    for execution_tag, process in self._processes.items()
                    if process.ppid == 1)


    def get_process_for(self, execution_tag):
        """
        Return the process object for the given execution tag, or None if no
        such process was seen in the last refresh.
        """
        return self._processes.get(execution_tag, None)


    def get_dummy_process(self):
        """
        Return a null process object.
        """
        return Process(self._NULL_HOSTNAME, 0)


    def kill_process(self, process):
        """
        Queue a call on the process's drone to kill the given process.
        """
        print('killing %s' % (process,))
        drone = self._get_drone_for_process(process)
        drone.queue_call('kill_process', process)


    def _ensure_directory_exists(self, path):
        """Create path, including missing parents, if it does not exist."""
        if not os.path.exists(path):
            os.makedirs(path)


    def _extract_num_processes(self, command):
        """Return the number of comma-separated entries in the argument that
        follows '-m' in the argv list, or 1 if there is no '-m'."""
        try:
            machine_list_index = command.index('-m') + 1
        except ValueError:
            return 1
        assert machine_list_index < len(command)
        machine_list = command[machine_list_index].split(',')
        return len(machine_list)


    def _choose_drone_for_execution(self, num_processes):
        """Return the drone at the top of the queue (fewest processes) and
        put it back with num_processes added to its count.  The queue is
        only filled by refresh()."""
        processes, drone_to_use = heapq.heappop(self._drone_queue)
        heapq.heappush(self._drone_queue,
                       (processes + num_processes, drone_to_use))
        return drone_to_use


    def execute_command(self, command, working_directory, log_file=None,
                        pidfile_name=None, paired_with_pidfile=None):
        """
        Execute the given command, taken as an argv list.

        * working_directory: directory in which the pidfile will be written
        * log_file (optional): specifies a path (in the results repository) to
          hold command output.
        * pidfile_name (optional): gives the name of the pidfile this process
          will write
        * paired_with_pidfile (optional): a PidfileId for an already-executed
          process; the new process will execute on the same drone as the
          previous process.

        The call is only queued on the chosen drone.  Returns the PidfileId
        of the pidfile, which is also registered for refreshing.
        """
        working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)
        if not pidfile_name:
            pidfile_name = _AUTOSERV_PID_FILE

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            num_processes = self._extract_num_processes(command)
            drone = self._choose_drone_for_execution(num_processes)
        print("command = %s" % command)
        print('log file = %s:%s' % (drone.hostname, log_file))
        self._write_attached_files(command, drone)
        drone.queue_call('execute_command', command, working_directory,
                         log_file, pidfile_name)

        pidfile_path = self.absolute_path(os.path.join(working_directory,
                                                       pidfile_name))
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag):
        """Return the PidfileId of the default pidfile inside the directory
        named by execution_tag."""
        path = os.path.join(self.absolute_path(execution_tag),
                            _AUTOSERV_PID_FILE)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.  Registering again resets the pidfile's age to zero.
        """
        self._pidfile_age[pidfile_id] = 0


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id.  If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before.

        The pidfile is registered as a side effect.  A pidfile with no stored
        result yields an empty PidfileContents.
        """
        self.register_pidfile(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        return pidfile_map.get(pidfile_id, PidfileContents())


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        return process in self._process_set


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path):
        """Return path joined onto the base results directory."""
        return os.path.join(self._results_dir, path)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository.  destination_path defaults to source_path.
        """
        if destination_path is None:
            destination_path = source_path
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(destination_path)
        source_drone = self._get_drone_for_process(process)
        source_drone.send_file_to(self._results_drone, full_source,
                                  full_destination, can_fail=True)


    def _write_attached_files(self, command, drone):
        """Queue 'write_to_file' calls on drone for every file attached to the
        execution tag found in command (an argv list), and forget them."""
        execution_tag = self._extract_execution_tag(' '.join(command))
        attached_files = self._attached_files.pop(execution_tag, [])
        for file_path, contents in attached_files:
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, execution_tag, file_contents,
                                 file_path=None):
        """
        When the process for execution_tag is executed, the given file contents
        will be placed in a file on the drone.  Returns the path at which the
        file will be placed.  A temporary path is used if file_path is not
        given.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        self._attached_files.setdefault(execution_tag, []).append(
            (file_path, file_contents))
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_pidfile=None):
        """
        Write the given lines (as a list of strings) to a file.  If
        paired_with_pidfile is given, the file will be written on the drone
        running the given PidfileId.  Otherwise, the file will be written to the
        results repository.

        Each line is terminated with a newline.  The write is only queued on
        the drone.
        """
        full_path = self.absolute_path(file_path)
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            drone = self._results_drone
        drone.queue_call('write_to_file', full_path, file_contents)