blob: ab5e505d4365c1778888be5f2cee6bed951e9128 [file] [log] [blame]
mbligh7c8ea992009-06-22 19:03:08 +00001#!/usr/bin/python
showard170873e2009-01-07 00:22:26 +00002
3import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass
showard78d4d972009-01-16 03:04:16 +00004import datetime, traceback, tempfile
showard170873e2009-01-07 00:22:26 +00005import common
6from autotest_lib.client.common_lib import utils, global_config, error
7from autotest_lib.server import hosts, subcommand
showardd1ee1dd2009-01-07 21:33:08 +00008from autotest_lib.scheduler import email_manager, scheduler_config
showard170873e2009-01-07 00:22:26 +00009
# Scratch directory created under the results dir; wiped and recreated by
# DroneUtility.initialize().
_TEMPORARY_DIRECTORY = 'drone_tmp'
# Marker file name/suffix used to record that a file transfer to a remote
# host failed (see DroneUtility._sync_send_file_to).
_TRANSFER_FAILED_FILE = '.transfer_failed'
12
13class _MethodCall(object):
14 def __init__(self, method, args, kwargs):
15 self._method = method
16 self._args = args
17 self._kwargs = kwargs
18
19
20 def execute_on(self, drone_utility):
21 method = getattr(drone_utility, self._method)
22 return method(*self._args, **self._kwargs)
23
24
25 def __str__(self):
26 args = ', '.join(repr(arg) for arg in self._args)
27 kwargs = ', '.join('%s=%r' % (key, value) for key, value in
28 self._kwargs.iteritems())
29 full_args = ', '.join(item for item in (args, kwargs) if item)
30 return '%s(%s)' % (self._method, full_args)
31
32
def call(method, *args, **kwargs):
    """Convenience factory: package a method name and its arguments into a
    _MethodCall for later execution on a drone's DroneUtility."""
    return _MethodCall(method, args, kwargs)
35
36
class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    # columns requested from ps when scanning for running processes
    _PS_ARGS = ['pid', 'pgid', 'ppid', 'comm', 'args']
    # execute_calls() emits a warning if a batch takes longer than this (secs)
    _WARNING_DURATION = 60


    def __init__(self):
        # warning strings accumulated by _warn(), drained by execute_calls()
        self.warnings = []
        # subcommand objects for async transfers still in flight
        self._subcommands = []


    def initialize(self, results_dir):
        """Recreate an empty temporary scratch directory under results_dir."""
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            shutil.rmtree(temporary_directory)
        self._ensure_directory_exists(temporary_directory)


    def _warn(self, warning):
        self.warnings.append(warning)


    def _refresh_processes(self, command_name):
        """Return ps-derived info dicts (keys: _PS_ARGS) for every running
        process whose command name is exactly command_name."""
        ps_proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', ','.join(self._PS_ARGS)],
            stdout=subprocess.PIPE)
        ps_output = ps_proc.communicate()[0]

        # split each line into the columns output by ps
        split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
        process_infos = [dict(zip(self._PS_ARGS, line_components))
                         for line_components in split_lines]

        processes = []
        for info in process_infos:
            # the ps header row ('COMMAND') and unrelated processes simply
            # fail this comparison and are skipped
            if info['comm'] == command_name:
                processes.append(info)

        return processes


    def _read_pidfiles(self, pidfile_paths):
        """Return a dict mapping each existing, readable pidfile path to its
        full contents."""
        pidfiles = {}
        for pidfile_path in pidfile_paths:
            if not os.path.exists(pidfile_path):
                continue
            try:
                file_object = open(pidfile_path, 'r')
                pidfiles[pidfile_path] = file_object.read()
                file_object.close()
            except IOError:
                # vanished or unreadable -- treat like a missing pidfile
                continue
        return pidfiles


    def refresh(self, pidfile_paths):
        """
        pidfile_paths should be a list of paths to check for pidfiles.

        Returns a dict containing:
        * pidfiles: dict mapping pidfile paths to file contents, for pidfiles
          that exist.
        * autoserv_processes: list of dicts corresponding to running autoserv
          processes.  each dict contain pid, pgid, ppid, comm, and args (see
          "man ps" for details).
        * parse_processes: likewise, for parse processes.
        * pidfiles_second_read: same info as pidfiles, but gathered after the
          processes are scanned.
        """
        results = {
            'pidfiles' : self._read_pidfiles(pidfile_paths),
            'autoserv_processes' : self._refresh_processes('autoserv'),
            'parse_processes' : self._refresh_processes('parse'),
            'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
        }
        return results


    def _is_process_running(self, process):
        # TODO: enhance this to check the process args
        proc_path = os.path.join('/proc', str(process.pid))
        return os.path.exists(proc_path)


    def kill_process(self, process):
        """Send SIGCONT then SIGTERM to process, if it is still running.

        SIGCONT first so that a stopped process is resumed and can actually
        receive and handle the TERM.
        """
        if self._is_process_running(process):
            os.kill(process.pid, signal.SIGCONT)
            os.kill(process.pid, signal.SIGTERM)


    def _convert_old_host_log(self, log_path):
        """
        For backwards compatibility only.  This can safely be removed in the
        future.

        The scheduler used to create files at results/hosts/<hostname>, and
        append all host logs to that file.  Now, it creates directories at
        results/hosts/<hostname>, and places individual timestamped log files
        into that directory.

        This can be a problem the first time the scheduler runs after
        upgrading.  To work around that, we'll look for a file at the path
        where the directory should be, and if we find one, we'll
        automatically convert it to a directory containing the old logfile.
        """
        # move the file out of the way
        temp_dir = tempfile.mkdtemp(suffix='.convert_host_log')
        base_name = os.path.basename(log_path)
        temp_path = os.path.join(temp_dir, base_name)
        os.rename(log_path, temp_path)

        os.mkdir(log_path)

        # and move it into the new directory
        os.rename(temp_path, os.path.join(log_path, 'old_log'))
        os.rmdir(temp_dir)


    def _ensure_directory_exists(self, path):
        """Create path (and any missing parents) as a directory.

        Raises IOError if path already exists as a regular file, unless it
        looks like an old-style host log, which is converted in place.
        """
        if os.path.isdir(path):
            return

        if os.path.exists(path):
            # path exists already, but as a file, not a directory
            if '/hosts/' in path:
                self._convert_old_host_log(path)
                return
            else:
                # bug fix: the original never interpolated path into this
                # message ('%s' was left literal)
                raise IOError('Path %s exists as a file, not a directory'
                              % path)

        os.makedirs(path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        """Launch command (an argv list) detached, appending its output to
        log_file (if given) and removing any stale pidfile named
        pidfile_name under working_directory first."""
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
            except (OSError, IOError):
                # best effort: fall through to /dev/null below
                email_manager.manager.log_stacktrace(
                    'Error opening log file %s' % log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')

        self._ensure_directory_exists(working_directory)
        pidfile_path = os.path.join(working_directory, pidfile_name)
        if os.path.exists(pidfile_path):
            self._warn('Pidfile %s already exists' % pidfile_path)
            os.remove(pidfile_path)

        subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
                         stdin=in_devnull)
        # the child inherits the descriptors; our copies can be closed
        out_file.close()
        in_devnull.close()


    def write_to_file(self, file_path, contents):
        """Append contents to file_path, creating parent directories as
        needed.  Failures are recorded as warnings rather than raised."""
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            file_object = open(file_path, 'a')
            file_object.write(contents)
            file_object.close()
        except IOError as exc:
            self._warn('Error writing to file %s: %s' % (file_path, exc))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file).  That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.
        """
        if source_path.rstrip('/') == destination_path.rstrip('/'):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # copying a directory's contents to another directory
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                    os.path.join(source_path, filename),
                    os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            shutil.copytree(source_path, destination_path, symlinks=True)
        elif os.path.islink(source_path):
            # copied from shutil.copytree()
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            shutil.copy(source_path, destination_path)


    def wait_for_all_async_commands(self):
        """Block until every outstanding async subcommand has finished."""
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        # drop finished subcommands from the in-flight list
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        # block until we're back under the configured transfer-process limit
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        """Fork a subcommand running function(*args) and track it so the
        wait_* methods can reap it later."""
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        # runs in a forked subcommand (see get_file_from)
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        """Asynchronously fetch source_path from hostname to a local path."""
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def _sync_send_file_to(self, hostname, source_path, destination_path,
                           can_fail):
        # runs in a forked subcommand (see send_file_to)
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            # transfer was best-effort; leave a marker recording the failure
            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                file_object = open(failed_file, 'w')
                try:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
                finally:
                    file_object.close()
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        """Asynchronously push source_path to destination_path on hostname.

        If can_fail is True, transfer errors are recorded locally instead of
        raised.
        """
        self.run_async_command(self._sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        # summarize how many times each method was invoked in this batch
        call_count = {}
        for method_call in calls:
            call_count.setdefault(method_call._method, 0)
            call_count[method_call._method] += 1
        # .items() instead of py2-only .iteritems(); output is identical
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.items())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        """Execute a list of _MethodCall objects in order, throttling async
        transfers to the configured maximum.

        Returns dict(results=<list of per-call return values>,
        warnings=<warning strings accumulated during this batch>); the
        warning list is cleared afterwards.
        """
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)
342
343
def create_host(hostname):
    """Build an SSHHost for hostname.

    The ssh username comes from the SCHEDULER config section, key
    '<hostname>_username', defaulting to the local user.
    """
    config = global_config.global_config
    config_key = hostname + '_username'
    username = config.get_config_value('SCHEDULER', config_key,
                                       default=getpass.getuser())
    return hosts.SSHHost(hostname, user=username)
348
349
def parse_input():
    """Read all of stdin and unpickle it into the list of calls to execute.

    Raises ValueError, with the raw input and the pickle traceback embedded,
    if the input cannot be unpickled.
    """
    input_chunks = []
    chunk_of_input = sys.stdin.read()
    # loop until EOF -- a single read() may return early on some pipes
    while chunk_of_input:
        input_chunks.append(chunk_of_input)
        chunk_of_input = sys.stdin.read()
    pickled_input = ''.join(input_chunks)

    try:
        return pickle.loads(pickled_input)
    except Exception:
        # fix: the original bound the exception to an unused name with
        # py2-only 'except Exception, exc' syntax; the details come from
        # traceback.format_exc() instead
        separator = '*' * 50
        raise ValueError('Unpickling input failed\n'
                         'Input: %r\n'
                         'Exception from pickle:\n'
                         '%s\n%s\n%s' %
                         (pickled_input, separator, traceback.format_exc(),
                          separator))
368
369
def return_data(data):
    """Pickle data and write it to stdout for the calling scheduler to read.

    Uses the function-call form of print, which is identical to the py2
    statement for a single argument and also valid Python 3.
    """
    print(pickle.dumps(data))
372
373
def main():
    """Drone entry point: read pickled calls from stdin, execute them
    against a fresh DroneUtility, and pickle the results to stdout."""
    method_calls = parse_input()
    utility = DroneUtility()
    outcome = utility.execute_calls(method_calls)
    return_data(outcome)
379
380
# Script entry point: the scheduler invokes this file remotely, piping
# pickled _MethodCall objects over stdin.
if __name__ == '__main__':
    main()