blob: fe3b53365e88607b33c7c90432649d24d2b8c7bc [file] [log] [blame]
import os, re, shutil, signal, subprocess, errno, time, heapq, traceback
import common
from autotest_lib.client.common_lib import error, global_config
from autotest_lib.scheduler import email_manager, drone_utility, drones
from autotest_lib.scheduler import scheduler_config
_AUTOSERV_PID_FILE = '.autoserv_execute'
class DroneManagerError(Exception):
pass
class CustomEquals(object):
def _id(self):
raise NotImplementedError
def __eq__(self, other):
if not isinstance(other, type(self)):
return NotImplemented
return self._id() == other._id()
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash(self._id())
class Process(CustomEquals):
def __init__(self, hostname, pid, ppid=None):
self.hostname = hostname
self.pid = pid
self.ppid = ppid
def _id(self):
return (self.hostname, self.pid)
def __str__(self):
return '%s/%s' % (self.hostname, self.pid)
def __repr__(self):
return super(Process, self).__repr__() + '<%s>' % self
class PidfileId(CustomEquals):
def __init__(self, path):
self.path = path
def _id(self):
return self.path
def __str__(self):
return str(self.path)
class PidfileContents(object):
process = None
exit_status = None
num_tests_failed = None
def is_invalid(self):
return False
class InvalidPidfile(object):
def __init__(self, error):
self.error = error
def is_invalid(self):
return True
def __str__(self):
return self.error
class DroneManager(object):
"""
This class acts as an interface from the scheduler to drones, whether it be
only a single "drone" for localhost or multiple remote drones.
All paths going into and out of this class are relative to the full results
directory, except for those returns by absolute_path().
"""
_MAX_PIDFILE_AGE = 1000
def __init__(self):
self._results_dir = None
self._processes = {}
self._process_set = set()
self._pidfiles = {}
self._pidfiles_second_read = {}
self._pidfile_age = {}
self._temporary_path_counter = 0
self._drones = {}
self._results_drone = None
self._attached_files = {}
self._drone_queue = []
def initialize(self, base_results_dir, drone_hostnames,
results_repository_hostname):
self._results_dir = base_results_dir
drones.set_temporary_directory(os.path.join(
base_results_dir, drone_utility._TEMPORARY_DIRECTORY))
for hostname in drone_hostnames:
try:
drone = self._add_drone(hostname)
drone.call('initialize', base_results_dir)
except error.AutoservError:
warning = 'Drone %s failed to initialize:\n%s' % (
hostname, traceback.format_exc())
print warning
email_manager.manager.enqueue_notify_email(
'Drone failed to initialize', warning)
self._remove_drone(hostname)
if not self._drones:
# all drones failed to initialize
raise DroneManagerError('No valid drones found')
self.refresh_drone_configs()
print 'Using results repository on', results_repository_hostname
self._results_drone = drones.get_drone(results_repository_hostname)
# don't initialize() the results drone - we don't want to clear out any
# directories and we don't need ot kill any processes
def reinitialize_drones(self):
self._call_all_drones('initialize', self._results_dir)
def shutdown(self):
for drone in self.get_drones():
drone.shutdown()
def _add_drone(self, hostname):
print 'Adding drone', hostname
drone = drones.get_drone(hostname)
self._drones[drone.hostname] = drone
return drone
def _remove_drone(self, hostname):
self._drones.pop(hostname, None)
def refresh_drone_configs(self):
"""
Reread global config options for all drones.
"""
config = global_config.global_config
section = scheduler_config.CONFIG_SECTION
config.parse_config_file()
for hostname, drone in self._drones.iteritems():
disabled = config.get_config_value(
section, '%s_disabled' % hostname, default='')
drone.enabled = not bool(disabled)
drone.max_processes = config.get_config_value(
section, '%s_max_processes' % hostname, type=int,
default=scheduler_config.config.max_processes_per_drone)
def get_drones(self):
return self._drones.itervalues()
def _get_drone_for_process(self, process):
return self._drones[process.hostname]
def _get_drone_for_pidfile_id(self, pidfile_id):
pidfile_contents = self.get_pidfile_contents(pidfile_id)
assert pidfile_contents.process is not None
return self._get_drone_for_process(pidfile_contents.process)
def _drop_old_pidfiles(self):
for pidfile_id, age in self._pidfile_age.items():
if age > self._MAX_PIDFILE_AGE:
del self._pidfile_age[pidfile_id]
else:
self._pidfile_age[pidfile_id] += 1
def _reset(self):
self._processes = {}
self._process_set = set()
self._pidfiles = {}
self._pidfiles_second_read = {}
self._drone_queue = []
def _call_all_drones(self, method, *args, **kwargs):
all_results = {}
for drone in self.get_drones():
all_results[drone] = drone.call(method, *args, **kwargs)
return all_results
def _parse_pidfile(self, drone, raw_contents):
contents = PidfileContents()
if not raw_contents:
return contents
lines = raw_contents.splitlines()
if len(lines) > 3:
return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
(len(lines), lines))
try:
pid = int(lines[0])
contents.process = Process(drone.hostname, pid)
# if len(lines) == 2, assume we caught Autoserv between writing
# exit_status and num_failed_tests, so just ignore it and wait for
# the next cycle
if len(lines) == 3:
contents.exit_status = int(lines[1])
contents.num_tests_failed = int(lines[2])
except ValueError, exc:
return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
return contents
def _process_pidfiles(self, drone, pidfiles, store_in_dict):
for pidfile_path, contents in pidfiles.iteritems():
pidfile_id = PidfileId(pidfile_path)
contents = self._parse_pidfile(drone, contents)
store_in_dict[pidfile_id] = contents
def _add_process(self, drone, process_info):
process = Process(drone.hostname, int(process_info['pid']),
int(process_info['ppid']))
self._process_set.add(process)
return process
def _add_autoserv_process(self, drone, process_info):
assert process_info['comm'] == 'autoserv'
# only root autoserv processes have pgid == pid
if process_info['pgid'] != process_info['pid']:
return
process = self._add_process(drone, process_info)
execution_tag = self._execution_tag_for_process(drone, process_info)
self._processes[execution_tag] = process
def _enqueue_drone(self, drone):
heapq.heappush(self._drone_queue, (drone.used_capacity(), drone))
def refresh(self):
"""
Called at the beginning of a scheduler cycle to refresh all process
information.
"""
self._reset()
pidfile_paths = [pidfile_id.path for pidfile_id in self._pidfile_age]
all_results = self._call_all_drones('refresh', pidfile_paths)
for drone, results_list in all_results.iteritems():
results = results_list[0]
drone.active_processes = len(results['autoserv_processes'])
if drone.enabled:
self._enqueue_drone(drone)
for process_info in results['autoserv_processes']:
self._add_autoserv_process(drone, process_info)
for process_info in results['parse_processes']:
self._add_process(drone, process_info)
self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
self._process_pidfiles(drone, results['pidfiles_second_read'],
self._pidfiles_second_read)
def _execution_tag_for_process(self, drone, process_info):
execution_tag = self._extract_execution_tag(process_info['args'])
if not execution_tag:
# this process has no execution tag - just make up something unique
return '%s.%s' % (drone, process_info['pid'])
return execution_tag
def _extract_execution_tag(self, command):
match = re.match(r'.* -P (\S+) ', command)
if not match:
return None
return match.group(1)
def execute_actions(self):
"""
Called at the end of a scheduler cycle to execute all queued actions
on drones.
"""
for drone in self._drones.values():
drone.execute_queued_calls()
try:
self._results_drone.execute_queued_calls()
except error.AutoservError:
warning = ('Results repository failed to execute calls:\n' +
traceback.format_exc())
print warning
email_manager.manager.enqueue_notify_email(
'Results repository error', warning)
self._results_drone.clear_call_queue()
def get_orphaned_autoserv_processes(self):
"""
Returns a dict mapping execution tags to AutoservProcess objects for
orphaned processes only.
"""
return dict((execution_tag, process)
for execution_tag, process in self._processes.iteritems()
if process.ppid == 1)
def get_process_for(self, execution_tag):
"""
Return the process object for the given execution tag.
"""
return self._processes.get(execution_tag, None)
def kill_process(self, process):
"""
Kill the given process.
"""
print 'killing', process
drone = self._get_drone_for_process(process)
drone.queue_call('kill_process', process)
def _ensure_directory_exists(self, path):
if not os.path.exists(path):
os.makedirs(path)
def _extract_num_processes(self, command):
try:
machine_list_index = command.index('-m') + 1
except ValueError:
return 1
assert machine_list_index < len(command)
machine_list = command[machine_list_index].split(',')
return len(machine_list)
def total_running_processes(self):
return sum(drone.active_processes for drone in self.get_drones())
def max_runnable_processes(self):
"""
Return the maximum number of processes that can be run (in a single
execution) given the current load on drones.
"""
if not self._drone_queue:
# all drones disabled
return 0
return max(drone.max_processes - drone.active_processes
for _, drone in self._drone_queue)
def _choose_drone_for_execution(self, num_processes):
# cycle through drones is order of increasing used capacity until
# we find one that can handle these processes
checked_drones = []
drone_to_use = None
while self._drone_queue:
used_capacity, drone = heapq.heappop(self._drone_queue)
checked_drones.append(drone)
if drone.active_processes + num_processes <= drone.max_processes:
drone_to_use = drone
break
if drone_to_use:
drone_to_use.active_processes += num_processes
else:
drone_summary = ','.join('%s %s/%s' % (drone.hostname,
drone.active_processes,
drone.max_processes)
for drone in checked_drones)
raise ValueError('No drone has capacity to handle %d processes (%s)'
% (num_processes, drone_summary))
# refill _drone_queue
for drone in checked_drones:
self._enqueue_drone(drone)
return drone_to_use
def execute_command(self, command, working_directory, log_file=None,
pidfile_name=None, paired_with_pidfile=None):
"""
Execute the given command, taken as an argv list.
* working_directory: directory in which the pidfile will be written
* log_file (optional): specifies a path (in the results repository) to
hold command output.
* pidfile_name (optional): gives the name of the pidfile this process
will write
* paired_with_pidfile (optional): a PidfileId for an already-executed
process; the new process will execute on the same drone as the
previous process.
"""
working_directory = self.absolute_path(working_directory)
if not log_file:
log_file = self.get_temporary_path('execute')
log_file = self.absolute_path(log_file)
if not pidfile_name:
pidfile_name = _AUTOSERV_PID_FILE
if paired_with_pidfile:
drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
else:
num_processes = self._extract_num_processes(command)
drone = self._choose_drone_for_execution(num_processes)
print "command = %s" % command
print 'log file = %s:%s' % (drone.hostname, log_file)
self._write_attached_files(command, drone)
drone.queue_call('execute_command', command, working_directory,
log_file, pidfile_name)
pidfile_path = self.absolute_path(os.path.join(working_directory,
pidfile_name))
pidfile_id = PidfileId(pidfile_path)
self.register_pidfile(pidfile_id)
return pidfile_id
def get_pidfile_id_from(self, execution_tag):
path = os.path.join(self.absolute_path(execution_tag),
_AUTOSERV_PID_FILE)
return PidfileId(path)
def register_pidfile(self, pidfile_id):
"""
Indicate that the DroneManager should look for the given pidfile when
refreshing.
"""
self._pidfile_age[pidfile_id] = 0
def get_pidfile_contents(self, pidfile_id, use_second_read=False):
"""
Retrieve a PidfileContents object for the given pidfile_id. If
use_second_read is True, use results that were read after the processes
were checked, instead of before.
"""
self.register_pidfile(pidfile_id)
if use_second_read:
pidfile_map = self._pidfiles_second_read
else:
pidfile_map = self._pidfiles
return pidfile_map.get(pidfile_id, PidfileContents())
def is_process_running(self, process):
"""
Check if the given process is in the running process list.
"""
return process in self._process_set
def get_temporary_path(self, base_name):
"""
Get a new temporary path guaranteed to be unique across all drones
for this scheduler execution.
"""
self._temporary_path_counter += 1
return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
'%s.%s' % (base_name, self._temporary_path_counter))
def absolute_path(self, path):
return os.path.join(self._results_dir, path)
def _copy_results_helper(self, process, source_path, destination_path,
to_results_repository=False):
full_source = self.absolute_path(source_path)
full_destination = self.absolute_path(destination_path)
source_drone = self._get_drone_for_process(process)
if to_results_repository:
source_drone.send_file_to(self._results_drone, full_source,
full_destination, can_fail=True)
else:
source_drone.queue_call('copy_file_or_directory', full_source,
full_destination)
def copy_to_results_repository(self, process, source_path,
destination_path=None):
"""
Copy results from the given process at source_path to destination_path
in the results repository.
"""
if destination_path is None:
destination_path = source_path
self._copy_results_helper(process, source_path, destination_path,
to_results_repository=True)
def copy_results_on_drone(self, process, source_path, destination_path):
"""
Copy a results directory from one place to another on the drone.
"""
self._copy_results_helper(process, source_path, destination_path)
def _write_attached_files(self, command, drone):
execution_tag = self._extract_execution_tag(' '.join(command))
attached_files = self._attached_files.pop(execution_tag, {})
for file_path, contents in attached_files.iteritems():
drone.queue_call('write_to_file', self.absolute_path(file_path),
contents)
def attach_file_to_execution(self, execution_tag, file_contents,
file_path=None):
"""
When the process for execution_tag is executed, the given file contents
will be placed in a file on the drone. Returns the path at which the
file will be placed.
"""
if not file_path:
file_path = self.get_temporary_path('attach')
files_for_execution = self._attached_files.setdefault(execution_tag, {})
assert file_path not in files_for_execution
files_for_execution[file_path] = file_contents
return file_path
def write_lines_to_file(self, file_path, lines, paired_with_process=None):
"""
Write the given lines (as a list of strings) to a file. If
paired_with_process is given, the file will be written on the drone
running the given Process. Otherwise, the file will be written to the
results repository.
"""
full_path = os.path.join(self._results_dir, file_path)
file_contents = '\n'.join(lines) + '\n'
if paired_with_process:
drone = self._get_drone_for_process(paired_with_process)
else:
drone = self._results_drone
drone.queue_call('write_to_file', full_path, file_contents)