blob: 5ccac0a58c4160873d96955f0d11a353fdbe99e0 [file] [log] [blame]
Mike Frysingerd03e6b52019-08-03 12:49:01 -04001#!/usr/bin/python2
Prashanth B340fd1e2014-06-22 12:44:10 -07002
3"""Utility module that executes management commands on the drone.
4
51. This is the module responsible for orchestrating processes on a drone.
62. It receives instructions via stdin and replies via stdout.
73. Each invocation is responsible for the initiation of a set of batched calls.
84. The batched calls may be synchronous or asynchronous.
95. The caller is responsible for monitoring asynchronous calls through pidfiles.
10"""
11
Allen Liaae34bc2017-02-06 15:12:03 -080012#pylint: disable-msg=missing-docstring
showard170873e2009-01-07 00:22:26 +000013
Prashanth Bcf731e32014-08-10 18:03:57 -070014import argparse
Prathmesh Prabhucfabbde2017-06-05 18:50:32 -070015import collections
Allen Liaae34bc2017-02-06 15:12:03 -080016import datetime
17import getpass
18import itertools
19import logging
Prathmesh Prabhucfabbde2017-06-05 18:50:32 -070020import multiprocessing
Allen Liaae34bc2017-02-06 15:12:03 -080021import os
22import pickle
23import shutil
24import signal
25import subprocess
26import sys
Allen Liaae34bc2017-02-06 15:12:03 -080027import time
28import traceback
29
showard170873e2009-01-07 00:22:26 +000030import common
Allen Liaae34bc2017-02-06 15:12:03 -080031
32from autotest_lib.client.common_lib import error
33from autotest_lib.client.common_lib import global_config
Simran Basi93a2cd62013-10-23 13:29:28 -070034from autotest_lib.client.common_lib import logging_manager
Allen Liaae34bc2017-02-06 15:12:03 -080035from autotest_lib.client.common_lib import utils
Alex Miller1b409ec2013-10-24 11:39:29 -070036from autotest_lib.client.common_lib.cros import retry
Simran Basi93a2cd62013-10-23 13:29:28 -070037from autotest_lib.scheduler import drone_logging_config
Allen Liaae34bc2017-02-06 15:12:03 -080038from autotest_lib.scheduler import scheduler_config
Allen Liaae34bc2017-02-06 15:12:03 -080039from autotest_lib.server import subcommand
showard170873e2009-01-07 00:22:26 +000040
Dan Shic458f662015-04-29 12:12:38 -070041
showardd791dcb2009-09-16 17:17:36 +000042# An environment variable we add to the environment to enable us to
43# distinguish processes we started from those that were started by
44# something else during recovery. Name credit goes to showard. ;)
45DARK_MARK_ENVIRONMENT_VAR = 'AUTOTEST_SCHEDULER_DARK_MARK'
46
showard170873e2009-01-07 00:22:26 +000047_TEMPORARY_DIRECTORY = 'drone_tmp'
48_TRANSFER_FAILED_FILE = '.transfer_failed'
49
Dan Shic458f662015-04-29 12:12:38 -070050# script and log file for cleaning up orphaned lxc containers.
51LXC_CLEANUP_SCRIPT = os.path.join(common.autotest_dir, 'site_utils',
52 'lxc_cleanup.py')
53LXC_CLEANUP_LOG_FILE = os.path.join(common.autotest_dir, 'logs',
54 'lxc_cleanup.log')
55
56
showard170873e2009-01-07 00:22:26 +000057class _MethodCall(object):
58 def __init__(self, method, args, kwargs):
59 self._method = method
60 self._args = args
61 self._kwargs = kwargs
62
63
64 def execute_on(self, drone_utility):
65 method = getattr(drone_utility, self._method)
66 return method(*self._args, **self._kwargs)
67
68
69 def __str__(self):
70 args = ', '.join(repr(arg) for arg in self._args)
71 kwargs = ', '.join('%s=%r' % (key, value) for key, value in
72 self._kwargs.iteritems())
73 full_args = ', '.join(item for item in (args, kwargs) if item)
74 return '%s(%s)' % (self._method, full_args)
75
76
def call(method, *args, **kwargs):
    """Package a method name and its arguments for later execution.

    @param method: Name of the method to be invoked later.
    @param args: Positional arguments to pass to that method.
    @param kwargs: Keyword arguments to pass to that method.

    @return: A _MethodCall holding the name and the arguments.
    """
    deferred_call = _MethodCall(method, args, kwargs)
    return deferred_call
79
80
class DroneUtility(object):
    """
    This class executes actual OS calls on the drone machine.

    All paths going into and out of this class are absolute.
    """
    # Number of seconds a batch of calls may take before execute_calls()
    # records a warning summarizing the batch.
    _WARNING_DURATION = 400

    def __init__(self):
        # Tattoo ourselves so that all of our spawn bears our mark.
        os.putenv(DARK_MARK_ENVIRONMENT_VAR, str(os.getpid()))

        # Warnings collected while executing calls; execute_calls() hands
        # them back to the caller and resets the list.
        self.warnings = []
        # Asynchronous subcommands that were started and not yet reaped.
        self._subcommands = []


    def initialize(self, results_dir):
        """Recreate the temporary directory and optionally build externals.

        @param results_dir: Directory under which the temporary directory
                is (re)created.
        """
        temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
        if os.path.exists(temporary_directory):
            # TODO crbug.com/391111: before we have a better solution to
            # periodically cleanup tmp files, we have to use rm -rf to delete
            # the whole folder. shutil.rmtree has performance issue when a
            # folder has large amount of files, e.g., drone_tmp.
            # The path is passed as its own argument (no shell involved) so
            # that spaces or shell metacharacters in it cannot change what
            # gets removed.
            subprocess.call(['rm', '-rf', temporary_directory])
        self._ensure_directory_exists(temporary_directory)
        # TODO (sbasi) crbug.com/345011 - Remove this configuration variable
        # and clean up build_externals so it NO-OP's.
        build_externals = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'drone_build_externals',
                default=True, type=bool)
        if build_externals:
            build_extern_cmd = os.path.join(common.autotest_dir,
                                            'utils', 'build_externals.py')
            utils.run(build_extern_cmd)


    def _warn(self, warning):
        """Record a warning to be returned by the next execute_calls()."""
        self.warnings.append(warning)


    def refresh(self, pidfile_paths):
        """Refreshes our view of the processes referred to by pidfile_paths.

        See drone_utility.ProcessRefresher.__call__ for details.
        """
        check_mark = global_config.global_config.get_config_value(
                'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
        use_pool = global_config.global_config.get_config_value(
                'SCHEDULER', 'drone_utility_refresh_use_pool', bool, False)
        result, warnings = ProcessRefresher(check_mark, use_pool)(pidfile_paths)
        self.warnings += warnings
        return result


    def get_signal_queue_to_kill(self, process):
        """Get the signal queue needed to kill a process.

        autoserv process has a handle on SIGTERM, in which it can do some
        cleanup work. However, abort a process with SIGTERM then SIGKILL has
        its overhead, detailed in following CL:
        https://chromium-review.googlesource.com/230323
        This method checks the process's argument and determine if SIGTERM is
        required, and returns signal queue accordingly.

        @param process: A drone_manager.Process object to be killed.

        @return: The signal queue needed to kill a process.

        """
        signal_queue_with_sigterm = (signal.SIGTERM, signal.SIGKILL)
        try:
            ps_output = subprocess.check_output(
                    ['/bin/ps', '-p', str(process.pid), '-o', 'args'])
            # For test running with server-side packaging, SIGTERM needs to be
            # sent for autoserv process to destroy container used by the test.
            if '--require-ssp' in ps_output:
                logging.debug('PID %d requires SIGTERM to abort to cleanup '
                              'container.', process.pid)
                return signal_queue_with_sigterm
        except subprocess.CalledProcessError:
            # Ignore errors, return the signal queue with SIGTERM to be safe.
            return signal_queue_with_sigterm
        # Default to kill the process with SIGKILL directly.
        return (signal.SIGKILL,)


    def kill_processes(self, process_list):
        """Send signals escalating in severity to the processes in process_list.

        @param process_list: A list of drone_manager.Process objects
                             representing the processes to kill.
        """
        try:
            logging.info('List of process to be killed: %s', process_list)
            # Group processes by the signal queue they need so that each
            # queue results in a single nuke_pids call.
            processes_to_kill = {}
            for p in process_list:
                signal_queue = self.get_signal_queue_to_kill(p)
                processes_to_kill.setdefault(signal_queue, []).append(p)
            for signal_queue, processes in processes_to_kill.items():
                # The pids are negated before being handed to nuke_pids.
                # NOTE(review): presumably this targets the whole process
                # group, as with os.kill -- confirm against utils.nuke_pids.
                utils.nuke_pids(
                        [-process.pid for process in processes],
                        signal_queue=signal_queue)
        except error.AutoservRunError as e:
            self._warn('Error occured when killing processes. Error: %s' % e)


    def _ensure_directory_exists(self, path):
        """Create the directory at path (and its parents) if it is missing.

        @param path: Path of the directory that must exist.

        @raises IOError: If path already exists as a regular file and does
                not contain '/hosts/'.
        """
        if not os.path.exists(path):
            try:
                os.makedirs(path)
            except OSError:
                # Something else may have created the directory between the
                # existence check and makedirs; only a real failure matters.
                if not os.path.isdir(path):
                    raise
            return
        if os.path.isdir(path):
            return
        assert os.path.isfile(path)
        # Files whose path contains '/hosts/' are tolerated silently.
        if '/hosts/' in path:
            return
        raise IOError('Path %s exists as a file, not a directory' % path)


    def execute_command(self, command, working_directory, log_file,
                        pidfile_name):
        """Start command in the background with its output sent to log_file.

        The command is started and not waited for.

        @param command: Command to run, passed to subprocess.Popen as is.
        @param working_directory: Directory that is created if missing and
                in which a stale pidfile is looked for.
        @param log_file: Path of the file to append output to. If it is
                empty or cannot be opened, output goes to /dev/null.
        @param pidfile_name: Name of the pidfile inside working_directory;
                an existing one is removed (with a warning) before launch.
        """
        out_file = None
        if log_file:
            self._ensure_directory_exists(os.path.dirname(log_file))
            try:
                out_file = open(log_file, 'a')
                separator = ('*' * 80) + '\n'
                out_file.write('\n' + separator)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
                out_file.write(separator)
                # Push the header out before the child starts writing to the
                # same descriptor; otherwise it could land after the child's
                # first output.
                out_file.flush()
            except (OSError, IOError):
                # The header is best effort; the command is still run.
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')

        # The with blocks guarantee both handles are closed even when the
        # directory setup or the launch itself raises.
        with out_file:
            with open('/dev/null', 'r') as in_devnull:
                self._ensure_directory_exists(working_directory)
                pidfile_path = os.path.join(working_directory, pidfile_name)
                if os.path.exists(pidfile_path):
                    self._warn('Pidfile %s already exists' % pidfile_path)
                    os.remove(pidfile_path)

                subprocess.Popen(command, stdout=out_file,
                                 stderr=subprocess.STDOUT, stdin=in_devnull)


    def write_to_file(self, file_path, contents, is_retry=False):
        """Write the specified contents to the end of the given file.

        @param file_path: Path to the file.
        @param contents: Content to be written to the file.
        @param is_retry: True if this is a retry after file permission be
                         corrected.
        """
        self._ensure_directory_exists(os.path.dirname(file_path))
        try:
            # 'with' closes the file even if the write itself fails.
            with open(file_path, 'a') as file_object:
                file_object.write(contents)
        except IOError as e:
            # TODO(dshi): crbug.com/459344 Remove following retry when test
            # container can be unprivileged container.
            # If write failed with error 'Permission denied', one possible cause
            # is that the file was created in a container and thus owned by
            # root. If so, fix the file permission, and try again.
            # errno 13 is 'Permission denied'.
            if e.errno == 13 and not is_retry:
                logging.error('Error write to file %s: %s. Will be retried.',
                              file_path, e)
                utils.run('sudo chown %s "%s"' % (os.getuid(), file_path))
                utils.run('sudo chgrp %s "%s"' % (os.getgid(), file_path))
                self.write_to_file(file_path, contents, is_retry=True)
            else:
                self._warn('Error write to file %s: %s' % (file_path, e))


    def copy_file_or_directory(self, source_path, destination_path):
        """
        This interface is designed to match server.hosts.abstract_ssh.get_file
        (and send_file). That is, if the source_path ends with a slash, the
        contents of the directory are copied; otherwise, the directory itself
        is copied.
        """
        if self._same_file(source_path, destination_path):
            return
        self._ensure_directory_exists(os.path.dirname(destination_path))
        if source_path.endswith('/'):
            # copying a directory's contents to another directory
            assert os.path.isdir(source_path)
            assert os.path.isdir(destination_path)
            for filename in os.listdir(source_path):
                self.copy_file_or_directory(
                        os.path.join(source_path, filename),
                        os.path.join(destination_path, filename))
        elif os.path.isdir(source_path):
            try:
                shutil.copytree(source_path, destination_path, symlinks=True)
            except shutil.Error:
                # Ignore copy directory error due to missing files. The cause
                # of this behavior is that, gs_offloader zips up folders with
                # too many files. There is a race condition that repair job
                # tries to copy provision job results to the test job result
                # folder, meanwhile gs_offloader is uploading the provision job
                # result and zipping up folders which contains too many files.
                pass
        elif os.path.islink(source_path):
            # copied from shutil.copytree()
            link_to = os.readlink(source_path)
            os.symlink(link_to, destination_path)
        else:
            try:
                shutil.copy(source_path, destination_path)
            except IOError:
                # Ignore copy error following the same above reason.
                pass


    def _same_file(self, source_path, destination_path):
        """Checks if the source and destination are the same

        Returns True if the destination is the same as the source, False
        otherwise. Also returns False if the destination does not exist.
        """
        if not os.path.exists(destination_path):
            return False
        return os.path.samefile(source_path, destination_path)


    def cleanup_orphaned_containers(self):
        """Run lxc_cleanup script to clean up orphaned container.
        """
        # The script needs to run with sudo as the containers are privileged.
        # TODO(dshi): crbug.com/459344 Call lxc_cleanup.main when test
        # container can be unprivileged container.
        command = ['sudo', LXC_CLEANUP_SCRIPT, '-x', '-v', '-l',
                   LXC_CLEANUP_LOG_FILE]
        logging.info('Running %s', command)
        # stdout and stderr need to be directed to /dev/null, otherwise the
        # exit of the drone_utility process will kill the lxc_cleanup script.
        # The child gets its own copy of the descriptor, so our handle can
        # be closed as soon as the process has been started.
        with open('/dev/null', 'w') as devnull:
            subprocess.Popen(
                    command, shell=False, stdin=None, stdout=devnull,
                    stderr=devnull, preexec_fn=os.setpgrp)


    def wait_for_all_async_commands(self):
        """Block until every outstanding asynchronous subcommand is done."""
        for subproc in self._subcommands:
            subproc.fork_waitfor()
        self._subcommands = []


    def _poll_async_commands(self):
        """Forget the subcommands whose poll() no longer returns None."""
        still_running = []
        for subproc in self._subcommands:
            if subproc.poll() is None:
                still_running.append(subproc)
        self._subcommands = still_running


    def _wait_for_some_async_commands(self):
        """Wait until fewer than max_transfer_processes subcommands remain."""
        self._poll_async_commands()
        max_processes = scheduler_config.config.max_transfer_processes
        while len(self._subcommands) >= max_processes:
            time.sleep(1)
            self._poll_async_commands()


    def run_async_command(self, function, args):
        """Start function(*args) as a subcommand and track it.

        @param function: Callable to run in the subcommand.
        @param args: Arguments handed to subcommand.subcommand along with
                function.
        """
        subproc = subcommand.subcommand(function, args)
        self._subcommands.append(subproc)
        subproc.fork_start()


    def _sync_get_file_from(self, hostname, source_path, destination_path):
        """Fetch source_path from hostname into destination_path.

        @param hostname: Host to fetch from.
        @param source_path: Path on the remote host.
        @param destination_path: Local path; its parent directory is created
                if needed.
        """
        logging.debug('_sync_get_file_from hostname: %s, source_path: %s, '
                      'destination_path: %s', hostname, source_path,
                      destination_path)
        self._ensure_directory_exists(os.path.dirname(destination_path))
        host = create_host(hostname)
        host.get_file(source_path, destination_path, delete_dest=True)


    def get_file_from(self, hostname, source_path, destination_path):
        """Asynchronous version of _sync_get_file_from."""
        self.run_async_command(self._sync_get_file_from,
                               (hostname, source_path, destination_path))


    def _sync_send_file_to(self, hostname, source_path, destination_path,
                           can_fail):
        """Send source_path to destination_path on hostname.

        @param hostname: Host to send to.
        @param source_path: Local path to send.
        @param destination_path: Path on the remote host; its parent
                directory is created there first.
        @param can_fail: If False, a transfer error is re-raised. If True,
                the failure is recorded locally instead: as a
                _TRANSFER_FAILED_FILE inside source_path when that is a
                directory, otherwise as a local copy of the source at
                destination_path + _TRANSFER_FAILED_FILE.

        @raises error.AutoservError: If the transfer fails and can_fail is
                False.
        """
        logging.debug('_sync_send_file_to. hostname: %s, source_path: %s, '
                      'destination_path: %s, can_fail:%s', hostname,
                      source_path, destination_path, can_fail)
        host = create_host(hostname)
        try:
            host.run('mkdir -p ' + os.path.dirname(destination_path))
            host.send_file(source_path, destination_path, delete_dest=True)
        except error.AutoservError:
            if not can_fail:
                raise

            if os.path.isdir(source_path):
                failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
                with open(failed_file, 'w') as file_object:
                    file_object.write('%s:%s\n%s\n%s' %
                                      (hostname, destination_path,
                                       datetime.datetime.now(),
                                       traceback.format_exc()))
            else:
                copy_to = destination_path + _TRANSFER_FAILED_FILE
                self._ensure_directory_exists(os.path.dirname(copy_to))
                self.copy_file_or_directory(source_path, copy_to)


    def send_file_to(self, hostname, source_path, destination_path,
                     can_fail=False):
        """Asynchronous version of _sync_send_file_to."""
        self.run_async_command(self._sync_send_file_to,
                               (hostname, source_path, destination_path,
                                can_fail))


    def _report_long_execution(self, calls, duration):
        """Record a warning listing how often each method was called.

        @param calls: The calls that were executed.
        @param duration: How long executing them took, in seconds.
        """
        call_count = {}
        # Named method_call so the module-level call() is not shadowed.
        for method_call in calls:
            call_count.setdefault(method_call._method, 0)
            call_count[method_call._method] += 1
        call_summary = '\n'.join('%d %s' % (count, method)
                                 for method, count in call_count.items())
        self._warn('Execution took %f sec\n%s' % (duration, call_summary))


    def execute_calls(self, calls):
        """Execute each call against this object and collect the outcome.

        @param calls: Calls to execute, each providing execute_on().

        @return: A dict with 'results' (the return value of every call, in
                order) and 'warnings' (the warnings recorded meanwhile).
        """
        results = []
        start_time = time.time()
        max_processes = scheduler_config.config.max_transfer_processes
        for method_call in calls:
            results.append(method_call.execute_on(self))
            # Throttle: do not let more than max_processes transfers pile up.
            if len(self._subcommands) >= max_processes:
                self._wait_for_some_async_commands()
        self.wait_for_all_async_commands()

        duration = time.time() - start_time
        if duration > self._WARNING_DURATION:
            self._report_long_execution(calls, duration)

        warnings = self.warnings
        self.warnings = []
        return dict(results=results, warnings=warnings)
434
435
Prathmesh Prabhucfabbde2017-06-05 18:50:32 -0700436_MAX_REFRESH_POOL_SIZE = 50
437
class ProcessRefresher(object):
    """Object to refresh process information from given pidfiles.

    Usage: ProcessRefresher(True)(pidfile_list)
    """

    def __init__(self, check_mark, use_pool=False):
        """
        @param check_mark: If True, only consider processes that were
                explicitly marked by a former drone_utility call as autotest
                related processes.
        @param use_pool: If True, use a multiprocessing.Pool to parallelize
                costly operations.
        """
        self._check_mark = check_mark
        self._use_pool = use_pool
        # Only set while a refresh with use_pool=True is in progress.
        self._pool = None


    def __call__(self, pidfile_paths):
        """
        @param pidfile_paths: A list of paths to check for pidfiles.

        @returns (result, warnings)
            where result is a dict with the following keys:
            - pidfiles: dict mapping pidfile paths to file contents, for
              pidfiles that exist, gathered before the processes are scanned.
            - all_processes: list of dicts corresponding to all running
              processes. Each dict contain pid, pgid, ppid, comm, and args (see
              "man ps" for details).
            - autoserv_processes: likewise, restricted to autoserv processes.
            - parse_processes: likewise, restricted to parse processes.
            - pidfiles_second_read: same info as pidfiles, but gathered after
              the processes are scanned.
            and warnings is a list of warnings generated during process refresh.
        """

        if self._use_pool:
            pool_size = max(
                    min(len(pidfile_paths), _MAX_REFRESH_POOL_SIZE),
                    1)
            self._pool = multiprocessing.Pool(pool_size)
        else:
            pool_size = 0
        logging.info('Refreshing %d pidfiles with %d helper processes',
                     len(pidfile_paths), pool_size)

        try:
            # The first read has to happen before the process scan and the
            # second one after it; reading both after the scan would make
            # the two reads indistinguishable.
            pidfiles = self._read_pidfiles(pidfile_paths)

            warnings = []
            # It is necessary to explicitly force this to be a list because
            # results are pickled by DroneUtility.
            proc_infos = list(_get_process_info())

            autoserv_processes, extra_warnings = self._filter_proc_infos(
                    proc_infos, 'autoserv')
            warnings += extra_warnings
            parse_processes, extra_warnings = self._filter_proc_infos(
                    proc_infos, 'parse')
            warnings += extra_warnings
            site_parse_processes, extra_warnings = self._filter_proc_infos(
                    proc_infos, 'site_parse')
            warnings += extra_warnings

            result = {
                    'pidfiles': pidfiles,
                    'all_processes': proc_infos,
                    'autoserv_processes': autoserv_processes,
                    'parse_processes': (parse_processes +
                                        site_parse_processes),
                    'pidfiles_second_read': self._read_pidfiles(pidfile_paths),
            }
            return result, warnings
        finally:
            # Release the helper processes instead of leaving them around
            # until this process exits.
            if self._pool is not None:
                self._pool.close()
                self._pool.join()
                self._pool = None


    def _read_pidfiles(self, pidfile_paths):
        """Reads the requested pidfiles, through the pool when one is in use.

        @param pidfile_paths: Paths of the pidfiles to read.

        @return: dict mapping path to file content for every pidfile that
                could be read.
        """
        if self._use_pool:
            contents = self._pool.map(_read_pidfile, pidfile_paths)
        else:
            contents = [_read_pidfile(path) for path in pidfile_paths]
        # _read_pidfile yields None for pidfiles that could not be read.
        return {entry.path: entry.content
                for entry in contents if entry is not None}


    def _filter_proc_infos(self, proc_infos, command_name):
        """Filters process info for the given command_name.

        Examines ps output as returned by get_process_info and return
        the process dicts for processes matching the given command name.

        @param proc_infos: ps output as returned by _get_process_info.
        @param command_name: The name of the command, eg 'autoserv'.

        @return: (proc_infos, warnings) where proc_infos is a list of ProcInfo
                as returned by _get_process_info and warnings is a list of
                warnings generated while filtering.
        """
        matching = [info for info in proc_infos
                    if info['comm'] == command_name]
        if not self._check_mark:
            return matching, []

        pids = [info['pid'] for info in matching]
        if self._use_pool:
            dark_marks = self._pool.map(_process_has_dark_mark, pids)
        else:
            dark_marks = [_process_has_dark_mark(pid) for pid in pids]

        marked_proc_infos = []
        warnings = []
        for marked, info in zip(dark_marks, matching):
            if marked:
                marked_proc_infos.append(info)
            else:
                warnings.append(
                        '%(comm)s process pid %(pid)s has no dark mark; '
                        'ignoring.' % info)
        return marked_proc_infos, warnings
563
564
def create_host(hostname):
    """Build an SSHHost for hostname.

    The login user is taken from the '<hostname>_username' option of the
    SCHEDULER config section, defaulting to the user running this process.

    @param hostname: Name of the host to connect to.

    @return: A hosts.SSHHost for that host and user.
    """
    # TODO(crbug.com/739466) Delay import to avoid a ~0.7 second penalty
    # drone_utility calls that don't actually interact with DUTs.
    from autotest_lib.server import hosts
    config = global_config.global_config
    option_name = ''.join((hostname, '_username'))
    login_user = config.get_config_value(
            'SCHEDULER', option_name, default=getpass.getuser())
    return hosts.SSHHost(hostname, user=login_user)
572
573
def parse_input():
    """Read everything from stdin and unpickle it.

    @return: The object unpickled from the data read on stdin.

    @raises ValueError: If the data cannot be unpickled. The message holds
            the raw input and the traceback of the original failure.
    """
    # Keep calling read() until it returns an empty string.
    pickled_input = ''.join(iter(sys.stdin.read, ''))

    # NOTE(review): unpickling can execute arbitrary code, so whatever feeds
    # stdin must be trusted -- confirm that holds for every caller.
    try:
        return pickle.loads(pickled_input)
    except Exception:
        separator = '*' * 50
        template = ('Unpickling input failed\n'
                    'Input: %r\n'
                    'Exception from pickle:\n'
                    '%s\n%s\n%s')
        details = (pickled_input, separator, traceback.format_exc(),
                   separator)
        raise ValueError(template % details)
592
593
Prashanth Bcf731e32014-08-10 18:03:57 -0700594def _parse_args(args):
595 parser = argparse.ArgumentParser(description='Local drone process manager.')
596 parser.add_argument('--call_time',
597 help='Time this process was invoked from the master',
598 default=None, type=float)
599 return parser.parse_args(args)
600
601
def return_data(data):
    """Pickle data and write it to stdout, followed by a newline.

    @param data: Object to pickle and emit.
    """
    sys.stdout.write(pickle.dumps(data))
    sys.stdout.write('\n')
604
Prathmesh Prabhua7d9e342017-06-05 11:39:22 -0700605def _process_has_dark_mark(pid):
606 """Checks if a process was launched earlier by drone_utility.
607
608 @param pid: The pid of the process to check.
609 """
Prathmesh Prabhu01e96da2017-06-02 18:36:25 -0700610 try:
Prathmesh Prabhua7d9e342017-06-05 11:39:22 -0700611 with open('/proc/%s/environ' % pid, 'rb') as env_file:
612 env_data = env_file.read()
Prathmesh Prabhu01e96da2017-06-02 18:36:25 -0700613 except EnvironmentError:
614 return False
Prathmesh Prabhu01e96da2017-06-02 18:36:25 -0700615 return DARK_MARK_ENVIRONMENT_VAR in env_data
616
617
618_PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')
def _get_process_info():
    """Run ps and turn each line of its output into a dict.

    ps is run before this function returns; only the construction of the
    dicts is deferred to iteration time.

    @returns A generator of dicts. Each dict has the following keys:
        - comm: command_name,
        - pgid: process group id,
        - ppid: parent process id,
        - pid: process id,
        - args: args the command was invoked with,
    """
    ps_command = ['/bin/ps', '--no-header', 'x', '-o', ','.join(_PS_ARGS)]

    @retry.retry(subprocess.CalledProcessError,
                 timeout_min=0.5, delay_sec=0.25)
    def run_ps():
        return subprocess.check_output(ps_command)

    # At most four splits: everything after the fourth column belongs to
    # args, which may itself contain whitespace.
    rows = [raw_line.split(None, 4) for raw_line in run_ps().splitlines()]
    return (dict(zip(_PS_ARGS, row)) for row in rows)
640
641
Prathmesh Prabhucfabbde2017-06-05 18:50:32 -0700642_PidfileContent = collections.namedtuple('_PidfileContent', ['path', 'content'])
643def _read_pidfile(pidfile_path):
644 """Reads the content of the given pidfile if it exists
645
646 @param: pidfile_path: Path of the file to read.
647 @returns: _PidfileContent tuple on success, None otherwise.
648 """
649 if not os.path.exists(pidfile_path):
650 return None
651 try:
652 with open(pidfile_path, 'r') as file_object:
653 return _PidfileContent(pidfile_path, file_object.read())
654 except IOError:
655 return None
Prathmesh Prabhu01e96da2017-06-02 18:36:25 -0700656
657
def main():
    """Entry point: run the calls received on stdin and print the outcome.

    Logging is configured first, the calls are read from stdin, the command
    line is parsed, and the result of executing the calls is written out.
    """
    logging_manager.configure_logging(
            drone_logging_config.DroneLoggingConfig())
    calls = parse_input()
    # The parsed values are not used; the command line is still parsed
    # after the input has been read.
    _parse_args(sys.argv[1:])

    return_data(DroneUtility().execute_calls(calls))
showard170873e2009-01-07 00:22:26 +0000667
668
669if __name__ == '__main__':
670 main()