blob: f4624dccfdc494d11301e94c4b3fecfa9b1ca5e6 [file] [log] [blame]
mbligh7c8ea992009-06-22 19:03:08 +00001#!/usr/bin/python
showard170873e2009-01-07 00:22:26 +00002
3import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass
showardd791dcb2009-09-16 17:17:36 +00004import datetime, traceback, tempfile, itertools, logging
showard170873e2009-01-07 00:22:26 +00005import common
6from autotest_lib.client.common_lib import utils, global_config, error
7from autotest_lib.server import hosts, subcommand
showardd1ee1dd2009-01-07 21:33:08 +00008from autotest_lib.scheduler import email_manager, scheduler_config
showard170873e2009-01-07 00:22:26 +00009
# Environment variable that the drone utility puts into its own environment
# (see BaseDroneUtility.__init__) so that processes it spawns carry it. During
# recovery this lets us tell processes we started apart from ones started by
# something else. Name credit goes to showard. ;)
DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'

# Scratch directory (re)created beneath the results directory by initialize().
_TEMPORARY_DIRECTORY = 'drone_tmp'
# File name / suffix used by sync_send_file_to() to record a failed transfer.
_TRANSFER_FAILED_FILE = '.transfer_failed'
17
showardd791dcb2009-09-16 17:17:36 +000018
class _MethodCall(object):
    """
    A method call recorded by name, to be executed later on a drone utility.
    """
    def __init__(self, method, args, kwargs):
        # method is an attribute name; it is resolved only in execute_on().
        self._method = method
        self._args = args
        self._kwargs = kwargs


    def execute_on(self, drone_utility):
        """Look up the named method on drone_utility, call it, return result."""
        bound_method = getattr(drone_utility, self._method)
        return bound_method(*self._args, **self._kwargs)


    def __str__(self):
        """Render as name(arg, ..., key=value, ...) using repr() for values."""
        pieces = []
        positional = ', '.join([repr(value) for value in self._args])
        if positional:
            pieces.append(positional)
        keyword = ', '.join(['%s=%r' % pair for pair in self._kwargs.items()])
        if keyword:
            pieces.append(keyword)
        return '%s(%s)' % (self._method, ', '.join(pieces))
37
38
def call(method, *args, **kwargs):
    """
    Package a method invocation for later execution.

    @param method: name of the method to invoke.
    @returns a _MethodCall holding the name, positional args and kwargs.
    """
    positional, keyword = args, kwargs
    return _MethodCall(method, positional, keyword)
41
42
class BaseDroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    # execute_calls() reports a warning when one batch of calls takes longer
    # than this many seconds.
    _WARNING_DURATION = 400

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark: the
        # variable goes into this process's environment so that processes we
        # start afterwards inherit it.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        # Warnings accumulated since the last execute_calls().
        self.warnings = []
        # In-flight subcommand.subcommand objects started by
        # run_async_command().
        self._subcommands = []


    def initialize(self, results_dir):
        """
        Prepare the drone: recreate an empty _TEMPORARY_DIRECTORY under
        results_dir and run utils/build_externals.py from the autotest tree.

        @param results_dir: absolute path of the results directory.
        """
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            shutil.rmtree(temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        build_extern_cmd = os.path.join(common.autotest_dir,
                                        'utils/build_externals.py')
        utils.run(build_extern_cmd)


    def _warn(self, warning):
        """Queue a warning; execute_calls() returns and clears the queue."""
        self.warnings.append(warning)


    @staticmethod
    def _check_pid_for_dark_mark(pid, open=open):
        """
        Check whether a process carries our dark mark.

        @param pid: process id to inspect.
        @param open: file opener, overridable for test injection.
        @returns True if DARK_MARK_ENVIRONMENT_VAR occurs in
                /proc/<pid>/environ; False if it does not or the file cannot
                be opened.
        """
        try:
            env_file = open('/proc/%s/environ' % pid, 'rb')
        except EnvironmentError:
            return False
        try:
            env_data = env_file.read()
        finally:
            env_file.close()
        # Plain substring test on the raw environ contents.
        return DARK_MARK_ENVIRONMENT_VAR in env_data


    _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')


    @classmethod
    def _get_process_info(cls):
        """
        @returns A generator of dicts with cls._PS_ARGS as keys and
                string values each representing a running process.
        """
        ps_proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', ','.join(cls._PS_ARGS)],
            stdout=subprocess.PIPE)
        ps_output = ps_proc.communicate()[0]

        # Split each line into the columns output by ps; maxsplit=4 keeps the
        # whole command line together in the final 'args' column.
        # NOTE(review): ps normally emits a header row, which is not filtered
        # here; callers match on 'comm', so it is presumably harmless.
        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
        return (dict(itertools.izip(cls._PS_ARGS, line_components))
                for line_components in split_lines)


    def _refresh_processes(self, command_name, open=open,
                           site_check_parse=None):
        """
        Find running processes whose 'comm' equals command_name, or for which
        site_check_parse(info) is true.

        When the SCHEDULER config value check_processes_for_dark_mark is
        true, processes lacking the dark mark are skipped with a warning.

        @param command_name: exact process name to match against 'comm'.
        @param open: file opener, used for test injection.
        @param site_check_parse: optional predicate over a process info dict.
        @returns list of process info dicts (see _get_process_info()).
        """
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        processes = []
        for info in self._get_process_info():
            is_parse = (site_check_parse and site_check_parse(info))
            if info['comm'] == command_name or is_parse:
                if (check_mark and not
                        self._check_pid_for_dark_mark(info['pid'], open=open)):
                    self._warn('%(comm)s process pid %(pid)s has no '
                               'dark mark; ignoring.' % info)
                    continue
                processes.append(info)

        return processes


    def _read_pidfiles(self, pidfile_paths):
        """
        Read every pidfile that exists.

        @param pidfile_paths: iterable of pidfile paths.
        @returns dict mapping each readable path to its contents; missing or
                unreadable files are omitted.
        """
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                try:
                    pidfiles[pidfile_path] = file_object.read()
                finally:
                    # Close even when read() fails, so no handle is leaked.
                    file_object.close()
            except IOError:
                continue
        return pidfiles


    def refresh(self, pidfile_paths):
        """
        pidfile_paths should be a list of paths to check for pidfiles.

        Returns a dict containing:
        * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
        that exist.
        * autoserv_processes: list of dicts corresponding to running autoserv
        processes. each dict contain pid, pgid, ppid, comm, and args (see
        "man ps" for details).
        * parse_processes: likewise, for parse processes.
        * pidfiles_second_read: same info as pidfiles, but gathered after the
        processes are scanned.
        """
        # Site hook; the lambda default presumably applies when no site
        # module provides check_parse.
        site_check_parse = utils.import_site_function(
            __file__, 'autotest_lib.scheduler.site_drone_utility',
            'check_parse', lambda x: False)
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes(
                'parse', site_check_parse=site_check_parse),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results


    def kill_process(self, process):
        """
        Kill process.pid by passing the signal queue (SIGCONT, SIGTERM,
        SIGKILL) to utils.nuke_pid. If the pid is already gone, a warning
        is recorded instead of raising.
        """
        signal_queue = (signal.SIGCONT, signal.SIGTERM, signal.SIGKILL)
        try:
            utils.nuke_pid(process.pid, signal_queue=signal_queue)
        except error.AutoservPidAlreadyDeadError:
            self._warn('Tried to kill a pid:%d that did not exist.' %
                       process.pid)


    def _convert_old_host_log(self, log_path):
        """
        For backwards compatibility only. This can safely be removed in the
        future.

        The scheduler used to create files at results/hosts/<hostname>, and
        append all host logs to that file. Now, it creates directories at
        results/hosts/<hostname>, and places individual timestamped log files
        into that directory.

        This can be a problem the first time the scheduler runs after upgrading.
        To work around that, we'll look for a file at the path where the
        directory should be, and if we find one, we'll automatically convert it
        to a directory containing the old logfile.
        """
        # move the file out of the way
        temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
        base_name = os.path.basename(log_path)
        temp_path = os.path.join(temp_dir, base_name)
        os.rename(log_path, temp_path)

        os.mkdir(log_path)

        # and move it into the new directory
        os.rename(temp_path, os.path.join(log_path, 'old_log'))
        os.rmdir(temp_dir)


    def _ensure_directory_exists(self, path):
        """
        Make sure path is a directory, creating it (and parents) if needed.

        A plain file at a path containing '/hosts/' is converted by
        _convert_old_host_log(); any other plain file raises IOError.
        """
        if os.path.isdir(path):
            return

        if os.path.exists(path):
            # path exists already, but as a file, not a directory
            if '/hosts/' in path:
                self._convert_old_host_log(path)
                return
            else:
                raise IOError('Path %s exists as a file, not a directory'
                              % path)

        os.makedirs(path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        """
        Start command in the background; it is not waited for.

        Output (stdout and stderr) is appended to log_file after a
        timestamped header, or discarded if log_file is empty or cannot be
        opened. stdin is /dev/null. A leftover pidfile_name in
        working_directory is removed (with a warning) before launching.

        @param command: argument list passed to subprocess.Popen.
        @param working_directory: directory expected to hold the pidfile;
                created if missing.
        @param log_file: path of the log to append to, or a false value.
        @param pidfile_name: pidfile name relative to working_directory.
        """
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                email_manager.manager.log_stacktrace(
                    'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        # The try/finally blocks guarantee both descriptors are closed in
        # this process even if directory setup or Popen raises.
        try:
            in_devnull = open('/dev/null', 'r')
            try:
                self._ensure_directory_exists(working_directory)
                pidfile_path = os.path.join(working_directory, pidfile_name)
                if os.path.exists(pidfile_path):
                    self._warn('Pidfile %s already exists' % pidfile_path)
                    os.remove(pidfile_path)

                subprocess.Popen(command, stdout=out_file,
                                 stderr=subprocess.STDOUT, stdin=in_devnull)
            finally:
                in_devnull.close()
        finally:
            out_file.close()


    def write_to_file(self, file_path, contents):
        """
        Append contents to file_path, creating parent directories as needed.
        An IOError while opening/writing is recorded as a warning.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            try:
                file_object.write(contents)
            finally:
                file_object.close()
        except IOError:
            # sys.exc_info() keeps this portable across Python versions.
            exc = sys.exc_info()[1]
            self._warn('Error write to file %s: %s' % (file_path, exc))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file). That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.

        Symlinks are recreated rather than followed. Nothing is done when
        source and destination already refer to the same file.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # copying a directory's contents to another directory
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                    os.path.join(source_path, filename),
                    os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            shutil.copytree(source_path, destination_path, symlinks=True)
        elif os.path.islink(source_path):
            # copied from shutil.copytree()
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            shutil.copy(source_path, destination_path)


    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)


    def wait_for_all_async_commands(self):
        """Block until every started async command finishes, then forget them."""
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        """Drop async commands whose poll() no longer returns None."""
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        """
        Sleep in one-second steps until fewer than
        scheduler_config.config.max_transfer_processes commands are running.
        """
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        """Start function(*args) via subcommand.subcommand and track it."""
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        """Copy source_path from hostname to local destination_path."""
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        """Asynchronous wrapper around _sync_get_file_from()."""
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def sync_send_file_to(self, hostname, source_path, destination_path,
                          can_fail):
        """
        Copy local source_path to destination_path on hostname.

        On error.AutoservError: re-raise unless can_fail. If can_fail, record
        the failure locally instead -- for a directory source, write
        _TRANSFER_FAILED_FILE inside it (host, destination, time, traceback);
        for a file source, copy it locally to
        destination_path + _TRANSFER_FAILED_FILE.
        """
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        """Asynchronous wrapper around sync_send_file_to()."""
        self.run_async_command(self.sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        """Warn with the duration and a per-method count of calls."""
        call_count = {}
        for method_call in calls:
            call_count.setdefault(method_call._method, 0)
            call_count[method_call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.items())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        """
        Execute each call in order and wait for all async commands.

        Async transfers are throttled to max_transfer_processes; a warning
        is added when the whole batch exceeds _WARNING_DURATION seconds.

        @param calls: sequence of _MethodCall objects.
        @returns dict(results=[per-call return values],
                      warnings=[warnings accumulated, then cleared]).
        """
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)
399
400
def create_host(hostname):
    """
    Build a hosts.SSHHost for hostname.

    The SSH user is read from the SCHEDULER config key '<hostname>_username',
    defaulting to getpass.getuser().
    """
    config_key = hostname + '_username'
    fallback_user = getpass.getuser()
    username = global_config.global_config.get_config_value(
        'SCHEDULER', config_key, default=fallback_user)
    return hosts.SSHHost(hostname, user=username)
405
406
def parse_input():
    """
    Read all of stdin and unpickle it.

    @returns the unpickled object (main() treats it as the list of calls to
            execute).
    @raises ValueError: if unpickling fails; the message includes the raw
            input and the pickle traceback.
    """
    # NOTE(review): pickle.loads can execute arbitrary code. This is only
    # acceptable if stdin comes from a trusted sender (presumably the
    # scheduler) -- never feed this untrusted data.
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception:
        # The traceback is pulled from traceback.format_exc(), so the
        # exception object itself does not need to be bound.
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))
424 separator))
425
426
Simran Basiaf9b8e72012-10-12 15:02:36 -0700427SiteDroneUtility = utils.import_site_class(
428 __file__, 'autotest_lib.scheduler.site_drone_utility',
429 'SiteDroneUtility', BaseDroneUtility)
430
431
432class DroneUtility(SiteDroneUtility):
433 pass
434
435
def return_data(data):
    """Write the pickled form of data to stdout, followed by a newline."""
    serialized = pickle.dumps(data)
    sys.stdout.write(serialized + '\n')
438
439
def main():
    """
    Read pickled calls from stdin, run them on a DroneUtility and write the
    pickled result dict to stdout.
    """
    pending_calls = parse_input()
    utility = DroneUtility()
    return_data(utility.execute_calls(pending_calls))


if __name__ == '__main__':
    main()
448 main()