blob: 55b0a6b82e884611c9b1658a2a8e6e10c18c8f9e [file] [log] [blame]
mbligh7c8ea992009-06-22 19:03:08 +00001#!/usr/bin/python
showard170873e2009-01-07 00:22:26 +00002
3import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass
showardd791dcb2009-09-16 17:17:36 +00004import datetime, traceback, tempfile, itertools, logging
showard170873e2009-01-07 00:22:26 +00005import common
6from autotest_lib.client.common_lib import utils, global_config, error
7from autotest_lib.server import hosts, subcommand
showardd1ee1dd2009-01-07 21:33:08 +00008from autotest_lib.scheduler import email_manager, scheduler_config
showard170873e2009-01-07 00:22:26 +00009
# An environment variable we add to the environment to enable us to
# distinguish processes we started from those that were started by
# something else during recovery. Name credit goes to showard. ;)
DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'

# Name of the scratch directory recreated under the results dir on initialize().
_TEMPORARY_DIRECTORY = 'drone_tmp'
# Joined or appended to a path to record that a remote file transfer failed.
_TRANSFER_FAILED_FILE = '.transfer_failed'
17
showardd791dcb2009-09-16 17:17:36 +000018
showard170873e2009-01-07 00:22:26 +000019class _MethodCall(object):
20 def __init__(self, method, args, kwargs):
21 self._method = method
22 self._args = args
23 self._kwargs = kwargs
24
25
26 def execute_on(self, drone_utility):
27 method = getattr(drone_utility, self._method)
28 return method(*self._args, **self._kwargs)
29
30
31 def __str__(self):
32 args = ', '.join(repr(arg) for arg in self._args)
33 kwargs = ', '.join('%s=%r' % (key, value) for key, value in
34 self._kwargs.iteritems())
35 full_args = ', '.join(item for item in (args, kwargs) if item)
36 return '%s(%s)' % (self._method, full_args)
37
38
def call(method, *args, **kwargs):
    """Factory: package a drone method invocation as a _MethodCall."""
    return _MethodCall(method, args, kwargs)
41
42
43class DroneUtility(object):
44 """
45 This class executes actual OS calls on the drone machine.
46
47 All paths going into and out of this class are absolute.
48 """
showard170873e2009-01-07 00:22:26 +000049 _WARNING_DURATION = 60
50
51 def __init__(self):
showardd791dcb2009-09-16 17:17:36 +000052 # Tattoo ourselves so that all of our spawn bears our mark.
53 os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))
54
showard170873e2009-01-07 00:22:26 +000055 self.warnings = []
56 self._subcommands = []
57
58
59 def initialize(self, results_dir):
60 temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
61 if os.path.exists(temporary_directory):
62 shutil.rmtree(temporary_directory)
63 self._ensure_directory_exists(temporary_directory)
jamesrenc92d90a2010-05-12 20:01:43 +000064 build_extern_cmd = os.path.join(results_dir,
65 '../utils/build_externals.py')
66 utils.run(build_extern_cmd)
showard170873e2009-01-07 00:22:26 +000067
showard170873e2009-01-07 00:22:26 +000068
69 def _warn(self, warning):
70 self.warnings.append(warning)
71
72
showardd791dcb2009-09-16 17:17:36 +000073 @staticmethod
74 def _check_pid_for_dark_mark(pid, open=open):
75 try:
76 env_file = open('/proc/%s/environ' % pid, 'rb')
77 except EnvironmentError:
78 return False
79 try:
80 env_data = env_file.read()
81 finally:
82 env_file.close()
83 return DARK_MARK_ENVIRONMENT_VAR in env_data
84
85
86 _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')
87
88
89 @classmethod
90 def _get_process_info(cls):
91 """
92 @returns A generator of dicts with cls._PS_ARGS as keys and
93 string values each representing a running process.
94 """
showard170873e2009-01-07 00:22:26 +000095 ps_proc = subprocess.Popen(
showardd791dcb2009-09-16 17:17:36 +000096 ['/bin/ps', 'x', '-o', ','.join(cls._PS_ARGS)],
showard170873e2009-01-07 00:22:26 +000097 stdout=subprocess.PIPE)
98 ps_output = ps_proc.communicate()[0]
99
100 # split each line into the columns output by ps
101 split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
showardd791dcb2009-09-16 17:17:36 +0000102 return (dict(itertools.izip(cls._PS_ARGS, line_components))
103 for line_components in split_lines)
showard170873e2009-01-07 00:22:26 +0000104
showardd791dcb2009-09-16 17:17:36 +0000105
106 def _refresh_processes(self, command_name, open=open):
107 # The open argument is used for test injection.
108 check_mark = global_config.global_config.get_config_value(
109 'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
showard170873e2009-01-07 00:22:26 +0000110 processes = []
showardd791dcb2009-09-16 17:17:36 +0000111 for info in self._get_process_info():
showard0205a3e2009-01-16 03:03:50 +0000112 if info['comm'] == command_name:
showardd791dcb2009-09-16 17:17:36 +0000113 if (check_mark and not
114 self._check_pid_for_dark_mark(info['pid'], open=open)):
115 self._warn('%(comm)s process pid %(pid)s has no '
116 'dark mark; ignoring.' % info)
117 continue
showard170873e2009-01-07 00:22:26 +0000118 processes.append(info)
119
120 return processes
121
122
123 def _read_pidfiles(self, pidfile_paths):
124 pidfiles = {}
125 for pidfile_path in pidfile_paths:
126 if not os.path.exists(pidfile_path):
127 continue
128 try:
129 file_object = open(pidfile_path, 'r')
130 pidfiles[pidfile_path] = file_object.read()
131 file_object.close()
132 except IOError:
133 continue
134 return pidfiles
135
136
137 def refresh(self, pidfile_paths):
showardd3dc1992009-04-22 21:01:40 +0000138 """
139 pidfile_paths should be a list of paths to check for pidfiles.
140
141 Returns a dict containing:
142 * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
143 that exist.
144 * autoserv_processes: list of dicts corresponding to running autoserv
145 processes. each dict contain pid, pgid, ppid, comm, and args (see
146 "man ps" for details).
147 * parse_processes: likewise, for parse processes.
148 * pidfiles_second_read: same info as pidfiles, but gathered after the
149 processes are scanned.
150 """
showard170873e2009-01-07 00:22:26 +0000151 results = {
152 'pidfiles' : self._read_pidfiles(pidfile_paths),
showard0205a3e2009-01-16 03:03:50 +0000153 'autoserv_processes' : self._refresh_processes('autoserv'),
154 'parse_processes' : self._refresh_processes('parse'),
showard170873e2009-01-07 00:22:26 +0000155 'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
156 }
157 return results
158
159
showard170873e2009-01-07 00:22:26 +0000160 def kill_process(self, process):
showard786da9a2009-10-12 20:31:20 +0000161 signal_queue = (signal.SIGCONT, signal.SIGTERM, signal.SIGKILL)
162 utils.nuke_pid(process.pid, signal_queue=signal_queue)
showard170873e2009-01-07 00:22:26 +0000163
164
showard78d4d972009-01-16 03:04:16 +0000165 def _convert_old_host_log(self, log_path):
166 """
167 For backwards compatibility only. This can safely be removed in the
168 future.
169
170 The scheduler used to create files at results/hosts/<hostname>, and
171 append all host logs to that file. Now, it creates directories at
172 results/hosts/<hostname>, and places individual timestamped log files
173 into that directory.
174
175 This can be a problem the first time the scheduler runs after upgrading.
176 To work around that, we'll look for a file at the path where the
177 directory should be, and if we find one, we'll automatically convert it
178 to a directory containing the old logfile.
179 """
180 # move the file out of the way
181 temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
182 base_name = os.path.basename(log_path)
183 temp_path = os.path.join(temp_dir, base_name)
184 os.rename(log_path, temp_path)
185
186 os.mkdir(log_path)
187
188 # and move it into the new directory
189 os.rename(temp_path, os.path.join(log_path, 'old_log'))
190 os.rmdir(temp_dir)
191
192
showard170873e2009-01-07 00:22:26 +0000193 def _ensure_directory_exists(self, path):
showard78d4d972009-01-16 03:04:16 +0000194 if os.path.isdir(path):
195 return
196
197 if os.path.exists(path):
198 # path exists already, but as a file, not a directory
199 if '/hosts/' in path:
200 self._convert_old_host_log(path)
201 return
202 else:
203 raise IOError('Path %s exists as a file, not a directory')
204
205 os.makedirs(path)
showard170873e2009-01-07 00:22:26 +0000206
207
208 def execute_command(self, command, working_directory, log_file,
209 pidfile_name):
210 out_file = None
211 if log_file:
212 self._ensure_directory_exists(os.path.dirname(log_file))
213 try:
214 out_file = open(log_file, 'a')
215 separator = ('*' * 80) + '\n'
216 out_file.write('\n' + separator)
217 out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
218 out_file.write(separator)
219 except (OSError, IOError):
220 email_manager.manager.log_stacktrace(
221 'Error opening log file %s' % log_file)
222
223 if not out_file:
224 out_file = open('/dev/null', 'w')
225
226 in_devnull = open('/dev/null', 'r')
227
228 self._ensure_directory_exists(working_directory)
229 pidfile_path = os.path.join(working_directory, pidfile_name)
230 if os.path.exists(pidfile_path):
231 self._warn('Pidfile %s already exists' % pidfile_path)
232 os.remove(pidfile_path)
233
234 subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
235 stdin=in_devnull)
236 out_file.close()
237 in_devnull.close()
238
239
240 def write_to_file(self, file_path, contents):
241 self._ensure_directory_exists(os.path.dirname(file_path))
242 try:
243 file_object = open(file_path, 'a')
244 file_object.write(contents)
245 file_object.close()
246 except IOError, exc:
247 self._warn('Error write to file %s: %s' % (file_path, exc))
248
249
showardde634ee2009-01-30 01:44:24 +0000250 def copy_file_or_directory(self, source_path, destination_path):
251 """
252 This interface is designed to match server.hosts.abstract_ssh.get_file
253 (and send_file). That is, if the source_path ends with a slash, the
254 contents of the directory are copied; otherwise, the directory iself is
255 copied.
256 """
showard1b0ffc32009-11-13 20:45:23 +0000257 if self._same_file(source_path, destination_path):
showard170873e2009-01-07 00:22:26 +0000258 return
259 self._ensure_directory_exists(os.path.dirname(destination_path))
showardde634ee2009-01-30 01:44:24 +0000260 if source_path.endswith('/'):
261 # copying a directory's contents to another directory
262 assert os.path.isdir(source_path)
263 assert os.path.isdir(destination_path)
264 for filename in os.listdir(source_path):
265 self.copy_file_or_directory(
266 os.path.join(source_path, filename),
267 os.path.join(destination_path, filename))
268 elif os.path.isdir(source_path):
269 shutil.copytree(source_path, destination_path, symlinks=True)
270 elif os.path.islink(source_path):
271 # copied from shutil.copytree()
272 link_to = os.readlink(source_path)
273 os.symlink(link_to, destination_path)
274 else:
275 shutil.copy(source_path, destination_path)
showard170873e2009-01-07 00:22:26 +0000276
277
showard1b0ffc32009-11-13 20:45:23 +0000278 def _same_file(self, source_path, destination_path):
279 """Checks if the source and destination are the same
280
281 Returns True if the destination is the same as the source, False
282 otherwise. Also returns False if the destination does not exist.
283 """
284 if not os.path.exists(destination_path):
285 return False
286 return os.path.samefile(source_path, destination_path)
287
288
showardc408c5e2009-01-08 23:30:53 +0000289 def wait_for_all_async_commands(self):
290 for subproc in self._subcommands:
291 subproc.fork_waitfor()
292 self._subcommands = []
293
294
295 def _poll_async_commands(self):
296 still_running = []
297 for subproc in self._subcommands:
298 if subproc.poll() is None:
299 still_running.append(subproc)
300 self._subcommands = still_running
301
302
303 def _wait_for_some_async_commands(self):
304 self._poll_async_commands()
305 max_processes = scheduler_config.config.max_transfer_processes
306 while len(self._subcommands) >= max_processes:
307 time.sleep(1)
308 self._poll_async_commands()
309
310
showard170873e2009-01-07 00:22:26 +0000311 def run_async_command(self, function, args):
312 subproc = subcommand.subcommand(function, args)
313 self._subcommands.append(subproc)
314 subproc.fork_start()
315
316
showard170873e2009-01-07 00:22:26 +0000317 def _sync_get_file_from(self, hostname, source_path, destination_path):
318 self._ensure_directory_exists(os.path.dirname(destination_path))
319 host = create_host(hostname)
320 host.get_file(source_path, destination_path, delete_dest=True)
321
322
323 def get_file_from(self, hostname, source_path, destination_path):
324 self.run_async_command(self._sync_get_file_from,
325 (hostname, source_path, destination_path))
326
327
mbligh4608b002010-01-05 18:22:35 +0000328 def sync_send_file_to(self, hostname, source_path, destination_path,
showard170873e2009-01-07 00:22:26 +0000329 can_fail):
330 host = create_host(hostname)
331 try:
332 host.run('mkdir -p ' + os.path.dirname(destination_path))
333 host.send_file(source_path, destination_path, delete_dest=True)
334 except error.AutoservError:
335 if not can_fail:
336 raise
337
338 if os.path.isdir(source_path):
339 failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
340 file_object = open(failed_file, 'w')
341 try:
342 file_object.write('%s:%s\n%s\n%s' %
343 (hostname, destination_path,
344 datetime.datetime.now(),
345 traceback.format_exc()))
346 finally:
347 file_object.close()
348 else:
349 copy_to = destination_path + _TRANSFER_FAILED_FILE
350 self._ensure_directory_exists(os.path.dirname(copy_to))
showardde634ee2009-01-30 01:44:24 +0000351 self.copy_file_or_directory(source_path, copy_to)
showard170873e2009-01-07 00:22:26 +0000352
353
354 def send_file_to(self, hostname, source_path, destination_path,
355 can_fail=False):
mbligh4608b002010-01-05 18:22:35 +0000356 self.run_async_command(self.sync_send_file_to,
showard170873e2009-01-07 00:22:26 +0000357 (hostname, source_path, destination_path,
358 can_fail))
359
360
361 def _report_long_execution(self, calls, duration):
362 call_count = {}
363 for call in calls:
364 call_count.setdefault(call._method, 0)
365 call_count[call._method] += 1
366 call_summary = '\n'.join('%d %s' % (count, method)
367 for method, count in call_count.iteritems())
368 self._warn('Execution took %f sec\n%s' % (duration, call_summary))
369
370
371 def execute_calls(self, calls):
372 results = []
373 start_time = time.time()
showardc408c5e2009-01-08 23:30:53 +0000374 max_processes = scheduler_config.config.max_transfer_processes
showard170873e2009-01-07 00:22:26 +0000375 for method_call in calls:
376 results.append(method_call.execute_on(self))
showardd1ee1dd2009-01-07 21:33:08 +0000377 if len(self._subcommands) >= max_processes:
showardc408c5e2009-01-08 23:30:53 +0000378 self._wait_for_some_async_commands()
379 self.wait_for_all_async_commands()
showard170873e2009-01-07 00:22:26 +0000380
381 duration = time.time() - start_time
382 if duration > self._WARNING_DURATION:
383 self._report_long_execution(calls, duration)
384
385 warnings = self.warnings
386 self.warnings = []
387 return dict(results=results, warnings=warnings)
388
389
def create_host(hostname):
    """Build an SSHHost for |hostname|, honoring an optional per-host
    SCHEDULER.<hostname>_username config override (default: current user)."""
    config = global_config.global_config
    username = config.get_config_value('SCHEDULER', hostname + '_username',
                                       default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)
394
395
def parse_input():
    """
    Read all of stdin and unpickle it into the list of calls to execute.

    Raises ValueError (with the raw input and the pickle traceback embedded)
    if unpickling fails.
    """
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        # NOTE: unpickling executes arbitrary code; the input must come from
        # a trusted source (the scheduler process that spawned us).
        return pickle.loads(pickled_input)
    except Exception:
        # The bound exception was unused (format_exc() captures it), so no
        # 'as exc' binding is needed here.
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))
414
415
def return_data(data):
    """Pickle |data| and emit it on stdout for the parent process to read."""
    # Equivalent to the py2 'print' statement: payload plus a newline.
    sys.stdout.write(pickle.dumps(data) + '\n')
418
419
def main():
    """Entry point: run the pickled calls from stdin, reply on stdout."""
    pickled_calls = parse_input()
    utility = DroneUtility()
    response = utility.execute_calls(pickled_calls)
    return_data(response)


if __name__ == '__main__':
    main()