blob: c84a033de08f2f09d667c3527fc735dde6cf58c0 [file] [log] [blame]
mbligh7c8ea992009-06-22 19:03:08 +00001#!/usr/bin/python
showard170873e2009-01-07 00:22:26 +00002
3import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass
showardd791dcb2009-09-16 17:17:36 +00004import datetime, traceback, tempfile, itertools, logging
showard170873e2009-01-07 00:22:26 +00005import common
6from autotest_lib.client.common_lib import utils, global_config, error
7from autotest_lib.server import hosts, subcommand
showardd1ee1dd2009-01-07 21:33:08 +00008from autotest_lib.scheduler import email_manager, scheduler_config
showard170873e2009-01-07 00:22:26 +00009
showardd791dcb2009-09-16 17:17:36 +000010# An environment variable we add to the environment to enable us to
11# distinguish processes we started from those that were started by
12# something else during recovery. Name credit goes to showard. ;)
13DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'
14
showard170873e2009-01-07 00:22:26 +000015_TEMPORARY_DIRECTORY = 'drone_tmp'
16_TRANSFER_FAILED_FILE = '.transfer_failed'
17
showardd791dcb2009-09-16 17:17:36 +000018
showard170873e2009-01-07 00:22:26 +000019class _MethodCall(object):
20 def __init__(self, method, args, kwargs):
21 self._method = method
22 self._args = args
23 self._kwargs = kwargs
24
25
26 def execute_on(self, drone_utility):
27 method = getattr(drone_utility, self._method)
28 return method(*self._args, **self._kwargs)
29
30
31 def __str__(self):
32 args = ', '.join(repr(arg) for arg in self._args)
33 kwargs = ', '.join('%s=%r' % (key, value) for key, value in
34 self._kwargs.iteritems())
35 full_args = ', '.join(item for item in (args, kwargs) if item)
36 return '%s(%s)' % (self._method, full_args)
37
38
def call(method, *args, **kwargs):
    """Package a method name plus its arguments into a _MethodCall."""
    return _MethodCall(method=method, args=args, kwargs=kwargs)
41
42
class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    # Batches taking longer than this (seconds) trigger a warning report.
    _WARNING_DURATION = 60

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        # Warning strings accumulated during a batch; drained and returned
        # by execute_calls().
        self.warnings = []
        # Subcommands started by run_async_command() that may still be
        # running.
        self._subcommands = []


    def initialize(self, results_dir):
        """
        Prepare this drone for use: recreate the temporary directory under
        results_dir and build external packages.

        @param results_dir: absolute path to the results directory.
        """
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            shutil.rmtree(temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        build_extern_cmd = os.path.join(results_dir,
                                        '../utils/build_externals.py')
        utils.run(build_extern_cmd)


    def _warn(self, warning):
        # Queue a warning string; reported to the caller by execute_calls().
        self.warnings.append(warning)


    @staticmethod
    def _check_pid_for_dark_mark(pid, open=open):
        """
        Check whether process |pid| carries the dark mark environment
        variable, i.e. whether it (or an ancestor) was spawned by a
        DroneUtility.

        @param pid: pid whose /proc environment to examine.
        @param open: injectable for testing.
        @returns True if the mark is present; False if the environment
                cannot be read (process gone, no permission) or lacks the
                mark.
        """
        try:
            env_file = open('/proc/%s/environ' % pid, 'rb')
        except EnvironmentError:
            return False
        try:
            env_data = env_file.read()
        finally:
            env_file.close()
        return DARK_MARK_ENVIRONMENT_VAR in env_data


    # Columns requested from ps; also the keys of each process-info dict.
    _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')


    @classmethod
    def _get_process_info(cls):
        """
        @returns A generator of dicts with cls._PS_ARGS as keys and
                string values each representing a running process.
        """
        ps_proc = subprocess.Popen(
                ['/bin/ps', 'x', '-o', ','.join(cls._PS_ARGS)],
                stdout=subprocess.PIPE)
        ps_output = ps_proc.communicate()[0]

        # split each line into the columns output by ps
        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
        return (dict(itertools.izip(cls._PS_ARGS, line_components))
                for line_components in split_lines)


    def _refresh_processes(self, command_name, open=open,
                           site_check_parse=None):
        """
        List running processes whose ps 'comm' equals command_name, or that
        site_check_parse identifies as parse processes.

        When the SCHEDULER.check_processes_for_dark_mark option is on,
        matching processes that lack our dark mark are skipped with a
        warning instead of being returned.

        @param command_name: command name to match against ps 'comm'.
        @param open: injectable for testing (forwarded to the dark-mark
                check).
        @param site_check_parse: optional predicate over a process-info
                dict; truthy results are included too.
        @returns list of process-info dicts (keys: _PS_ARGS).
        """
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        processes = []
        for info in self._get_process_info():
            is_parse = (site_check_parse and site_check_parse(info))
            if info['comm'] == command_name or is_parse:
                if (check_mark and not
                        self._check_pid_for_dark_mark(info['pid'], open=open)):
                    self._warn('%(comm)s process pid %(pid)s has no '
                               'dark mark; ignoring.' % info)
                    continue
                processes.append(info)

        return processes


    def _read_pidfiles(self, pidfile_paths):
        """
        Read each existing, readable pidfile.

        @param pidfile_paths: iterable of pidfile paths to try.
        @returns dict mapping pidfile path -> file contents; paths that do
                not exist or cannot be read are silently omitted.
        """
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                # Close even if read() fails, to avoid leaking the handle.
                try:
                    pidfiles[pidfile_path] = file_object.read()
                finally:
                    file_object.close()
            except IOError:
                continue
        return pidfiles


    def refresh(self, pidfile_paths):
        """
        pidfile_paths should be a list of paths to check for pidfiles.

        Returns a dict containing:
        * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
          that exist.
        * autoserv_processes: list of dicts corresponding to running autoserv
          processes.  each dict contain pid, pgid, ppid, comm, and args (see
          "man ps" for details).
        * parse_processes: likewise, for parse processes.
        * pidfiles_second_read: same info as pidfiles, but gathered after the
          processes are scanned.
        """
        site_check_parse = utils.import_site_function(
                __file__, 'autotest_lib.scheduler.site_drone_utility',
                'check_parse', lambda x: False)
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes(
                    'parse', site_check_parse=site_check_parse),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results


    def kill_process(self, process):
        """
        Kill process.pid, escalating through SIGCONT, SIGTERM then SIGKILL
        (SIGCONT first so a stopped process can receive the later signals).
        """
        signal_queue = (signal.SIGCONT, signal.SIGTERM, signal.SIGKILL)
        utils.nuke_pid(process.pid, signal_queue=signal_queue)


    def _convert_old_host_log(self, log_path):
        """
        For backwards compatibility only.  This can safely be removed in the
        future.

        The scheduler used to create files at results/hosts/<hostname>, and
        append all host logs to that file.  Now, it creates directories at
        results/hosts/<hostname>, and places individual timestamped log files
        into that directory.

        This can be a problem the first time the scheduler runs after
        upgrading.  To work around that, we'll look for a file at the path
        where the directory should be, and if we find one, we'll
        automatically convert it to a directory containing the old logfile.
        """
        # move the file out of the way
        temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
        base_name = os.path.basename(log_path)
        temp_path = os.path.join(temp_dir, base_name)
        os.rename(log_path, temp_path)

        os.mkdir(log_path)

        # and move it into the new directory
        os.rename(temp_path, os.path.join(log_path, 'old_log'))
        os.rmdir(temp_dir)


    def _ensure_directory_exists(self, path):
        """
        Create path (and any missing parents) if it is not already a
        directory.

        @raises IOError if path exists as a regular file outside the hosts
                log area (old host log files are converted instead).
        """
        if os.path.isdir(path):
            return

        if os.path.exists(path):
            # path exists already, but as a file, not a directory
            if '/hosts/' in path:
                self._convert_old_host_log(path)
                return
            else:
                # Bug fix: the original never applied the % argument, so the
                # offending path was missing from the error message.
                raise IOError('Path %s exists as a file, not a directory'
                              % path)

        os.makedirs(path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        """
        Launch command in the background via subprocess.Popen.

        stdout/stderr are appended to log_file (with a timestamped header
        line) when given; otherwise they are discarded.  A stale pidfile at
        working_directory/pidfile_name is removed with a warning before the
        command starts.

        @param command: argv list to execute.
        @param working_directory: directory holding the pidfile; created if
                missing.
        @param log_file: path to append output to, or a false value for
                none.
        @param pidfile_name: pidfile name within working_directory.
        """
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                # Best-effort logging: report the failure and fall back to
                # /dev/null rather than aborting the command.
                email_manager.manager.log_stacktrace(
                    'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        # The child inherits the descriptors; our copies can be closed.
        out_file.close()
        in_devnull.close()


    def write_to_file(self, file_path, contents):
        """
        Append contents to file_path, creating parent directories as needed.
        IOErrors are reported as warnings rather than raised.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError as exc:
            self._warn('Error writing to file %s: %s' % (file_path, exc))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file).  That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory iself
        is copied.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # copying a directory's contents to another directory
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                    os.path.join(source_path, filename),
                    os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            shutil.copytree(source_path, destination_path, symlinks=True)
        elif os.path.islink(source_path):
            # copied from shutil.copytree()
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            shutil.copy(source_path, destination_path)


    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)


    def wait_for_all_async_commands(self):
        """Block until every outstanding async subcommand has finished."""
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        # Drop finished subcommands, keeping only those still running.
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        """Sleep-poll until the running subcommand count drops below the
        configured max_transfer_processes."""
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        """Fork a subcommand running function(*args) and track it."""
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        # Synchronous body of get_file_from(); runs in a forked subcommand.
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        """Asynchronously fetch source_path on hostname to a local
        destination_path."""
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def sync_send_file_to(self, hostname, source_path, destination_path,
                          can_fail):
        """
        Send local source_path to destination_path on hostname.

        @param can_fail: when True, a transfer failure is recorded with a
                _TRANSFER_FAILED_FILE marker (inside source_path if it is a
                directory, next to the destination copy otherwise) instead
                of being raised.
        @raises error.AutoservError on failure when can_fail is False.
        """
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        """Asynchronously send source_path to destination_path on hostname
        (see sync_send_file_to for can_fail semantics)."""
        self.run_async_command(self.sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        # Queue a warning summarizing how many times each method was called
        # in a batch that exceeded _WARNING_DURATION.
        call_count = {}
        for method_call in calls:
            call_count.setdefault(method_call._method, 0)
            call_count[method_call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.iteritems())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        """
        Execute a batch of _MethodCall objects against this instance.

        Throttles asynchronous subcommands to max_transfer_processes, waits
        for all of them to finish, and reports a warning if the batch took
        longer than _WARNING_DURATION seconds.

        @param calls: iterable of _MethodCall objects.
        @returns dict with 'results' (per-call return values, in order) and
                'warnings' (accumulated warning strings, drained here).
        """
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)
394
395
def create_host(hostname):
    """
    Build an SSHHost for hostname.

    The SSH username comes from the SCHEDULER option '<hostname>_username'
    when configured, defaulting to the current local user.
    """
    config = global_config.global_config
    username = config.get_config_value('SCHEDULER', hostname + '_username',
                                       default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)
400
401
def parse_input():
    """
    Read the entire pickled payload from stdin and unpickle it.

    @returns the unpickled object (a list of _MethodCall objects sent by
            the scheduler).
    @raises ValueError if unpickling fails, with the raw input and the
            pickle traceback embedded in the message.
    """
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        # NOTE: pickle is only safe here because stdin is fed by the
        # scheduler over a trusted pipe; never use this on untrusted data.
        return pickle.loads(pickled_input)
    except Exception:
        # Fixed legacy 'except Exception, exc' syntax; the bound exception
        # was unused (traceback.format_exc() captures it below).
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))
420
421
def return_data(data):
    """Pickle data and emit it on stdout for the calling scheduler."""
    sys.stdout.write('%s\n' % pickle.dumps(data))
424
425
def main():
    """Entry point: read a batch of calls from stdin, execute them on a
    fresh DroneUtility, and write the pickled results to stdout."""
    batch = parse_input()
    return_data(DroneUtility().execute_calls(batch))


if __name__ == '__main__':
    main()