mbligh | 7c8ea99 | 2009-06-22 19:03:08 +0000 | [diff] [blame] | 1 | #!/usr/bin/python |
showard | 170873e | 2009-01-07 00:22:26 +0000 | [diff] [blame] | 2 | |
| 3 | import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass |
showard | d791dcb | 2009-09-16 17:17:36 +0000 | [diff] [blame] | 4 | import datetime, traceback, tempfile, itertools, logging |
showard | 170873e | 2009-01-07 00:22:26 +0000 | [diff] [blame] | 5 | import common |
| 6 | from autotest_lib.client.common_lib import utils, global_config, error |
| 7 | from autotest_lib.server import hosts, subcommand |
showard | d1ee1dd | 2009-01-07 21:33:08 +0000 | [diff] [blame] | 8 | from autotest_lib.scheduler import email_manager, scheduler_config |
showard | 170873e | 2009-01-07 00:22:26 +0000 | [diff] [blame] | 9 | |
# An environment variable we add to the environment to enable us to
# distinguish processes we started from those that were started by
# something else during recovery. Name credit goes to showard. ;)
DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'

# Name of the scratch directory recreated under the results dir on
# initialize().
_TEMPORARY_DIRECTORY = 'drone_tmp'
# Marker file/suffix written when a transfer to another host fails
# (see DroneUtility.sync_send_file_to).
_TRANSFER_FAILED_FILE = '.transfer_failed'
| 17 | |
showard | d791dcb | 2009-09-16 17:17:36 +0000 | [diff] [blame] | 18 | |
showard | 170873e | 2009-01-07 00:22:26 +0000 | [diff] [blame] | 19 | class _MethodCall(object): |
| 20 | def __init__(self, method, args, kwargs): |
| 21 | self._method = method |
| 22 | self._args = args |
| 23 | self._kwargs = kwargs |
| 24 | |
| 25 | |
| 26 | def execute_on(self, drone_utility): |
| 27 | method = getattr(drone_utility, self._method) |
| 28 | return method(*self._args, **self._kwargs) |
| 29 | |
| 30 | |
| 31 | def __str__(self): |
| 32 | args = ', '.join(repr(arg) for arg in self._args) |
| 33 | kwargs = ', '.join('%s=%r' % (key, value) for key, value in |
| 34 | self._kwargs.iteritems()) |
| 35 | full_args = ', '.join(item for item in (args, kwargs) if item) |
| 36 | return '%s(%s)' % (self._method, full_args) |
| 37 | |
| 38 | |
def call(method, *args, **kwargs):
    """Convenience factory: build a _MethodCall for `method` with the
    given arguments."""
    return _MethodCall(method, args, kwargs)
| 41 | |
| 42 | |
class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    # Executions of a batch of calls longer than this many seconds generate
    # a warning (see execute_calls / _report_long_execution).
    _WARNING_DURATION = 60

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        # Warning strings accumulated during a batch of calls; drained and
        # returned to the scheduler by execute_calls().
        self.warnings = []
        # Outstanding subcommand objects for in-flight async operations.
        self._subcommands = []


    def initialize(self, results_dir):
        """
        One-time drone setup: recreate the scratch directory under
        results_dir and build external packages.

        @param results_dir: absolute path to the results directory.
        """
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            shutil.rmtree(temporary_directory)
        self._ensure_directory_exists(temporary_directory)
        build_extern_cmd = os.path.join(results_dir,
                                        '../utils/build_externals.py')
        utils.run(build_extern_cmd)


    def _warn(self, warning):
        """Queue a warning string to be returned to the scheduler."""
        self.warnings.append(warning)


    @staticmethod
    def _check_pid_for_dark_mark(pid, open=open):
        """
        Check whether process `pid` carries our "dark mark" environment
        variable, i.e. whether it was spawned by this scheduler.

        The open argument is used for test injection.

        @returns False when /proc/<pid>/environ cannot be read (process
            exited, or owned by another user); otherwise whether the mark
            is present in the environment data.
        """
        try:
            env_file = open('/proc/%s/environ' % pid, 'rb')
        except EnvironmentError:
            return False
        try:
            env_data = env_file.read()
        finally:
            env_file.close()
        return DARK_MARK_ENVIRONMENT_VAR in env_data


    # Columns requested from ps; also the keys of each process-info dict.
    _PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')


    @classmethod
    def _get_process_info(cls):
        """
        @returns A generator of dicts with cls._PS_ARGS as keys and
                string values each representing a running process.
        """
        ps_proc = subprocess.Popen(
                ['/bin/ps', 'x', '-o', ','.join(cls._PS_ARGS)],
                stdout=subprocess.PIPE)
        ps_output = ps_proc.communicate()[0]

        # split each line into the columns output by ps
        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
        # zip is equivalent to itertools.izip here: dict() consumes the
        # pairs immediately either way.
        return (dict(zip(cls._PS_ARGS, line_components))
                for line_components in split_lines)


    def _refresh_processes(self, command_name, open=open,
                           site_check_parse=None):
        """
        List running processes whose ps "comm" column equals command_name,
        or which site_check_parse identifies as parse processes.

        The open argument is used for test injection.

        @param command_name: command name (ps "comm") to match.
        @param site_check_parse: optional predicate taking a process-info
            dict; truthy return includes the process.
        @returns list of process-info dicts (keys: _PS_ARGS).
        """
        check_mark = global_config.global_config.get_config_value(
            'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        processes = []
        for info in self._get_process_info():
            is_parse = (site_check_parse and site_check_parse(info))
            if info['comm'] == command_name or is_parse:
                # If configured, skip processes we didn't spawn ourselves.
                if (check_mark and not
                        self._check_pid_for_dark_mark(info['pid'], open=open)):
                    self._warn('%(comm)s process pid %(pid)s has no '
                               'dark mark; ignoring.' % info)
                    continue
                processes.append(info)

        return processes


    def _read_pidfiles(self, pidfile_paths):
        """
        Read the contents of each existing, readable pidfile.

        @returns dict mapping pidfile path -> file contents.  Missing or
            unreadable files are silently skipped (best effort).
        """
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                pidfiles[pidfile_path] = file_object.read()
                file_object.close()
            except IOError:
                continue
        return pidfiles


    def refresh(self, pidfile_paths):
        """
        pidfile_paths should be a list of paths to check for pidfiles.

        Returns a dict containing:
        * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
          that exist.
        * autoserv_processes: list of dicts corresponding to running autoserv
          processes.  each dict contain pid, pgid, ppid, comm, and args (see
          "man ps" for details).
        * parse_processes: likewise, for parse processes.
        * pidfiles_second_read: same info as pidfiles, but gathered after the
          processes are scanned.
        """
        site_check_parse = utils.import_site_function(
                __file__, 'autotest_lib.scheduler.site_drone_utility',
                'check_parse', lambda x: False)
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes(
                    'parse', site_check_parse=site_check_parse),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results


    def kill_process(self, process):
        """Kill `process` (an object with a .pid attribute), escalating
        through SIGCONT, SIGTERM, SIGKILL."""
        signal_queue = (signal.SIGCONT, signal.SIGTERM, signal.SIGKILL)
        utils.nuke_pid(process.pid, signal_queue=signal_queue)


    def _convert_old_host_log(self, log_path):
        """
        For backwards compatibility only. This can safely be removed in the
        future.

        The scheduler used to create files at results/hosts/<hostname>, and
        append all host logs to that file. Now, it creates directories at
        results/hosts/<hostname>, and places individual timestamped log files
        into that directory.

        This can be a problem the first time the scheduler runs after
        upgrading. To work around that, we'll look for a file at the path
        where the directory should be, and if we find one, we'll
        automatically convert it to a directory containing the old logfile.
        """
        # move the file out of the way
        temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
        base_name = os.path.basename(log_path)
        temp_path = os.path.join(temp_dir, base_name)
        os.rename(log_path, temp_path)

        os.mkdir(log_path)

        # and move it into the new directory
        os.rename(temp_path, os.path.join(log_path, 'old_log'))
        os.rmdir(temp_dir)


    def _ensure_directory_exists(self, path):
        """
        Create `path` (and any missing parents) as a directory if needed.

        If `path` exists as a regular file, old-style host logs are
        converted in place; anything else raises IOError.

        @raises IOError if path exists as a non-host-log file.
        """
        if os.path.isdir(path):
            return

        if os.path.exists(path):
            # path exists already, but as a file, not a directory
            if '/hosts/' in path:
                self._convert_old_host_log(path)
                return
            else:
                # Bug fix: the original message never interpolated the
                # offending path, so it always read literally "Path %s ...".
                raise IOError('Path %s exists as a file, not a directory'
                              % path)

        os.makedirs(path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        """
        Launch `command` (a list) asynchronously, appending its output to
        log_file (or /dev/null).  Does not wait for completion.

        A stale pidfile at working_directory/pidfile_name is removed with a
        warning; the spawned command is expected to write its own.
        """
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                # Write a timestamped banner separating successive runs.
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                email_manager.manager.log_stacktrace(
                        'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        # The child holds its own copies of these descriptors; close ours.
        out_file.close()
        in_devnull.close()


    def write_to_file(self, file_path, contents):
        """Append `contents` to file_path, creating parent directories as
        needed.  Failures are reported as warnings, not raised."""
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError as exc:
            self._warn('Error write to file %s: %s' % (file_path, exc))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file). That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory iself
        is copied.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # copying a directory's contents to another directory
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                        os.path.join(source_path, filename),
                        os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            shutil.copytree(source_path, destination_path, symlinks=True)
        elif os.path.islink(source_path):
            # copied from shutil.copytree()
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            shutil.copy(source_path, destination_path)


    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)


    def wait_for_all_async_commands(self):
        """Block until every outstanding async subcommand finishes."""
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        """Drop finished subcommands from the outstanding list."""
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        """Poll until the outstanding subcommand count drops below the
        configured max_transfer_processes."""
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        """Fork `function(*args)` as a subcommand and track it."""
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        """Synchronously fetch a file from a remote host (runs in a forked
        subcommand)."""
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        """Asynchronously fetch a file from a remote host."""
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def sync_send_file_to(self, hostname, source_path, destination_path,
                          can_fail):
        """
        Synchronously send a file to a remote host (runs in a forked
        subcommand).

        When the transfer fails and can_fail is True, a
        _TRANSFER_FAILED_FILE marker recording the failure details is
        written instead of raising.
        """
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        """Asynchronously send a file to a remote host."""
        self.run_async_command(self.sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        """Warn with a per-method call count when a batch ran too long."""
        call_count = {}
        for call in calls:
            call_count.setdefault(call._method, 0)
            call_count[call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.items())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        """
        Execute a batch of _MethodCall objects in order, throttling
        outstanding async transfers to max_transfer_processes.

        @param calls: iterable of _MethodCall objects.
        @returns dict with 'results' (per-call return values, in order) and
            'warnings' (accumulated warning strings; the queue is drained).
        """
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)
| 394 | |
| 395 | |
def create_host(hostname):
    """Build an SSHHost for `hostname`, using the per-host username from
    the SCHEDULER config section (falling back to the current user)."""
    config = global_config.global_config
    username = config.get_config_value('SCHEDULER', hostname + '_username',
                                       default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)
| 400 | |
| 401 | |
def parse_input():
    """
    Read all of stdin and unpickle it into the list of calls to execute.

    @returns the unpickled object (a list of _MethodCall objects).
    @raises ValueError if the input cannot be unpickled; the message
        includes the raw input and the pickle traceback for debugging.
    """
    # read() may return data in chunks; loop until it signals EOF by
    # returning the empty string.
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        # SECURITY NOTE: pickle.loads() can execute arbitrary code; this is
        # only safe because stdin comes from the trusted scheduler process.
        return pickle.loads(pickled_input)
    except Exception:
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))
| 420 | |
| 421 | |
def return_data(data):
    """Pickle `data` and emit it (newline-terminated) on stdout for the
    scheduler to read."""
    sys.stdout.write(pickle.dumps(data) + '\n')
| 424 | |
| 425 | |
def main():
    """Entry point: read pickled calls from stdin, execute them, and write
    the pickled results to stdout."""
    method_calls = parse_input()
    utility = DroneUtility()
    return_data(utility.execute_calls(method_calls))


if __name__ == '__main__':
    main()