showard | 170873e | 2009-01-07 00:22:26 +0000 | [diff] [blame] | 1 | #!/usr/bin/python2.4 |
| 2 | |
| 3 | import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass |
| 4 | import datetime, traceback |
| 5 | import common |
| 6 | from autotest_lib.client.common_lib import utils, global_config, error |
| 7 | from autotest_lib.server import hosts, subcommand |
showard | d1ee1dd | 2009-01-07 21:33:08 +0000 | [diff] [blame] | 8 | from autotest_lib.scheduler import email_manager, scheduler_config |
showard | 170873e | 2009-01-07 00:22:26 +0000 | [diff] [blame] | 9 | |
# name of the scratch directory that DroneUtility.initialize() deletes and
# recreates (empty) under the results directory
_TEMPORARY_DIRECTORY = 'drone_tmp'
# marker used when a send_file_to(..., can_fail=True) transfer fails: written
# as a file inside a source directory, or appended as a suffix to the local
# copy of a source file (see DroneUtility._sync_send_file_to)
_TRANSFER_FAILED_FILE = '.transfer_failed'
| 12 | |
class _MethodCall(object):
    """
    A deferred method invocation: a method name plus the positional and
    keyword arguments to call it with.

    Instances are created by call() and later replayed against a target
    object (a DroneUtility in execute_calls) through execute_on().
    """
    def __init__(self, method, args, kwargs):
        # name of the method looked up on the target in execute_on()
        self._method = method
        # positional arguments (a tuple when built by call())
        self._args = args
        # keyword arguments (a dict when built by call())
        self._kwargs = kwargs


    def execute_on(self, drone_utility):
        """Invoke the stored method on drone_utility and return its result."""
        bound_method = getattr(drone_utility, self._method)
        return bound_method(*self._args, **self._kwargs)


    def __str__(self):
        """Render the call as "method(1, 'a', key=2)" for logging."""
        pieces = []
        positional = ', '.join([repr(value) for value in self._args])
        if positional:
            pieces.append(positional)
        keyword = ', '.join(['%s=%r' % pair
                             for pair in self._kwargs.items()])
        if keyword:
            pieces.append(keyword)
        return '%s(%s)' % (self._method, ', '.join(pieces))
| 31 | |
| 32 | |
def call(method, *args, **kwargs):
    """
    Package a method invocation for later execution.

    call('copy_file', src, dst) records the method name 'copy_file' along
    with its arguments; the returned _MethodCall runs it via execute_on().
    """
    invocation = _MethodCall(method, args, kwargs)
    return invocation
| 35 | |
| 36 | |
class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.

    Calls are normally batched through execute_calls(). Warnings produced
    along the way are collected in self.warnings and returned (then cleared)
    with the batch results. File transfers run asynchronously as
    subcommands tracked in self._subcommands.
    """
    # columns requested from ps, in output order.  'args' must stay last: it
    # may contain whitespace, so each ps line is split into at most
    # len(_PS_ARGS) fields.
    _PS_ARGS = ['pid', 'pgid', 'ppid', 'comm', 'args']
    # execute_calls() records a warning when a batch takes longer than this
    # many seconds
    _WARNING_DURATION = 60

    def __init__(self):
        self.warnings = []
        self._subcommands = []


    def initialize(self, results_dir):
        """
        Recreate an empty scratch directory under results_dir and kill any
        leftover 'parse' processes.
        """
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            shutil.rmtree(temporary_directory)
        self._ensure_directory_exists(temporary_directory)

        # make sure there are no old parsers running
        os.system('killall parse')


    def _warn(self, warning):
        """Queue a warning string to be returned by execute_calls()."""
        self.warnings.append(warning)


    def _refresh_processes(self, command_name):
        """
        Run 'ps x' and return a list of dicts (keyed by _PS_ARGS, string
        values) for the entries whose 'comm' column equals command_name.
        """
        ps_proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', ','.join(self._PS_ARGS)],
            stdout=subprocess.PIPE)
        ps_output = ps_proc.communicate()[0]

        # split each line into the columns output by ps; the split count is
        # derived from _PS_ARGS so the trailing 'args' column stays intact
        max_splits = len(self._PS_ARGS) - 1
        processes = []
        for line in ps_output.splitlines():
            info = dict(zip(self._PS_ARGS, line.split(None, max_splits)))
            # .get: a short/odd line simply doesn't match instead of raising
            if info.get('comm') == command_name:
                processes.append(info)
        return processes


    def _read_pidfiles(self, pidfile_paths):
        """
        Return a dict mapping each readable path in pidfile_paths to its full
        contents. Missing or unreadable files are skipped.
        """
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                try:
                    pidfiles[pidfile_path] = file_object.read()
                finally:
                    # close even if read() fails, so handles are not leaked
                    file_object.close()
            except IOError:
                continue
        return pidfiles


    def refresh(self, pidfile_paths):
        """
        Snapshot process state.

        Returns a dict with the pidfile contents, the 'autoserv' and 'parse'
        process lists, and a second read of the pidfiles taken after ps ran
        (presumably so the caller can detect pidfiles written while the
        process lists were gathered -- TODO confirm with caller).
        """
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes('parse'),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results


    def _is_process_running(self, process):
        """Return True if /proc/<process.pid> exists."""
        # TODO: enhance this to check the process args
        proc_path = os.path.join('/proc', str(process.pid))
        return os.path.exists(proc_path)


    def kill_process(self, process):
        """
        Send SIGCONT then SIGTERM to process if it is still running.

        A process that exits between the /proc check and the kill is
        ignored rather than aborting the whole batch of calls.
        """
        import errno
        if not self._is_process_running(process):
            return
        try:
            # SIGCONT first, presumably so a stopped process is resumed and
            # can act on the SIGTERM
            os.kill(process.pid, signal.SIGCONT)
            os.kill(process.pid, signal.SIGTERM)
        except OSError:
            # ESRCH: the process vanished after the check; anything else is
            # a real error
            if sys.exc_info()[1].errno != errno.ESRCH:
                raise


    def _ensure_directory_exists(self, path):
        """
        Create directory path (and any missing parents) if it does not exist.

        Tolerates the directory appearing between the existence check and
        os.makedirs(), e.g. when a concurrently running transfer subcommand
        creates the same directory.
        """
        if os.path.exists(path):
            return
        try:
            os.makedirs(path)
        except OSError:
            # lost a creation race: only fail if there is still no directory
            if not os.path.isdir(path):
                raise


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        """
        Launch command via subprocess.Popen without waiting for it.

        stdout and stderr are appended to log_file after a header line with
        a timestamp and the command. If log_file is empty or cannot be
        prepared/opened, output goes to /dev/null instead (the failure is
        reported through email_manager). stdin is /dev/null. An existing
        pidfile named pidfile_name in working_directory is removed, with a
        warning, before launching.
        """
        out_file = None
        if log_file:
            try:
                self._ensure_directory_exists(os.path.dirname(log_file))
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                email_manager.manager.log_stacktrace(
                    'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        # nested try/finally (Python 2.4 compatible) so both handles are
        # closed in this process even if launching fails
        try:
            in_devnull = open('/dev/null', 'r')
            try:
                self._ensure_directory_exists(working_directory)
                pidfile_path = os.path.join(working_directory, pidfile_name)
                if os.path.exists(pidfile_path):
                    self._warn('Pidfile %s already exists' % pidfile_path)
                    os.remove(pidfile_path)

                subprocess.Popen(command, stdout=out_file,
                                 stderr=subprocess.STDOUT, stdin=in_devnull)
            finally:
                in_devnull.close()
        finally:
            out_file.close()


    def write_to_file(self, file_path, contents):
        """
        Append contents to file_path, creating its parent directory if
        needed. An IOError while opening or writing becomes a warning.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            try:
                file_object.write(contents)
            finally:
                file_object.close()
        except IOError:
            # sys.exc_info() keeps this valid on both old and new Pythons
            exc = sys.exc_info()[1]
            self._warn('Error write to file %s: %s' % (file_path, exc))


    def copy_file(self, source_path, destination_path):
        """
        Copy source_path to destination_path with shutil.copy, creating the
        destination's parent directory; a no-op when both paths are equal.
        """
        if source_path == destination_path:
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        shutil.copy(source_path, destination_path)


    def wait_for_all_async_commands(self):
        """Call fork_waitfor() on every tracked subcommand, then forget them."""
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        """Keep only the subcommands whose poll() still returns None."""
        self._subcommands = [subproc for subproc in self._subcommands
                             if subproc.poll() is None]


    def _wait_for_some_async_commands(self):
        """
        Poll once per second until fewer than
        scheduler_config.config.max_transfer_processes subcommands remain.
        """
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        """Start function(*args) as a subcommand and track it."""
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        """Copy source_path on hostname to the local destination_path."""
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        """Run _sync_get_file_from as an async subcommand."""
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def _sync_send_file_to(self, hostname, source_path, destination_path,
                           can_fail):
        """
        Copy local source_path to destination_path on hostname.

        If the transfer raises error.AutoservError and can_fail is true, a
        local marker is left instead of raising: for a directory source, a
        _TRANSFER_FAILED_FILE inside it recording destination, time and
        traceback; for a file source, a local copy at
        destination_path + _TRANSFER_FAILED_FILE.
        """
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        """Run _sync_send_file_to as an async subcommand."""
        self.run_async_command(self._sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        """Warn how long the batch took and how many calls of each method."""
        call_count = {}
        # named method_call to avoid shadowing the module-level call()
        for method_call in calls:
            name = method_call._method
            call_count[name] = call_count.get(name, 0) + 1
        call_summary = '\n'.join(['%d %s' % (count, method)
                                  for method, count in call_count.items()])
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        """
        Execute each call (via its execute_on) in order on this object.

        Async transfers are throttled to
        scheduler_config.config.max_transfer_processes and all of them are
        waited for before returning. Returns
        dict(results=[...per-call return values...], warnings=[...]) and
        clears the accumulated warnings.
        """
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)
| 273 | |
| 274 | |
def create_host(hostname):
    """
    Return a hosts.SSHHost for hostname.

    The login user is the SCHEDULER section's '<hostname>_username' value
    from the global config, falling back to getpass.getuser().
    """
    config = global_config.global_config
    login_user = config.get_config_value('SCHEDULER',
                                         hostname + '_username',
                                         default=getpass.getuser())
    return hosts.SSHHost(hostname, user=login_user)
| 279 | |
| 280 | |
def parse_input():
    """
    Read all of stdin (until EOF) and return it unpickled.

    Raises ValueError, embedding the raw input and the original traceback,
    if unpickling fails.

    NOTE(review): pickle.loads can execute arbitrary code embedded in its
    input, so stdin must only ever be fed by a trusted caller.
    """
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception:
        # the original traceback is captured via traceback.format_exc(), so
        # no exception binding is needed
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))
| 299 | |
| 300 | |
def return_data(data):
    """Print data, pickled, to stdout (followed by a newline)."""
    serialized = pickle.dumps(data)
    print(serialized)
| 303 | |
| 304 | |
def main():
    """
    Script entry point: unpickle a sequence of calls (objects with
    execute_on(), e.g. built by call()) from stdin, run them on a fresh
    DroneUtility, and print the pickled result dict to stdout.
    """
    pending_calls = parse_input()
    outcome = DroneUtility().execute_calls(pending_calls)
    return_data(outcome)
| 310 | |
| 311 | |
# when run as a script: pickled calls arrive on stdin, pickled results are
# printed to stdout (see main)
if __name__ == '__main__':
    main()