#!/usr/bin/python2.4

import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass
import datetime, traceback
import common
from autotest_lib.client.common_lib import utils, global_config, error
from autotest_lib.server import hosts, subcommand
from autotest_lib.scheduler import email_manager, scheduler_config

_TEMPORARY_DIRECTORY = 'drone_tmp'
_TRANSFER_FAILED_FILE = '.transfer_failed'

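# This script runs on a drone machine.  The scheduler's drone manager is
# expected to feed it a pickled list of _MethodCall objects on stdin; the
# calls are executed in order and a pickled dict of results and warnings is
# printed to stdout (see parse_input(), main() and return_data() below).
# A hypothetical input, built with the call() helper, might look like:
#     calls = [call('initialize', results_dir='/usr/local/autotest/results'),
#              call('refresh', pidfile_paths=[])]
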
class _MethodCall(object):
    def __init__(self, method, args, kwargs):
        self._method = method
        self._args = args
        self._kwargs = kwargs


    def execute_on(self, drone_utility):
        method = getattr(drone_utility, self._method)
        return method(*self._args, **self._kwargs)


    def __str__(self):
        args = ', '.join(repr(arg) for arg in self._args)
        kwargs = ', '.join('%s=%r' % (key, value) for key, value in
                           self._kwargs.iteritems())
        full_args = ', '.join(item for item in (args, kwargs) if item)
        return '%s(%s)' % (self._method, full_args)


def call(method, *args, **kwargs):
    return _MethodCall(method, args, kwargs)


class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    _PS_ARGS = ['pid', 'pgid', 'ppid', 'comm', 'args']
    # Seconds; execute_calls() records a warning when a batch of calls takes
    # longer than this.
    _WARNING_DURATION = 60

    def __init__(self):
        self.warnings = []
        self._subcommands = []


    def initialize(self, results_dir):
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            shutil.rmtree(temporary_directory)
        self._ensure_directory_exists(temporary_directory)

        # make sure there are no old parsers running
        os.system('killall parse')


    def _warn(self, warning):
        self.warnings.append(warning)


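    # _refresh_processes() parses `ps x` output and returns one dict per
    # process whose command name matches command_name, keyed by the fields in
    # _PS_ARGS (pid, pgid, ppid, comm, args).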
    def _refresh_processes(self, command_name):
        ps_proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', ','.join(self._PS_ARGS)],
            stdout=subprocess.PIPE)
        ps_output = ps_proc.communicate()[0]

        # split each line into the columns output by ps
        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
        process_infos = [dict(zip(self._PS_ARGS, line_components))
                         for line_components in split_lines]

        processes = []
        for info in process_infos:
            if info['comm'] == command_name:
                processes.append(info)

        return processes


    def _read_pidfiles(self, pidfile_paths):
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                pidfiles[pidfile_path] = file_object.read()
                file_object.close()
            except IOError:
                continue
        return pidfiles


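    # refresh() reads each pidfile both before and after listing processes;
    # presumably this lets the caller spot pidfiles that changed while the
    # process snapshot was being taken.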
    def refresh(self, pidfile_paths):
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes('parse'),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results


    def _is_process_running(self, process):
        # TODO: enhance this to check the process args
        proc_path = os.path.join('/proc', str(process.pid))
        return os.path.exists(proc_path)


    def kill_process(self, process):
        if self._is_process_running(process):
            # SIGCONT first so that a stopped process still gets to act on
            # the SIGTERM that follows.
            os.kill(process.pid, signal.SIGCONT)
            os.kill(process.pid, signal.SIGTERM)


    def _ensure_directory_exists(self, path):
        if not os.path.exists(path):
            os.makedirs(path)


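    # execute_command() launches the given command in the background: output
    # is appended to log_file (or discarded if the log file cannot be opened),
    # stdin comes from /dev/null, and any stale pidfile in working_directory
    # is warned about and removed.  The subprocess is not waited on here; the
    # caller is expected to track it through its pidfile.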
    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                email_manager.manager.log_stacktrace(
                    'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        out_file.close()
        in_devnull.close()


    def write_to_file(self, file_path, contents):
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError, exc:
            self._warn('Error writing to file %s: %s' % (file_path, exc))


    def copy_file(self, source_path, destination_path):
        if source_path == destination_path:
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        shutil.copy(source_path, destination_path)


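    # File transfers are forked as subcommands so several can run at once.
    # _wait_for_some_async_commands() throttles the number of outstanding
    # transfers to scheduler_config's max_transfer_processes, and
    # execute_calls() waits for all of them to finish.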
    def wait_for_all_async_commands(self):
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


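    # When can_fail is set, a failed transfer does not raise: for a source
    # directory, a .transfer_failed marker recording the host, destination,
    # timestamp and traceback is written into it; for a single file, the
    # source is copied to destination_path + _TRANSFER_FAILED_FILE instead.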
    def _sync_send_file_to(self, hostname, source_path, destination_path,
                           can_fail):
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        self.run_async_command(self._sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        call_count = {}
        for call in calls:
            call_count.setdefault(call._method, 0)
            call_count[call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.iteritems())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


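    # execute_calls() runs each _MethodCall in order, pausing whenever the
    # number of outstanding transfer subprocesses reaches the configured
    # maximum, and waits for all of them before returning.  Batches that take
    # longer than _WARNING_DURATION seconds are summarized in the warnings.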
    def execute_calls(self, calls):
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)


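# The SSH username used to reach a host can be overridden with a
# SCHEDULER/<hostname>_username entry in the global config; it defaults to
# the user running this script.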
def create_host(hostname):
    username = global_config.global_config.get_config_value(
        'SCHEDULER', hostname + '_username', default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)


def parse_input():
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception, exc:
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))


def return_data(data):
    print pickle.dumps(data)


def main():
    calls = parse_input()
    drone_utility = DroneUtility()
    return_value = drone_utility.execute_calls(calls)
    return_data(return_value)


if __name__ == '__main__':
    main()