| showard | 170873e | 2009-01-07 00:22:26 +0000 | [diff] [blame] | 1 | #!/usr/bin/python2.4 | 
|  | 2 |  | 
|  | 3 | import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass | 
|  | 4 | import datetime, traceback | 
|  | 5 | import common | 
|  | 6 | from autotest_lib.client.common_lib import utils, global_config, error | 
|  | 7 | from autotest_lib.server import hosts, subcommand | 
| showard | d1ee1dd | 2009-01-07 21:33:08 +0000 | [diff] [blame^] | 8 | from autotest_lib.scheduler import email_manager, scheduler_config | 
| showard | 170873e | 2009-01-07 00:22:26 +0000 | [diff] [blame] | 9 |  | 
# Scratch directory that DroneUtility.initialize() deletes and recreates
# under the results directory.
_TEMPORARY_DIRECTORY = 'drone_tmp'
# Marker name used when a send_file_to() transfer fails with can_fail set:
# written as a file inside a source directory, or appended as a suffix to
# the destination path for a single file.
_TRANSFER_FAILED_FILE = '.transfer_failed'
|  | 12 |  | 
class _MethodCall(object):
    """A recorded call to a DroneUtility method, identified by name.

    Built by call(); execute_on() later performs the call against a
    DroneUtility instance (see DroneUtility.execute_calls).
    """
    def __init__(self, method, args, kwargs):
        self._method, self._args, self._kwargs = method, args, kwargs


    def execute_on(self, drone_utility):
        """Invoke the named method on drone_utility and return its result."""
        bound_method = getattr(drone_utility, self._method)
        return bound_method(*self._args, **self._kwargs)


    def __str__(self):
        positional = ', '.join([repr(value) for value in self._args])
        keyword = ', '.join(['%s=%r' % (name, value)
                             for name, value in self._kwargs.items()])
        # drop whichever group is empty so no stray ", " appears
        rendered = ', '.join(filter(None, [positional, keyword]))
        return '%s(%s)' % (self._method, rendered)
|  | 31 |  | 
|  | 32 |  | 
def call(method, *args, **kwargs):
    """Record a call to the DroneUtility method named `method`.

    The extra positional and keyword arguments are stored unchanged and
    passed through when the returned _MethodCall is executed.
    """
    return _MethodCall(method=method, args=args, kwargs=kwargs)
|  | 35 |  | 
|  | 36 |  | 
class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.

    Work normally arrives as a list of _MethodCall objects passed to
    execute_calls(); warnings collected while running them are returned
    alongside the results.
    """
    # Columns requested from ps; 'args' is last because it may contain spaces.
    _PS_ARGS = ['pid', 'pgid', 'ppid', 'comm', 'args']
    # execute_calls() emits a warning when a batch takes longer than this
    # many seconds.
    _WARNING_DURATION = 60

    def __init__(self):
        # Messages added by _warn() and handed back by execute_calls().
        self.warnings = []
        # Subcommands started by run_async_command() not yet waited for.
        self._subcommands = []


    def initialize(self, results_dir):
        """Recreate an empty temporary directory under results_dir and kill
        any leftover 'parse' processes."""
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            shutil.rmtree(temporary_directory)
        self._ensure_directory_exists(temporary_directory)

        # make sure there are no old parsers running
        os.system('killall parse')


    def _warn(self, warning):
        """Queue a warning message to be returned by execute_calls()."""
        self.warnings.append(warning)


    def _refresh_autoserv_processes(self):
        """Return one dict per running 'autoserv' process.

        Each dict maps the names in _PS_ARGS to the string columns printed by
        `ps x`.
        """
        ps_proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', ','.join(self._PS_ARGS)],
            stdout=subprocess.PIPE)
        ps_output = ps_proc.communicate()[0]

        max_splits = len(self._PS_ARGS) - 1  # keep 'args' in one piece
        processes = []
        for line in ps_output.splitlines():
            info = dict(zip(self._PS_ARGS, line.split(None, max_splits)))
            # .get(): a short or blank line must not abort the whole refresh
            if info.get('comm') == 'autoserv':
                processes.append(info)
        return processes


    def _read_pidfiles(self, pidfile_paths):
        """Return {path: contents} for each pidfile that can be read.

        Missing or unreadable files are skipped silently.
        """
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                try:
                    pidfiles[pidfile_path] = file_object.read()
                finally:
                    file_object.close()
            except IOError:
                continue
        return pidfiles


    def refresh(self, pidfile_paths):
        """Snapshot pidfile contents and autoserv processes.

        Returns a dict with 'pidfiles' (read before listing processes),
        'processes' (see _refresh_autoserv_processes) and
        'pidfiles_second_read' (the same pidfiles read again afterwards).
        """
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            'processes' : self._refresh_autoserv_processes(),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results


    def _is_process_running(self, process):
        """Return True if /proc has an entry for process.pid."""
        # TODO: enhance this to check the process args
        proc_path = os.path.join('/proc', str(process.pid))
        return os.path.exists(proc_path)


    def kill_process(self, process):
        """Send SIGCONT then SIGTERM to process.pid if it appears to be running."""
        if self._is_process_running(process):
            # SIGCONT first, presumably so a stopped process resumes and can
            # act on the SIGTERM.
            os.kill(process.pid, signal.SIGCONT)
            os.kill(process.pid, signal.SIGTERM)


    def _ensure_directory_exists(self, path):
        """Create `path` (and any missing parents) if it does not exist."""
        if os.path.exists(path):
            return
        try:
            os.makedirs(path)
        except OSError:
            # Transfers run through run_async_command() and several may be
            # pending at once, so another process can create the directory
            # between the exists() check and makedirs(). Only fail if it is
            # still not there.
            if not os.path.isdir(path):
                raise


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        """Start `command` with subprocess.Popen without waiting for it.

        stdout and stderr are appended to log_file after a timestamped header.
        If log_file is empty, or its directory or the file cannot be created,
        output goes to /dev/null instead. stdin is /dev/null. A pre-existing
        `pidfile_name` in working_directory is removed with a warning before
        launching.
        """
        out_file = None
        if log_file:
            try:
                # Inside the try so a failure to create the log directory
                # falls back to /dev/null like a failure to open the file.
                self._ensure_directory_exists(os.path.dirname(log_file))
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                email_manager.manager.log_stacktrace(
                    'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        try:
            self._ensure_directory_exists(working_directory)
            pidfile_path = os.path.join(working_directory, pidfile_name)
            if os.path.exists(pidfile_path):
                self._warn('Pidfile %s already exists' % pidfile_path)
                os.remove(pidfile_path)

            subprocess.Popen(command, stdout=out_file,
                             stderr=subprocess.STDOUT, stdin=in_devnull)
        finally:
            # Release this process's handles even if anything above raised.
            out_file.close()
            in_devnull.close()


    def write_to_file(self, file_path, contents):
        """Append `contents` to file_path, creating its directory if needed.

        An IOError while opening or writing is recorded as a warning rather
        than raised.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            try:
                file_object.write(contents)
            finally:
                file_object.close()
        except IOError:
            exc = sys.exc_info()[1]
            self._warn('Error write to file %s: %s' % (file_path, exc))


    def copy_file(self, source_path, destination_path):
        """Copy a local file, creating the destination directory if needed.

        Does nothing when the two paths are identical.
        """
        if source_path == destination_path:
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        shutil.copy(source_path, destination_path)


    def run_async_command(self, function, args):
        """Start `function` with `args` via subcommand.fork_start() and track
        it for wait_for_async_commands()."""
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def wait_for_async_commands(self):
        """Wait for every tracked subcommand, then forget them."""
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        """Blocking fetch of source_path on hostname into the local
        destination_path."""
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        """Asynchronous version of _sync_get_file_from()."""
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def _sync_send_file_to(self, hostname, source_path, destination_path,
                           can_fail):
        """Blocking send of local source_path to destination_path on hostname.

        If the transfer raises error.AutoservError and can_fail is False, the
        error propagates. If can_fail is True, a failure marker is left
        locally instead. For a directory source, a _TRANSFER_FAILED_FILE
        inside it records host, destination, time and traceback. For a file
        source, the file is copied locally to
        destination_path + _TRANSFER_FAILED_FILE.
        """
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        """Asynchronous version of _sync_send_file_to()."""
        self.run_async_command(self._sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        """Warn that a batch took `duration` seconds, listing how many calls
        were made to each method (sorted by method name)."""
        call_count = {}
        # named method_call so the module-level call() is not shadowed
        for method_call in calls:
            name = method_call._method
            call_count[name] = call_count.get(name, 0) + 1
        call_summary = '\n'.join(['%d %s' % (count, method)
                                  for method, count
                                  in sorted(call_count.items())])
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        """Run each _MethodCall in `calls` against this object, in order.

        Once scheduler_config.config.max_transfer_processes async subcommands
        are pending, all of them are waited for before continuing. Everything
        still pending is waited for before returning.

        Returns dict(results=[one result per call], warnings=[messages
        collected since the previous execute_calls()]).
        """
        results = []
        start_time = time.time()
        for method_call in calls:
            results.append(method_call.execute_on(self))
            max_processes = scheduler_config.config.max_transfer_processes
            if len(self._subcommands) >= max_processes:
                self.wait_for_async_commands()
        self.wait_for_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)
|  | 256 |  | 
|  | 257 |  | 
def create_host(hostname):
    """Return an SSHHost for `hostname`.

    The SSH user is read from the SCHEDULER config key
    '<hostname>_username', defaulting to the user running this process.
    """
    config_key = hostname + '_username'
    username = global_config.global_config.get_config_value(
        'SCHEDULER', config_key, default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)
|  | 262 |  | 
|  | 263 |  | 
def parse_input():
    """Read stdin to EOF and return the unpickled object.

    NOTE(review): pickle.loads can execute arbitrary code for crafted input,
    so stdin must only ever come from a trusted caller.

    Raises ValueError, carrying the raw input and the unpickling traceback,
    if the data cannot be unpickled.
    """
    pieces = []
    while True:
        piece = sys.stdin.read()
        if not piece:
            break
        pieces.append(piece)
    raw = ''.join(pieces)

    try:
        return pickle.loads(raw)
    except Exception:
        divider = '*' * 50
        template = ('Unpickling input failed\n'
                    'Input: %r\n'
                    'Exception from pickle:\n'
                    '%s\n%s\n%s')
        raise ValueError(template % (raw, divider, traceback.format_exc(),
                                     divider))
|  | 282 |  | 
|  | 283 |  | 
def return_data(data):
    """Pickle `data` and print it to stdout, followed by a newline."""
    serialized = pickle.dumps(data)
    print(serialized)
|  | 286 |  | 
|  | 287 |  | 
def main():
    """Run the calls unpickled from stdin on a new DroneUtility and print the
    pickled dict(results=..., warnings=...) to stdout."""
    pending_calls = parse_input()
    utility = DroneUtility()
    return_data(utility.execute_calls(pending_calls))


if __name__ == '__main__':
    main()