Attached is a very large patch that adds support for running a
distributed Autotest service. Previously, the scheduler could only
execute autoservs locally and all results were written directly to the
local filesystem. This placed a limit on the number of machines that
could be concurrently tested by a single Autotest service instance due
to the strain of running many autoserv processes on a single machine.
With this change, the scheduler can spread autoserv processes among a
number of machines and gather all results to a single results
repository machine. This allows vastly improved scalability for a
single Autotest service instance. See
http://autotest.kernel.org/wiki/DistributedServerSetup for more
details.
Note that the single-server setup is still supported and the global
configuration defaults to this setup, so existing service instances
should continue to run.
Steve
Signed-off-by: Steve Howard <showard@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@2596 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/apache/drone-conf b/apache/drone-conf
new file mode 100644
index 0000000..4702d18
--- /dev/null
+++ b/apache/drone-conf
@@ -0,0 +1,18 @@
+NameVirtualHost *
+<VirtualHost *>
+ ErrorLog /var/log/apache2/error.log
+ LogLevel warn
+
+ CustomLog /var/log/apache2/access.log combined
+ ServerSignature On
+
+ Alias /results "/usr/local/autotest/results/"
+ <Directory /usr/local/autotest/results/>
+ Options Indexes FollowSymLinks MultiViews
+ AllowOverride None
+ Order allow,deny
+ Allow from all
+ </Directory>
+
+
+</VirtualHost>
diff --git a/client/common_lib/test_utils/mock.py b/client/common_lib/test_utils/mock.py
index b734e83..0223d82 100644
--- a/client/common_lib/test_utils/mock.py
+++ b/client/common_lib/test_utils/mock.py
@@ -80,6 +80,8 @@
def __str__(self):
+ if isinstance(self.value, argument_comparator):
+ return str(self.value)
return repr(self.value)
@@ -118,6 +120,15 @@
return "is a %s" % self.cls
+class anything_comparator(argument_comparator):
+ def is_satisfied_by(self, parameter):
+ return True
+
+
+ def __str__(self):
+ return 'anything'
+
+
class base_mapping(object):
def __init__(self, symbol, return_obj, *args, **dargs):
self.return_obj = return_obj
@@ -421,14 +432,14 @@
if len(self.recording) != 0:
func_call = self.recording[0]
if func_call.symbol != symbol:
- msg = ("Unexpected call: %s. Expected %s"
+ msg = ("Unexpected call: %s\nExpected: %s"
% (_dump_function_call(symbol, args, dargs),
func_call))
self._append_error(msg)
return None
if not func_call.match(*args, **dargs):
- msg = ("%s called. Expected %s"
+ msg = ("Incorrect call: %s\nExpected: %s"
% (_dump_function_call(symbol, args, dargs),
func_call))
self._append_error(msg)
diff --git a/client/common_lib/utils.py b/client/common_lib/utils.py
index 77d7e81..cde942f 100644
--- a/client/common_lib/utils.py
+++ b/client/common_lib/utils.py
@@ -21,7 +21,8 @@
class BgJob(object):
- def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True):
+ def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True,
+ stdin=None):
self.command = command
self.stdout_tee = stdout_tee
self.stderr_tee = stderr_tee
@@ -32,7 +33,8 @@
self.sp = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=self._reset_sigpipe, shell=True,
- executable="/bin/bash")
+ executable="/bin/bash",
+ stdin=stdin)
def output_prepare(self, stdout_file=None, stderr_file=None):
@@ -300,7 +302,7 @@
def run(command, timeout=None, ignore_status=False,
- stdout_tee=None, stderr_tee=None, verbose=True):
+ stdout_tee=None, stderr_tee=None, verbose=True, stdin=None):
"""
Run a command on the host.
@@ -316,6 +318,7 @@
will be written as it is generated (data will still
be stored in result.stdout)
stderr_tee: likewise for stderr
+ stdin: stdin to pass to the executed process
Returns:
a CmdResult object
@@ -324,8 +327,9 @@
CmdError: the exit code of the command
execution was not 0
"""
- bg_job = join_bg_jobs((BgJob(command, stdout_tee, stderr_tee, verbose),),
- timeout)[0]
+ bg_job = join_bg_jobs(
+ (BgJob(command, stdout_tee, stderr_tee, verbose, stdin=stdin),),
+ timeout)[0]
if not ignore_status and bg_job.result.exit_status:
raise error.CmdError(command, bg_job.result,
"Command returned non-zero exit status")
diff --git a/global_config.ini b/global_config.ini
index 7e70cba..0e023c9 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -32,9 +32,13 @@
max_running_jobs: 1000
max_jobs_started_per_cycle: 100
max_parse_processes: 5
+max_transfer_processes: 50
tick_pause_sec: 5
clean_interval_minutes: 60
synch_job_start_timeout_minutes: 150
+drones: localhost
+drone_installation_directory: /usr/local/autotest
+results_host: localhost
[HOSTS]
wait_up_processes:
diff --git a/scheduler/drone_manager.py b/scheduler/drone_manager.py
new file mode 100644
index 0000000..826a62c
--- /dev/null
+++ b/scheduler/drone_manager.py
@@ -0,0 +1,480 @@
+import os, re, shutil, signal, subprocess, errno, time, heapq, traceback
+import common
+from autotest_lib.client.common_lib import error
+from autotest_lib.scheduler import email_manager, drone_utility, drones
+
+_AUTOSERV_PID_FILE = '.autoserv_execute'
+
+
+class DroneManagerError(Exception):
+ pass
+
+
+class CustomEquals(object):
+ def _id(self):
+ raise NotImplementedError
+
+
+ def __eq__(self, other):
+ if not isinstance(other, type(self)):
+ return NotImplemented
+ return self._id() == other._id()
+
+
+ def __ne__(self, other):
+ return not self == other
+
+
+ def __hash__(self):
+ return hash(self._id())
+
+
+class Process(CustomEquals):
+ def __init__(self, hostname, pid, ppid=None):
+ self.hostname = hostname
+ self.pid = pid
+ self.ppid = ppid
+
+ def _id(self):
+ return (self.hostname, self.pid)
+
+
+ def __str__(self):
+ return '%s/%s' % (self.hostname, self.pid)
+
+
+ def __repr__(self):
+ return super(Process, self).__repr__() + '<%s>' % self
+
+
+class PidfileId(CustomEquals):
+ def __init__(self, path):
+ self.path = path
+
+
+ def _id(self):
+ return self.path
+
+
+ def __str__(self):
+ return str(self.path)
+
+
+class PidfileContents(object):
+ process = None
+ exit_status = None
+ num_tests_failed = None
+
+ def is_invalid(self):
+ return False
+
+
+class InvalidPidfile(object):
+ def __init__(self, error):
+ self.error = error
+
+
+ def is_invalid(self):
+ return True
+
+
+ def __str__(self):
+ return self.error
+
+
+class DroneManager(object):
+ """
+ This class acts as an interface from the scheduler to drones, whether it be
+ only a single "drone" for localhost or multiple remote drones.
+
+ All paths going into and out of this class are relative to the full results
+ directory, except for those returned by absolute_path().
+ """
+ _MAX_PIDFILE_AGE = 1000
+ _NULL_HOSTNAME = 'null host'
+
+ def __init__(self):
+ self._results_dir = None
+ self._processes = {}
+ self._process_set = set()
+ self._pidfiles = {}
+ self._pidfiles_second_read = {}
+ self._pidfile_age = {}
+ self._temporary_path_counter = 0
+ self._drones = {}
+ self._results_drone = None
+ self._attached_files = {}
+ self._drone_queue = []
+ self._null_drone = drones.NullDrone()
+
+
+ def initialize(self, base_results_dir, drone_hostnames,
+ results_repository_hostname):
+ self._results_dir = base_results_dir
+ drones.set_temporary_directory(os.path.join(
+ base_results_dir, drone_utility._TEMPORARY_DIRECTORY))
+
+ for hostname in drone_hostnames:
+ try:
+ drone = self._add_drone(hostname)
+ drone.call('initialize', base_results_dir)
+ except error.AutoservError:
+ warning = 'Drone %s failed to initialize:\n%s' % (
+ hostname, traceback.format_exc())
+ print warning
+ email_manager.manager.enqueue_notify_email(
+ 'Drone failed to initialize', warning)
+ self._remove_drone(hostname)
+
+ if not self._drones:
+ # all drones failed to initialize
+ raise DroneManagerError('No valid drones found')
+
+ print 'Using results repository on', results_repository_hostname
+ self._results_drone = drones.get_drone(results_repository_hostname)
+ # don't initialize() the results drone - we don't want to clear out any
+ # directories and we don't need to kill any processes
+
+
+ def reinitialize_drones(self):
+ self._call_all_drones('initialize', self._results_dir)
+
+
+ def shutdown(self):
+ for drone in self._drones.itervalues():
+ drone.shutdown()
+
+
+ def _add_drone(self, hostname):
+ print 'Adding drone', hostname
+ drone = drones.get_drone(hostname)
+ self._drones[drone.hostname] = drone
+ return drone
+
+
+ def _remove_drone(self, hostname):
+ self._drones.pop(hostname, None)
+
+
+ def _get_drone_for_process(self, process):
+ if process.hostname == self._NULL_HOSTNAME:
+ return self._null_drone
+ return self._drones[process.hostname]
+
+
+ def _get_drone_for_pidfile_id(self, pidfile_id):
+ pidfile_contents = self.get_pidfile_contents(pidfile_id)
+ assert pidfile_contents.process is not None
+ return self._get_drone_for_process(pidfile_contents.process)
+
+
+ def _drop_old_pidfiles(self):
+ for pidfile_id, age in self._pidfile_age.items():
+ if age > self._MAX_PIDFILE_AGE:
+ del self._pidfile_age[pidfile_id]
+ else:
+ self._pidfile_age[pidfile_id] += 1
+
+
+ def _reset(self):
+ self._processes = {}
+ self._process_set = set()
+ self._pidfiles = {}
+ self._pidfiles_second_read = {}
+ self._drone_queue = []
+
+
+ def _call_all_drones(self, method, *args, **kwargs):
+ all_results = {}
+ for drone in self._drones.itervalues():
+ all_results[drone] = drone.call(method, *args, **kwargs)
+ return all_results
+
+
+ def _parse_pidfile(self, drone, raw_contents):
+ contents = PidfileContents()
+ if not raw_contents:
+ return contents
+ lines = raw_contents.splitlines()
+ if len(lines) > 3:
+ return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
+ (len(lines), lines))
+ try:
+ pid = int(lines[0])
+ contents.process = Process(drone.hostname, pid)
+ # if len(lines) == 2, assume we caught Autoserv between writing
+ # exit_status and num_failed_tests, so just ignore it and wait for
+ # the next cycle
+ if len(lines) == 3:
+ contents.exit_status = int(lines[1])
+ contents.num_tests_failed = int(lines[2])
+ except ValueError, exc:
+ return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
+
+ return contents
+
+
+ def _process_pidfiles(self, drone, pidfiles, store_in_dict):
+ for pidfile_path, contents in pidfiles.iteritems():
+ pidfile_id = PidfileId(pidfile_path)
+ contents = self._parse_pidfile(drone, contents)
+ store_in_dict[pidfile_id] = contents
+
+
+ def refresh(self):
+ """
+ Called at the beginning of a scheduler cycle to refresh all process
+ information.
+ """
+ self._reset()
+ pidfile_paths = [pidfile_id.path for pidfile_id in self._pidfile_age]
+ all_results = self._call_all_drones('refresh', pidfile_paths)
+
+ for drone, results_list in all_results.iteritems():
+ results = results_list[0]
+ process_count = len(results['processes'])
+ heapq.heappush(self._drone_queue, (process_count, drone))
+ for process_info in results['processes']:
+ # only root autoserv processes have pgid == pid
+ if process_info['pgid'] != process_info['pid']:
+ continue
+ process = Process(drone.hostname, int(process_info['pid']),
+ int(process_info['ppid']))
+ execution_tag = self._execution_tag_for_process(drone,
+ process_info)
+ self._processes[execution_tag] = process
+ self._process_set.add(process)
+
+ self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
+ self._process_pidfiles(drone, results['pidfiles_second_read'],
+ self._pidfiles_second_read)
+
+
+ def _execution_tag_for_process(self, drone, process_info):
+ execution_tag = self._extract_execution_tag(process_info['args'])
+ if not execution_tag:
+ # this process has no execution tag - just make up something unique
+ return '%s.%s' % (drone, process_info['pid'])
+ return execution_tag
+
+
+ def _extract_execution_tag(self, command):
+ match = re.match(r'.* -P (\S+) ', command)
+ if not match:
+ return None
+ return match.group(1)
+
+
+ def execute_actions(self):
+ """
+ Called at the end of a scheduler cycle to execute all queued actions
+ on drones.
+ """
+ for drone in self._drones.values():
+ drone.execute_queued_calls()
+
+ try:
+ self._results_drone.execute_queued_calls()
+ except error.AutoservError:
+ warning = ('Results repository failed to execute calls:\n' +
+ traceback.format_exc())
+ print warning
+ email_manager.manager.enqueue_notify_email(
+ 'Results repository error', warning)
+ self._results_drone.clear_call_queue()
+
+
+ def get_orphaned_autoserv_processes(self):
+ """
+ Returns a dict mapping execution tags to Process objects for
+ orphaned processes only.
+ """
+ return dict((execution_tag, process)
+ for execution_tag, process in self._processes.iteritems()
+ if process.ppid == 1)
+
+
+ def get_process_for(self, execution_tag):
+ """
+ Return the process object for the given execution tag.
+ """
+ return self._processes.get(execution_tag, None)
+
+
+ def get_dummy_process(self):
+ """
+ Return a null process object.
+ """
+ return Process(self._NULL_HOSTNAME, 0)
+
+
+ def kill_process(self, process):
+ """
+ Kill the given process.
+ """
+ print 'killing', process
+ drone = self._get_drone_for_process(process)
+ drone.queue_call('kill_process', process)
+
+
+ def _ensure_directory_exists(self, path):
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+
+ def _extract_num_processes(self, command):
+ try:
+ machine_list_index = command.index('-m') + 1
+ except ValueError:
+ return 1
+ assert machine_list_index < len(command)
+ machine_list = command[machine_list_index].split(',')
+ return len(machine_list)
+
+
+ def _choose_drone_for_execution(self, num_processes):
+ processes, drone_to_use = heapq.heappop(self._drone_queue)
+ heapq.heappush(self._drone_queue,
+ (processes + num_processes, drone_to_use))
+ return drone_to_use
+
+
+ def execute_command(self, command, working_directory, log_file=None,
+ pidfile_name=None, paired_with_pidfile=None):
+ """
+ Execute the given command, taken as an argv list.
+
+ * working_directory: directory in which the pidfile will be written
+ * log_file (optional): specifies a path (in the results repository) to
+ hold command output.
+ * pidfile_name (optional): gives the name of the pidfile this process
+ will write
+ * paired_with_pidfile (optional): a PidfileId for an already-executed
+ process; the new process will execute on the same drone as the
+ previous process.
+ """
+ working_directory = self.absolute_path(working_directory)
+ if not log_file:
+ log_file = self.get_temporary_path('execute')
+ log_file = self.absolute_path(log_file)
+ if not pidfile_name:
+ pidfile_name = _AUTOSERV_PID_FILE
+
+ if paired_with_pidfile:
+ drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
+ else:
+ num_processes = self._extract_num_processes(command)
+ drone = self._choose_drone_for_execution(num_processes)
+ print "command = %s" % command
+ print 'log file = %s:%s' % (drone.hostname, log_file)
+ self._write_attached_files(command, drone)
+ drone.queue_call('execute_command', command, working_directory,
+ log_file, pidfile_name)
+
+ pidfile_path = self.absolute_path(os.path.join(working_directory,
+ pidfile_name))
+ pidfile_id = PidfileId(pidfile_path)
+ self.register_pidfile(pidfile_id)
+ return pidfile_id
+
+
+ def get_pidfile_id_from(self, execution_tag):
+ path = os.path.join(self.absolute_path(execution_tag),
+ _AUTOSERV_PID_FILE)
+ return PidfileId(path)
+
+
+ def register_pidfile(self, pidfile_id):
+ """
+ Indicate that the DroneManager should look for the given pidfile when
+ refreshing.
+ """
+ self._pidfile_age[pidfile_id] = 0
+
+
+ def get_pidfile_contents(self, pidfile_id, use_second_read=False):
+ """
+ Retrieve a PidfileContents object for the given pidfile_id. If
+ use_second_read is True, use results that were read after the processes
+ were checked, instead of before.
+ """
+ self.register_pidfile(pidfile_id)
+ if use_second_read:
+ pidfile_map = self._pidfiles_second_read
+ else:
+ pidfile_map = self._pidfiles
+ return pidfile_map.get(pidfile_id, PidfileContents())
+
+
+ def is_process_running(self, process):
+ """
+ Check if the given process is in the running process list.
+ """
+ return process in self._process_set
+
+
+ def get_temporary_path(self, base_name):
+ """
+ Get a new temporary path guaranteed to be unique across all drones
+ for this scheduler execution.
+ """
+ self._temporary_path_counter += 1
+ return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
+ '%s.%s' % (base_name, self._temporary_path_counter))
+
+
+ def absolute_path(self, path):
+ return os.path.join(self._results_dir, path)
+
+
+ def copy_to_results_repository(self, process, source_path,
+ destination_path=None):
+ """
+ Copy results from the given process at source_path to destination_path
+ in the results repository.
+ """
+ if destination_path is None:
+ destination_path = source_path
+ full_source = self.absolute_path(source_path)
+ full_destination = self.absolute_path(destination_path)
+ source_drone = self._get_drone_for_process(process)
+ source_drone.send_file_to(self._results_drone, full_source,
+ full_destination, can_fail=True)
+
+
+ def _write_attached_files(self, command, drone):
+ execution_tag = self._extract_execution_tag(' '.join(command))
+ attached_files = self._attached_files.pop(execution_tag, [])
+ for file_path, contents in attached_files:
+ drone.queue_call('write_to_file', self.absolute_path(file_path),
+ contents)
+
+
+ def attach_file_to_execution(self, execution_tag, file_contents,
+ file_path=None):
+ """
+ When the process for execution_tag is executed, the given file contents
+ will be placed in a file on the drone. Returns the path at which the
+ file will be placed.
+ """
+ if not file_path:
+ file_path = self.get_temporary_path('attach')
+ self._attached_files.setdefault(execution_tag, []).append(
+ (file_path, file_contents))
+ return file_path
+
+
+ def write_lines_to_file(self, file_path, lines, paired_with_pidfile=None):
+ """
+ Write the given lines (as a list of strings) to a file. If
+ paired_with_pidfile is given, the file will be written on the drone
+ running the given PidfileId. Otherwise, the file will be written to the
+ results repository.
+ """
+ full_path = os.path.join(self._results_dir, file_path)
+ file_contents = '\n'.join(lines) + '\n'
+ if paired_with_pidfile:
+ drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
+ else:
+ drone = self._results_drone
+ drone.queue_call('write_to_file', full_path, file_contents)
diff --git a/scheduler/drone_utility.py b/scheduler/drone_utility.py
new file mode 100644
index 0000000..7e69ba0
--- /dev/null
+++ b/scheduler/drone_utility.py
@@ -0,0 +1,297 @@
+#!/usr/bin/python2.4
+
+import pickle, subprocess, os, shutil, socket, sys, time, signal, getpass
+import datetime, traceback
+import common
+from autotest_lib.client.common_lib import utils, global_config, error
+from autotest_lib.server import hosts, subcommand
+from autotest_lib.scheduler import email_manager
+
+_TEMPORARY_DIRECTORY = 'drone_tmp'
+_TRANSFER_FAILED_FILE = '.transfer_failed'
+
+class _MethodCall(object):
+ def __init__(self, method, args, kwargs):
+ self._method = method
+ self._args = args
+ self._kwargs = kwargs
+
+
+ def execute_on(self, drone_utility):
+ method = getattr(drone_utility, self._method)
+ return method(*self._args, **self._kwargs)
+
+
+ def __str__(self):
+ args = ', '.join(repr(arg) for arg in self._args)
+ kwargs = ', '.join('%s=%r' % (key, value) for key, value in
+ self._kwargs.iteritems())
+ full_args = ', '.join(item for item in (args, kwargs) if item)
+ return '%s(%s)' % (self._method, full_args)
+
+
+def call(method, *args, **kwargs):
+ return _MethodCall(method, args, kwargs)
+
+
+class DroneUtility(object):
+ """
+ This class executes actual OS calls on the drone machine.
+
+ All paths going into and out of this class are absolute.
+ """
+ _PS_ARGS = ['pid', 'pgid', 'ppid', 'comm', 'args']
+ _MAX_TRANSFER_PROCESSES = global_config.global_config.get_config_value(
+ 'SCHEDULER', 'max_transfer_processes', type=int)
+ _WARNING_DURATION = 60
+
+ def __init__(self):
+ self.warnings = []
+ self._subcommands = []
+
+
+ def initialize(self, results_dir):
+ temporary_directory = os.path.join(results_dir, _TEMPORARY_DIRECTORY)
+ if os.path.exists(temporary_directory):
+ shutil.rmtree(temporary_directory)
+ self._ensure_directory_exists(temporary_directory)
+
+ # make sure there are no old parsers running
+ os.system('killall parse')
+
+
+ def _warn(self, warning):
+ self.warnings.append(warning)
+
+
+ def _refresh_autoserv_processes(self):
+ ps_proc = subprocess.Popen(
+ ['/bin/ps', 'x', '-o', ','.join(self._PS_ARGS)],
+ stdout=subprocess.PIPE)
+ ps_output = ps_proc.communicate()[0]
+
+ # split each line into the columns output by ps
+ split_lines = [line.split(None, 4) for line in ps_output.splitlines()]
+ process_infos = [dict(zip(self._PS_ARGS, line_components))
+ for line_components in split_lines]
+
+ processes = []
+ for info in process_infos:
+ if info['comm'] == 'autoserv':
+ processes.append(info)
+
+ return processes
+
+
+ def _read_pidfiles(self, pidfile_paths):
+ pidfiles = {}
+ for pidfile_path in pidfile_paths:
+ if not os.path.exists(pidfile_path):
+ continue
+ try:
+ file_object = open(pidfile_path, 'r')
+ pidfiles[pidfile_path] = file_object.read()
+ file_object.close()
+ except IOError:
+ continue
+ return pidfiles
+
+
+ def refresh(self, pidfile_paths):
+ results = {
+ 'pidfiles' : self._read_pidfiles(pidfile_paths),
+ 'processes' : self._refresh_autoserv_processes(),
+ 'pidfiles_second_read' : self._read_pidfiles(pidfile_paths),
+ }
+ return results
+
+
+ def _is_process_running(self, process):
+ # TODO: enhance this to check the process args
+ proc_path = os.path.join('/proc', str(process.pid))
+ return os.path.exists(proc_path)
+
+
+ def kill_process(self, process):
+ if self._is_process_running(process):
+ os.kill(process.pid, signal.SIGCONT)
+ os.kill(process.pid, signal.SIGTERM)
+
+
+ def _ensure_directory_exists(self, path):
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+
+ def execute_command(self, command, working_directory, log_file,
+ pidfile_name):
+ out_file = None
+ if log_file:
+ self._ensure_directory_exists(os.path.dirname(log_file))
+ try:
+ out_file = open(log_file, 'a')
+ separator = ('*' * 80) + '\n'
+ out_file.write('\n' + separator)
+ out_file.write("%s> %s\n" % (time.strftime("%X %x"), command))
+ out_file.write(separator)
+ except (OSError, IOError):
+ email_manager.manager.log_stacktrace(
+ 'Error opening log file %s' % log_file)
+
+ if not out_file:
+ out_file = open('/dev/null', 'w')
+
+ in_devnull = open('/dev/null', 'r')
+
+ self._ensure_directory_exists(working_directory)
+ pidfile_path = os.path.join(working_directory, pidfile_name)
+ if os.path.exists(pidfile_path):
+ self._warn('Pidfile %s already exists' % pidfile_path)
+ os.remove(pidfile_path)
+
+ subprocess.Popen(command, stdout=out_file, stderr=subprocess.STDOUT,
+ stdin=in_devnull)
+ out_file.close()
+ in_devnull.close()
+
+
+ def write_to_file(self, file_path, contents):
+ self._ensure_directory_exists(os.path.dirname(file_path))
+ try:
+ file_object = open(file_path, 'a')
+ file_object.write(contents)
+ file_object.close()
+ except IOError, exc:
+ self._warn('Error write to file %s: %s' % (file_path, exc))
+
+
+ def copy_file(self, source_path, destination_path):
+ if source_path == destination_path:
+ return
+ self._ensure_directory_exists(os.path.dirname(destination_path))
+ shutil.copy(source_path, destination_path)
+
+
+ def run_async_command(self, function, args):
+ subproc = subcommand.subcommand(function, args)
+ self._subcommands.append(subproc)
+ subproc.fork_start()
+
+
+ def wait_for_async_commands(self):
+ for subproc in self._subcommands:
+ subproc.fork_waitfor()
+ self._subcommands = []
+
+
+ def _sync_get_file_from(self, hostname, source_path, destination_path):
+ self._ensure_directory_exists(os.path.dirname(destination_path))
+ host = create_host(hostname)
+ host.get_file(source_path, destination_path, delete_dest=True)
+
+
+ def get_file_from(self, hostname, source_path, destination_path):
+ self.run_async_command(self._sync_get_file_from,
+ (hostname, source_path, destination_path))
+
+
+ def _sync_send_file_to(self, hostname, source_path, destination_path,
+ can_fail):
+ host = create_host(hostname)
+ try:
+ host.run('mkdir -p ' + os.path.dirname(destination_path))
+ host.send_file(source_path, destination_path, delete_dest=True)
+ except error.AutoservError:
+ if not can_fail:
+ raise
+
+ if os.path.isdir(source_path):
+ failed_file = os.path.join(source_path, _TRANSFER_FAILED_FILE)
+ file_object = open(failed_file, 'w')
+ try:
+ file_object.write('%s:%s\n%s\n%s' %
+ (hostname, destination_path,
+ datetime.datetime.now(),
+ traceback.format_exc()))
+ finally:
+ file_object.close()
+ else:
+ copy_to = destination_path + _TRANSFER_FAILED_FILE
+ self._ensure_directory_exists(os.path.dirname(copy_to))
+ self.copy_file(source_path, copy_to)
+
+
+ def send_file_to(self, hostname, source_path, destination_path,
+ can_fail=False):
+ self.run_async_command(self._sync_send_file_to,
+ (hostname, source_path, destination_path,
+ can_fail))
+
+
+ def _report_long_execution(self, calls, duration):
+ call_count = {}
+ for call in calls:
+ call_count.setdefault(call._method, 0)
+ call_count[call._method] += 1
+ call_summary = '\n'.join('%d %s' % (count, method)
+ for method, count in call_count.iteritems())
+ self._warn('Execution took %f sec\n%s' % (duration, call_summary))
+
+
+ def execute_calls(self, calls):
+ results = []
+ start_time = time.time()
+ for method_call in calls:
+ results.append(method_call.execute_on(self))
+ if len(self._subcommands) >= self._MAX_TRANSFER_PROCESSES:
+ self.wait_for_async_commands()
+ self.wait_for_async_commands()
+
+ duration = time.time() - start_time
+ if duration > self._WARNING_DURATION:
+ self._report_long_execution(calls, duration)
+
+ warnings = self.warnings
+ self.warnings = []
+ return dict(results=results, warnings=warnings)
+
+
+def create_host(hostname):
+ username = global_config.global_config.get_config_value(
+ 'SCHEDULER', hostname + '_username', default=getpass.getuser())
+ return hosts.SSHHost(hostname, user=username)
+
+
+def parse_input():
+ input_chunks = []
+ chunk_of_input = sys.stdin.read()
+ while chunk_of_input:
+ input_chunks.append(chunk_of_input)
+ chunk_of_input = sys.stdin.read()
+ pickled_input = ''.join(input_chunks)
+
+ try:
+ return pickle.loads(pickled_input)
+ except Exception, exc:
+ separator = '*' * 50
+ raise ValueError('Unpickling input failed\n'
+ 'Input: %r\n'
+ 'Exception from pickle:\n'
+ '%s\n%s\n%s' %
+ (pickled_input, separator, traceback.format_exc(),
+ separator))
+
+
+def return_data(data):
+ print pickle.dumps(data)
+
+
+def main():
+ calls = parse_input()
+ drone_utility = DroneUtility()
+ return_value = drone_utility.execute_calls(calls)
+ return_data(return_value)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/scheduler/drones.py b/scheduler/drones.py
new file mode 100644
index 0000000..f687cd8
--- /dev/null
+++ b/scheduler/drones.py
@@ -0,0 +1,172 @@
+import pickle, os, tempfile
+import common
+from autotest_lib.scheduler import drone_utility, email_manager
+from autotest_lib.server import hosts
+from autotest_lib.client.common_lib import error, global_config
+
+
+AUTOTEST_INSTALL_DIR = global_config.global_config.get_config_value('SCHEDULER',
+ 'drone_installation_directory')
+
+class _AbstractDrone(object):
+ def __init__(self):
+ self._calls = []
+ self.hostname = None
+
+
+ def shutdown(self):
+ pass
+
+
+ def _execute_calls_impl(self, calls):
+ raise NotImplementedError
+
+
+ def _execute_calls(self, calls):
+ return_message = self._execute_calls_impl(calls)
+ for warning in return_message['warnings']:
+ subject = 'Warning from drone %s' % self.hostname
+ print subject + '\n' + warning
+ email_manager.manager.enqueue_notify_email(subject, warning)
+ return return_message['results']
+
+
+ def call(self, method, *args, **kwargs):
+ return self._execute_calls(
+ [drone_utility.call(method, *args, **kwargs)])
+
+
+ def queue_call(self, method, *args, **kwargs):
+ self._calls.append(drone_utility.call(method, *args, **kwargs))
+
+ def clear_call_queue(self):
+ self._calls = []
+
+
+ def execute_queued_calls(self):
+ if not self._calls:
+ return
+ self._execute_calls(self._calls)
+ self.clear_call_queue()
+
+
+class _LocalDrone(_AbstractDrone):
+ def __init__(self):
+ super(_LocalDrone, self).__init__()
+ self.hostname = 'localhost'
+ self._drone_utility = drone_utility.DroneUtility()
+
+
+ def _execute_calls_impl(self, calls):
+ return self._drone_utility.execute_calls(calls)
+
+
+ def send_file_to(self, drone, source_path, destination_path,
+ can_fail=False):
+ if drone.hostname == self.hostname:
+ self.queue_call('copy_file', source_path, destination_path)
+ else:
+ self.queue_call('send_file_to', drone.hostname, source_path,
+ destination_path, can_fail)
+
+
+class _RemoteDrone(_AbstractDrone):
+ _temporary_directory = None
+
+ def __init__(self, hostname):
+ super(_RemoteDrone, self).__init__()
+ self.hostname = hostname
+ self._host = drone_utility.create_host(hostname)
+ self._drone_utility_path = os.path.join(AUTOTEST_INSTALL_DIR,
+ 'scheduler',
+ 'drone_utility.py')
+
+ try:
+ self._host.run('mkdir -p ' + self._temporary_directory,
+ timeout=10)
+ except error.AutoservError:
+ pass
+
+
+ @classmethod
+ def set_temporary_directory(cls, temporary_directory):
+ cls._temporary_directory = temporary_directory
+
+
+ def shutdown(self):
+ super(_RemoteDrone, self).shutdown()
+ self._host.close()
+
+
+ def _execute_calls_impl(self, calls):
+ calls_fd, calls_filename = tempfile.mkstemp(suffix='.pickled_calls')
+ calls_file = os.fdopen(calls_fd, 'w+')
+ pickle.dump(calls, calls_file)
+ calls_file.flush()
+ calls_file.seek(0)
+
+ try:
+ result = self._host.run('python %s' % self._drone_utility_path,
+ stdin=calls_file)
+ finally:
+ calls_file.close()
+ os.remove(calls_filename)
+
+ try:
+ return pickle.loads(result.stdout)
+ except Exception: # pickle.loads can throw all kinds of exceptions
+ print 'Invalid response:\n---\n%s\n---' % result.stdout
+ raise
+
+
+ def send_file_to(self, drone, source_path, destination_path,
+ can_fail=False):
+ if drone.hostname == self.hostname:
+ self.queue_call('copy_file', source_path, destination_path)
+ elif isinstance(drone, _LocalDrone):
+ drone.queue_call('get_file_from', self.hostname, source_path,
+ destination_path)
+ else:
+ self.queue_call('send_file_to', drone.hostname, source_path,
+ destination_path, can_fail)
+
+
+class NullDrone(object):
+ """
+ Null object utility for processes that have failed to run.
+ """
+ def shutdown(self):
+ pass
+
+
+ def call(self, *args, **kwargs):
+ pass
+
+
+ def queue_call(self, *args, **kwargs):
+ pass
+
+
+ def clear_call_queue(self):
+ pass
+
+
+ def execute_queued_calls(self):
+ pass
+
+
+ def send_file_to(self, *args, **kwargs):
+ pass
+
+
+def set_temporary_directory(temporary_directory):
+ _RemoteDrone.set_temporary_directory(temporary_directory)
+
+
+def get_drone(hostname):
+ """
+ Use this factory method to get drone objects.
+ """
+ if hostname == 'localhost':
+ return _LocalDrone()
+ return _RemoteDrone(hostname)
diff --git a/scheduler/email_manager.py b/scheduler/email_manager.py
new file mode 100644
index 0000000..bc30f78
--- /dev/null
+++ b/scheduler/email_manager.py
@@ -0,0 +1,72 @@
+import traceback, socket, os, time, smtplib, re, sys, getpass
+import common
+from autotest_lib.client.common_lib import global_config
+
+# global_config section that notify_email and notify_email_from are read from
+CONFIG_SECTION = 'SCHEDULER'
+
+class EmailNotificationManager(object):
+    """
+    Sends email through the SMTP server on localhost and batches scheduler
+    notifications so that they can be sent as a single message.
+    """
+    def __init__(self):
+        # notification bodies waiting for send_queued_emails()
+        self._emails = []
+
+        self._from_address = global_config.global_config.get_config_value(
+            CONFIG_SECTION, "notify_email_from")
+        if not self._from_address:
+            # no sender configured: fall back to the current user name
+            self._from_address = getpass.getuser()
+
+        self._notify_address = global_config.global_config.get_config_value(
+            CONFIG_SECTION, "notify_email")
+
+
+    def send_email(self, to_string, subject, body):
+        """Mails out emails to the addresses listed in to_string.
+
+        to_string is split into a list which can be delimited by any of:
+        ';', ',', ':' or any whitespace
+
+        Nothing is sent when to_string is None, empty or contains only
+        delimiters.  Failures while sending are printed with a traceback and
+        otherwise ignored (best effort).
+        """
+        if not to_string:
+            return
+
+        # Create list from string removing empty strings from the list.
+        to_list = [x for x in re.split(r'[\s,;:]', to_string) if x]
+        if not to_list:
+            return
+
+        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
+            self._from_address, ', '.join(to_list), subject, body)
+        try:
+            mailer = smtplib.SMTP('localhost')
+            try:
+                mailer.sendmail(self._from_address, to_list, msg)
+            finally:
+                mailer.quit()
+        except Exception:
+            print "Sending email failed:"
+            traceback.print_exc()
+
+
+    def enqueue_notify_email(self, subject, message):
+        """
+        Queue a notification for the next send_queued_emails() call.
+
+        Does nothing when no notify address is configured.  The queued body
+        starts with a 'Subject: ' line followed by a line containing the
+        hostname, pid and current time, then the message.
+        """
+        if not self._notify_address:
+            return
+
+        body = 'Subject: ' + subject + '\n'
+        body += "%s / %s / %s\n%s" % (socket.gethostname(),
+                                      os.getpid(),
+                                      time.strftime("%X %x"), message)
+        self._emails.append(body)
+
+
+    def send_queued_emails(self):
+        """
+        Send all queued notifications as one email to the notify address.
+
+        The queue is emptied afterwards even if sending failed, because
+        send_email() swallows its own errors.
+        """
+        if not self._emails:
+            return
+        subject = 'Scheduler notifications from ' + socket.gethostname()
+        separator = '\n' + '-' * 40 + '\n'
+        body = separator.join(self._emails)
+
+        self.send_email(self._notify_address, subject, body)
+        self._emails = []
+
+
+    def log_stacktrace(self, reason):
+        """
+        Write reason plus the traceback of the exception currently being
+        handled to stderr and queue the same text as a notification.
+        """
+        message = "EXCEPTION: %s\n%s" % (reason, traceback.format_exc())
+        sys.stderr.write("\n%s\n" % message)
+        self.enqueue_notify_email("monitor_db exception", message)
+
+# module-level instance; monitor_db uses it as email_manager.manager
+manager = EmailNotificationManager()
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 4210d42..a47f520 100644
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -7,17 +7,20 @@
import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
+import itertools, logging
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections, utils, debug
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import drone_manager, drones, email_manager
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
-CONFIG_SECTION = 'AUTOTEST_WEB'
+CONFIG_SECTION = 'SCHEDULER'
+DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
@@ -29,20 +32,17 @@
if AUTOTEST_SERVER_DIR not in sys.path:
sys.path.insert(0, AUTOTEST_SERVER_DIR)
-AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min
_db = None
_shutdown = False
-_notify_email = None
-_autoserv_path = 'autoserv'
+_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
+_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
-_global_config_section = 'SCHEDULER'
_base_url = None
-# see os.getlogin() online docs
-_email_from = pwd.getpwuid(os.getuid())[0]
_notify_email_statuses = []
+_drone_manager = drone_manager.DroneManager()
def main():
@@ -65,26 +65,16 @@
global RESULTS_DIR
RESULTS_DIR = args[0]
- # read in notify_email from global_config
c = global_config.global_config
- global _notify_email
- val = c.get_config_value(_global_config_section, "notify_email")
- if val != "":
- _notify_email = val
-
- global _email_from
- val = c.get_config_value(_global_config_section, "notify_email_from")
- if val != "":
- _email_from = val
-
+ notify_statuses_list = c.get_config_value(CONFIG_SECTION,
+ "notify_email_statuses")
global _notify_email_statuses
- val = c.get_config_value(_global_config_section, "notify_email_statuses")
- if val != "":
- _notify_email_statuses = [status for status in
- re.split(r'[\s,;:]', val.lower()) if status]
+ _notify_email_statuses = [status for status in
+ re.split(r'[\s,;:]', notify_statuses_list.lower())
+ if status]
- tick_pause = c.get_config_value(
- _global_config_section, 'tick_pause_sec', type=int)
+ tick_pause = c.get_config_value(CONFIG_SECTION, 'tick_pause_sec',
+ type=int)
if options.test:
global _autoserv_path
@@ -95,7 +85,8 @@
# AUTOTEST_WEB.base_url is still a supported config option as some people
# may wish to override the entire url.
global _base_url
- config_base_url = c.get_config_value(CONFIG_SECTION, 'base_url')
+ config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
+ default='')
if config_base_url:
_base_url = config_base_url
else:
@@ -116,9 +107,11 @@
dispatcher.tick()
time.sleep(tick_pause)
except:
- log_stacktrace("Uncaught exception; terminating monitor_db")
+ email_manager.manager.log_stacktrace(
+ "Uncaught exception; terminating monitor_db")
- email_manager.send_queued_emails()
+ email_manager.manager.send_queued_emails()
+ _drone_manager.shutdown()
_db.disconnect()
@@ -136,21 +129,32 @@
if _testing_mode:
global_config.global_config.override_config_value(
- CONFIG_SECTION, 'database', 'stresstest_autotest_web')
+ DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
global _db
- _db = database_connection.DatabaseConnection(CONFIG_SECTION)
+ _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
_db.connect()
# ensure Django connection is in autocommit
setup_django_environment.enable_autocommit()
debug.configure('scheduler', format_string='%(message)s')
+ debug.get_logger().setLevel(logging.WARNING)
print "Setting signal handler"
signal.signal(signal.SIGINT, handle_sigint)
+ drones = global_config.global_config.get_config_value(CONFIG_SECTION,
+ 'drones')
+ if drones:
+ drone_list = [hostname.strip() for hostname in drones.split(',')]
+ else:
+ drone_list = ['localhost']
+ results_host = global_config.global_config.get_config_value(
+ CONFIG_SECTION, 'results_host', default='localhost')
+ _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
+
print "Connected! Running..."
@@ -176,98 +180,6 @@
qe = [HostQueueEntry(row=i) for i in rows]
return qe
-def remove_file_or_dir(path):
- if stat.S_ISDIR(os.stat(path).st_mode):
- # directory
- shutil.rmtree(path)
- else:
- # file
- os.remove(path)
-
-
-def log_stacktrace(reason):
- (type, value, tb) = sys.exc_info()
- str = "EXCEPTION: %s\n" % reason
- str += ''.join(traceback.format_exception(type, value, tb))
-
- sys.stderr.write("\n%s\n" % str)
- email_manager.enqueue_notify_email("monitor_db exception", str)
-
-
-def get_proc_poll_fn(pid):
- proc_path = os.path.join('/proc', str(pid))
- def poll_fn():
- if os.path.exists(proc_path):
- return None
- return 0 # we can't get a real exit code
- return poll_fn
-
-
-def send_email(to_string, subject, body):
- """Mails out emails to the addresses listed in to_string.
-
- to_string is split into a list which can be delimited by any of:
- ';', ',', ':' or any whitespace
- """
-
- # Create list from string removing empty strings from the list.
- to_list = [x for x in re.split('\s|,|;|:', to_string) if x]
- if not to_list:
- return
-
- msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
- _email_from, ', '.join(to_list), subject, body)
- try:
- mailer = smtplib.SMTP('localhost')
- try:
- mailer.sendmail(_email_from, to_list, msg)
- finally:
- mailer.quit()
- except Exception, e:
- print "Sending email failed. Reason: %s" % repr(e)
-
-
-def kill_autoserv(pid, poll_fn=None):
- print 'killing', pid
- if poll_fn is None:
- poll_fn = get_proc_poll_fn(pid)
- if poll_fn() is None:
- os.kill(pid, signal.SIGCONT)
- os.kill(pid, signal.SIGTERM)
-
-
-def ensure_directory_exists(directory_path):
- if not os.path.exists(directory_path):
- os.makedirs(directory_path)
-
-
-class EmailNotificationManager(object):
- def __init__(self):
- self._emails = []
-
- def enqueue_notify_email(self, subject, message):
- if not _notify_email:
- return
-
- body = 'Subject: ' + subject + '\n'
- body += "%s / %s / %s\n%s" % (socket.gethostname(),
- os.getpid(),
- time.strftime("%X %x"), message)
- self._emails.append(body)
-
-
- def send_queued_emails(self):
- if not self._emails:
- return
- subject = 'Scheduler notifications from ' + socket.gethostname()
- separator = '\n' + '-' * 40 + '\n'
- body = separator.join(self._emails)
-
- send_email(_notify_email, subject, body)
- self._emails = []
-
-email_manager = EmailNotificationManager()
-
class HostScheduler(object):
def _get_ready_hosts(self):
@@ -466,25 +378,26 @@
return self._schedule_metahost(queue_entry)
-class Dispatcher:
- autoserv_procs_cache = None
+class Dispatcher(object):
max_running_processes = global_config.global_config.get_config_value(
- _global_config_section, 'max_running_jobs', type=int)
+ CONFIG_SECTION, 'max_running_jobs', type=int)
max_processes_started_per_cycle = (
global_config.global_config.get_config_value(
- _global_config_section, 'max_jobs_started_per_cycle', type=int))
+ CONFIG_SECTION, 'max_jobs_started_per_cycle', type=int))
clean_interval = (
global_config.global_config.get_config_value(
- _global_config_section, 'clean_interval_minutes', type=int))
+ CONFIG_SECTION, 'clean_interval_minutes', type=int))
synch_job_start_timeout_minutes = (
global_config.global_config.get_config_value(
- _global_config_section, 'synch_job_start_timeout_minutes',
+ CONFIG_SECTION, 'synch_job_start_timeout_minutes',
type=int))
def __init__(self):
self._agents = []
self._last_clean_time = time.time()
self._host_scheduler = HostScheduler()
+ # host id -> set of Agents registered for that host (filled by add_agent)
+ self._host_agents = {}
+ # queue entry id -> set of Agents registered for that entry
+ self._queue_entry_agents = {}
def do_initial_recovery(self, recover_hosts=True):
@@ -496,12 +409,13 @@
def tick(self):
+ """
+ Run one scheduler cycle: refresh the drone manager, then call
+ _run_cleanup_maybe, _find_aborting, _schedule_new_jobs and
+ _handle_agents in that order, and finally execute the queued drone
+ actions and send any queued notification emails.
+ """
- Dispatcher.autoserv_procs_cache = None
+ _drone_manager.refresh()
self._run_cleanup_maybe()
self._find_aborting()
self._schedule_new_jobs()
self._handle_agents()
- email_manager.send_queued_emails()
+ _drone_manager.execute_actions()
+ email_manager.manager.send_queued_emails()
def _run_cleanup_maybe(self):
@@ -514,21 +428,45 @@
self._last_clean_time = time.time()
+    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
+        """
+        Add agent to the set stored in agent_dict under each of object_ids,
+        creating the set for ids that have no entry yet.
+        """
+        for object_id in object_ids:
+            if object_id not in agent_dict:
+                agent_dict[object_id] = set()
+            agent_dict[object_id].add(agent)
+
+
+    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
+        """
+        Remove agent from the set stored in agent_dict under each of
+        object_ids.
+
+        An id whose set becomes empty is deleted from agent_dict, so the
+        dictionary does not keep one empty set for every host or queue entry
+        that ever had an agent.
+
+        Raises KeyError (or fails the assertion) if agent was not registered
+        under one of the ids.
+        """
+        for object_id in object_ids:
+            assert object_id in agent_dict
+            agents = agent_dict[object_id]
+            agents.remove(agent)
+            if not agents:
+                del agent_dict[object_id]
+
+
def add_agent(self, agent):
+ """
+ Start tracking agent: append it to the agent list, set its dispatcher
+ attribute to this Dispatcher and register it under its host ids and
+ queue entry ids.
+ """
self._agents.append(agent)
agent.dispatcher = self
+ self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
+ self._register_agent_for_ids(self._queue_entry_agents,
+ agent.queue_entry_ids, agent)
- # Find agent corresponding to the specified queue_entry
- def get_agents(self, queue_entry):
- res_agents = []
- for agent in self._agents:
- if queue_entry.id in agent.queue_entry_ids:
- res_agents.append(agent)
- return res_agents
+
+    def get_agents_for_entry(self, queue_entry):
+        """
+        Find agents corresponding to the specified queue_entry.
+
+        Returns the set registered under queue_entry.id, or a new empty set
+        when nothing is registered for that id.
+        """
+        try:
+            return self._queue_entry_agents[queue_entry.id]
+        except KeyError:
+            return set()
+
+
+    def host_has_agent(self, host):
+        """
+        Determine if there is currently an Agent present using this host.
+
+        Returns True only when a non-empty set is registered under host.id.
+        """
+        agents = self._host_agents.get(host.id)
+        if agents:
+            return True
+        return False
def remove_agent(self, agent):
+ """
+ Stop tracking agent: remove it from the agent list and unregister it
+ from its host ids and queue entry ids.
+ """
self._agents.remove(agent)
+ self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
+ agent)
+ self._unregister_agent_for_ids(self._queue_entry_agents,
+ agent.queue_entry_ids, agent)
def num_running_processes(self):
@@ -536,115 +474,108 @@
if agent.is_running())
- @classmethod
- def find_autoservs(cls, orphans_only=False):
- """\
- Returns a dict mapping pids to command lines for root autoserv
- processes. If orphans_only=True, return only processes that
- have been orphaned (i.e. parent pid = 1).
- """
- if cls.autoserv_procs_cache is not None:
- return cls.autoserv_procs_cache
-
- proc = subprocess.Popen(
- ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
- stdout=subprocess.PIPE)
- # split each line into the four columns output by ps
- procs = [line.split(None, 4) for line in
- proc.communicate()[0].splitlines()]
- autoserv_procs = {}
- for proc in procs:
- # check ppid == 1 for orphans
- if orphans_only and proc[2] != 1:
- continue
- # only root autoserv processes have pgid == pid
- if (proc[3] == 'autoserv' and # comm
- proc[1] == proc[0]): # pgid == pid
- # map pid to args
- autoserv_procs[int(proc[0])] = proc[4]
- cls.autoserv_procs_cache = autoserv_procs
- return autoserv_procs
+    def _extract_execution_tag(self, command_line):
+        """
+        Return the argument following ' -P ' in command_line, or None.
+
+        The argument must be followed by a space to be recognized; when
+        ' -P ' occurs more than once the last occurrence wins because the
+        leading '.*' is greedy.
+        """
+        found = re.match(r'.* -P (\S+) ', command_line)
+        if found:
+            return found.group(1)
+        return None
def _recover_queue_entries(self, queue_entries, run_monitor):
+ """
+ Build a RecoveryQueueTask from queue_entries and run_monitor and add an
+ Agent for it whose num_processes is the number of entries.
+ queue_entries must be non-empty; the job is taken from its first entry.
+ """
assert len(queue_entries) > 0
- queue_entry_ids = [entry.id for entry in queue_entries]
queue_task = RecoveryQueueTask(job=queue_entries[0].job,
queue_entries=queue_entries,
run_monitor=run_monitor)
self.add_agent(Agent(tasks=[queue_task],
- queue_entry_ids=queue_entry_ids))
+ num_processes=len(queue_entries)))
def _recover_processes(self):
- orphans = self.find_autoservs(orphans_only=True)
+ """
+ Register the pidfiles of interest, refresh the drone manager and then
+ run the individual recovery steps in the order listed below.
+ """
+ self._register_pidfiles()
+ _drone_manager.refresh()
+ self._recover_running_entries()
+ self._recover_aborting_entries()
+ self._requeue_other_active_entries()
+ self._recover_parsing_entries()
+ self._reverify_remaining_hosts()
+ # reinitialize drones after killing orphaned processes, since they can
+ # leave around files when they die
+ _drone_manager.execute_actions()
+ _drone_manager.reinitialize_drones()
- # first, recover running queue entries
- rows = _db.execute("""SELECT * FROM host_queue_entries
- WHERE status = 'Running'""")
- queue_entries = [HostQueueEntry(row=i) for i in rows]
- requeue_entries = []
- recovered_entry_ids = set()
+
+ def _register_pidfiles(self):
+ """
+ Register with the drone manager the pidfile derived from the execution
+ tag of every queue entry whose status is Running or Parsing.
+ """
+ # during recovery we may need to read pidfiles for both running and
+ # parsing entries
+ queue_entries = HostQueueEntry.fetch(
+ where="status IN ('Running', 'Parsing')")
for queue_entry in queue_entries:
- run_monitor = PidfileRunMonitor(queue_entry.results_dir())
- if not run_monitor.has_pid():
- # autoserv apparently never got run, so requeue
- requeue_entries.append(queue_entry)
- continue
- if queue_entry.id in recovered_entry_ids:
+ pidfile_id = _drone_manager.get_pidfile_id_from(
+ queue_entry.execution_tag())
+ _drone_manager.register_pidfile(pidfile_id)
+
+
+ def _recover_running_entries(self):
+ """
+ Re-attach to the processes of queue entries with status Running and
+ kill the orphaned autoserv processes that no such entry claimed.
+
+ Entries that already have an agent, or whose pidfile shows no process,
+ are skipped here.
+ """
+ orphans = _drone_manager.get_orphaned_autoserv_processes()
+
+ queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
+ # NOTE(review): requeue_entries is not used anywhere in this method
requeue_entries = []
+ for queue_entry in queue_entries:
+ if self.get_agents_for_entry(queue_entry):
# synchronous job we've already recovered
continue
- job_tag = queue_entry.job.get_job_tag([queue_entry])
- pid = run_monitor.get_pid()
- print 'Recovering %s (pid %d)' % (queue_entry.id, pid)
+ execution_tag = queue_entry.execution_tag()
+ run_monitor = PidfileRunMonitor()
+ run_monitor.attach_to_existing_process(execution_tag)
+ if not run_monitor.has_process():
+ # autoserv apparently never got run, so let it get requeued
+ continue
+ # NOTE(review): rebinds the name being iterated over; the running loop
+ # is unaffected because it already holds its own iterator
queue_entries = queue_entry.job.get_group_entries(queue_entry)
- recovered_entry_ids.union(entry.id for entry in queue_entries)
+ print 'Recovering %s (process %s)' % (
+ ', '.join(str(entry) for entry in queue_entries),
+ run_monitor.get_process())
self._recover_queue_entries(queue_entries, run_monitor)
- orphans.pop(pid, None)
-
- # and requeue other active queue entries
- rows = _db.execute("""SELECT * FROM host_queue_entries
- WHERE active AND NOT complete
- AND status != 'Running'
- AND status != 'Pending'
- AND status != 'Abort'
- AND status != 'Aborting'""")
- queue_entries = [HostQueueEntry(row=i) for i in rows]
- for queue_entry in queue_entries + requeue_entries:
- print 'Requeuing running QE %d' % queue_entry.id
- queue_entry.clear_results_dir(dont_delete_files=True)
- queue_entry.requeue()
-
+ orphans.pop(execution_tag, None)
# now kill any remaining autoserv processes
- for pid in orphans.keys():
- print 'Killing orphan %d (%s)' % (pid, orphans[pid])
- kill_autoserv(pid)
+ for process in orphans.itervalues():
+ print 'Killing orphan %s' % process
+ _drone_manager.kill_process(process)
- # recover aborting tasks
- rebooting_host_ids = set()
- rows = _db.execute("""SELECT * FROM host_queue_entries
- WHERE status='Abort' or status='Aborting'""")
- queue_entries = [HostQueueEntry(row=i) for i in rows]
+
+    def _recover_aborting_entries(self):
+        """
+        Call abort() again on every queue entry whose status is Abort or
+        Aborting, passing this Dispatcher.
+
+        The value returned by abort() is not used here.
+        """
+        queue_entries = HostQueueEntry.fetch(
+            where='status IN ("Abort", "Aborting")')
         for queue_entry in queue_entries:
-            print 'Recovering aborting QE %d' % queue_entry.id
-            agent = queue_entry.abort()
-            self.add_agent(agent)
-            if queue_entry.get_host():
-                rebooting_host_ids.add(queue_entry.get_host().id)
+            print 'Recovering aborting QE %s' % queue_entry
+            queue_entry.abort(self)
- self._recover_parsing_entries()
+    def _requeue_other_active_entries(self):
+        """
+        Requeue every active, incomplete, non-Pending queue entry that no
+        agent is registered for.
+
+        When the entry has a host, an Agent running the host's reverify
+        tasks is added before the entry is requeued.  The value returned by
+        requeue() is not used here.
+        """
+        queue_entries = HostQueueEntry.fetch(
+            where='active AND NOT complete AND status != "Pending"')
+        for queue_entry in queue_entries:
+            if self.get_agents_for_entry(queue_entry):
+                # entry has already been recovered
+                continue
+            print 'Requeuing active QE %s (status=%s)' % (queue_entry,
+                                                          queue_entry.status)
+            if queue_entry.host:
+                tasks = queue_entry.host.reverify_tasks()
+                self.add_agent(Agent(tasks))
+            queue_entry.requeue()
+
+
+ def _reverify_remaining_hosts(self):
# reverify hosts that were in the middle of verify, repair or cleanup
self._reverify_hosts_where("""(status = 'Repairing' OR
status = 'Verifying' OR
- status = 'Cleaning')""",
- exclude_ids=rebooting_host_ids)
+ status = 'Cleaning')""")
- # finally, recover "Running" hosts with no active queue entries,
- # although this should never happen
- message = ('Recovering running host %s - this probably '
- 'indicates a scheduler bug')
+ # recover "Running" hosts with no active queue entries, although this
+ # should never happen
+ message = ('Recovering running host %s - this probably indicates a '
+ 'scheduler bug')
self._reverify_hosts_where("""status = 'Running' AND
id NOT IN (SELECT host_id
FROM host_queue_entries
@@ -653,33 +584,31 @@
def _reverify_hosts_where(self, where,
- print_message='Reverifying host %s',
- exclude_ids=set()):
- rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
- 'invalid = 0 AND ' + where)
- hosts = [Host(row=i) for i in rows]
- for host in hosts:
- if host.id in exclude_ids:
+ print_message='Reverifying host %s'):
+ """
+ Add an Agent running host.reverify_tasks() for every unlocked, valid
+ host that matches the SQL condition in where and has no agent yet.
+
+ where is appended verbatim to the WHERE clause, so it must come from
+ trusted code.  print_message, when truthy, is a format string that is
+ printed with the hostname.
+ """
+ full_where='locked = 0 AND invalid = 0 AND ' + where
+ for host in Host.fetch(where=full_where):
+ if self.host_has_agent(host):
+ # host has already been recovered in some way
continue
- if print_message is not None:
+ if print_message:
print print_message % host.hostname
- verify_task = VerifyTask(host = host)
- self.add_agent(Agent(tasks = [verify_task]))
+ tasks = host.reverify_tasks()
+ self.add_agent(Agent(tasks))
def _recover_parsing_entries(self):
+ """
+ Add one FinalReparseTask agent (num_processes=0) per group of queue
+ entries with status Parsing.  Ids of entries already covered by a
+ group are remembered so that each group is handled once.
+ """
- # make sure there are no old parsers running
- os.system('killall parse')
-
recovered_entry_ids = set()
for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
if entry.id in recovered_entry_ids:
continue
queue_entries = entry.job.get_group_entries(entry)
- recovered_entry_ids.union(entry.id for entry in queue_entries)
+ # set.union() returns a new set, so the result has to be kept
+ recovered_entry_ids = recovered_entry_ids.union(
+ entry.id for entry in queue_entries)
+ print 'Recovering parsing entries %s' % (
+ ', '.join(str(entry) for entry in queue_entries))
reparse_task = FinalReparseTask(queue_entries)
- self.add_agent(Agent([reparse_task]))
+ self.add_agent(Agent([reparse_task], num_processes=0))
def _recover_hosts(self):
@@ -742,8 +671,6 @@
def _schedule_new_jobs(self):
- print "finding work"
-
queue_entries = self._get_pending_queue_entries()
if not queue_entries:
return
@@ -759,24 +686,19 @@
def _run_queue_entry(self, queue_entry, host):
+ """
+ Run queue_entry with host as its assigned host and start tracking the
+ Agent that run() returns, if it returns one.
+ """
agent = queue_entry.run(assigned_host=host)
- # in some cases (synchronous jobs with run_verify=False), agent may be None
+ # in some cases (synchronous jobs with run_verify=False), agent may be
+ # None
if agent:
self.add_agent(agent)
def _find_aborting(self):
+ """
+ For every queue entry returned by queue_entries_to_abort(), remove the
+ agents registered for it and call entry.abort() with this Dispatcher
+ and the removed agents.
+ """
- num_aborted = 0
- # Find jobs that are aborting
for entry in queue_entries_to_abort():
- agents_to_abort = self.get_agents(entry)
+ # copy first: remove_agent() modifies the set that
+ # get_agents_for_entry() returns
+ agents_to_abort = list(self.get_agents_for_entry(entry))
for agent in agents_to_abort:
self.remove_agent(agent)
- agent = entry.abort(agents_to_abort)
- self.add_agent(agent)
- num_aborted += 1
- if num_aborted >= 50:
- break
+ entry.abort(self, agents_to_abort)
def _can_start_agent(self, agent, num_running_processes,
@@ -811,7 +733,7 @@
for agent in list(self._agents):
if agent.is_done():
print "agent finished"
- self._agents.remove(agent)
+ self.remove_agent(agent)
continue
if not agent.is_running():
if not self._can_start_agent(agent, num_running_processes,
@@ -836,197 +758,112 @@
message += '\n(truncated)\n'
print subject
- email_manager.enqueue_notify_email(subject, message)
+ email_manager.manager.enqueue_notify_email(subject, message)
-class RunMonitor(object):
- def __init__(self, cmd, nice_level = None, log_file = None):
- self.nice_level = nice_level
- self.log_file = log_file
- self.cmd = cmd
- self.proc = None
+class PidfileRunMonitor(object):
+ """
+ Client must call either run() to start a new process or
+ attach_to_existing_process().
+ """
- def run(self):
- if self.nice_level:
- nice_cmd = ['nice','-n', str(self.nice_level)]
- nice_cmd.extend(self.cmd)
- self.cmd = nice_cmd
-
- out_file = None
- if self.log_file:
- try:
- os.makedirs(os.path.dirname(self.log_file))
- except OSError, exc:
- if exc.errno != errno.EEXIST:
- log_stacktrace(
- 'Unexpected error creating logfile '
- 'directory for %s' % self.log_file)
- try:
- out_file = open(self.log_file, 'a')
- out_file.write("\n%s\n" % ('*'*80))
- out_file.write("%s> %s\n" %
- (time.strftime("%X %x"),
- self.cmd))
- out_file.write("%s\n" % ('*'*80))
- except (OSError, IOError):
- log_stacktrace('Error opening log file %s' %
- self.log_file)
-
- if not out_file:
- out_file = open('/dev/null', 'w')
-
- in_devnull = open('/dev/null', 'r')
- print "cmd = %s" % self.cmd
- print "path = %s" % os.getcwd()
-
- self.proc = subprocess.Popen(self.cmd, stdout=out_file,
- stderr=subprocess.STDOUT,
- stdin=in_devnull)
- out_file.close()
- in_devnull.close()
+ class _PidfileException(Exception):
+ """
+ Raised when there's some unexpected behavior with the pid file, but only
+ used internally (never allowed to escape this class).
+ """
- def has_pid(self):
- return self.proc is not None
+ def __init__(self):
+ # checked by _get_pidfile_info_helper() to stop re-reading the pidfile
+ self._lost_process = False
+ # set by _set_start_time(); compared against PIDFILE_TIMEOUT
+ self._start_time = None
+ # assigned by run() or attach_to_existing_process()
+ self.pidfile_id = None
+ # most recently read pidfile contents
+ self._state = drone_manager.PidfileContents()
- def get_pid(self):
- return self.proc.pid
+    def _add_nice_command(self, command, nice_level):
+        """
+        Return command prefixed with 'nice -n <nice_level>'.
+
+        A falsy nice_level (None or 0) returns command itself, unchanged.
+        """
+        if nice_level:
+            nice_prefix = ['nice', '-n', str(nice_level)]
+            return nice_prefix + command
+        return command
+
+
+ def _set_start_time(self):
+ """
+ Record the current time as the moment this monitor started waiting for
+ its pidfile.
+ """
+ self._start_time = time.time()
+
+
+    def run(self, command, working_directory, nice_level=None, log_file=None,
+            pidfile_name=None, paired_with_pidfile=None):
+        """
+        Start command through the drone manager and remember its pidfile id.
+
+        command is a list of arguments and must not be None.  When
+        nice_level is set the command is prefixed by _add_nice_command();
+        a level of 0 leaves the command unchanged.  working_directory,
+        log_file, pidfile_name and paired_with_pidfile are passed through to
+        _drone_manager.execute_command().
+        """
+        assert command is not None
+        # share the prefix logic with the helper instead of duplicating it
+        command = self._add_nice_command(command, nice_level)
+        self._set_start_time()
+        self.pidfile_id = _drone_manager.execute_command(
+            command, working_directory, log_file=log_file,
+            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)
+
+
+ def attach_to_existing_process(self, execution_tag):
+ """
+ Monitor a process that was not started by this object: derive the
+ pidfile id from execution_tag and register it with the drone manager.
+ """
+ self._set_start_time()
+ self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
+ _drone_manager.register_pidfile(self.pidfile_id)
def kill(self):
+ """
+ Ask the drone manager to kill the monitored process; does nothing when
+ no process is known yet.
+ """
- if self.has_pid():
- kill_autoserv(self.get_pid(), self.exit_code)
+ if self.has_process():
+ _drone_manager.kill_process(self.get_process())
- def exit_code(self):
- return self.proc.poll()
-
-
-class PidfileException(Exception):
- """\
- Raised when there's some unexpected behavior with the pid file.
- """
-
-
-class PidfileRunMonitor(RunMonitor):
- class PidfileState(object):
- pid = None
- exit_status = None
- num_tests_failed = None
-
- def reset(self):
- self.pid = self.exit_status = self.all_tests_passed = None
-
-
- def __init__(self, results_dir, cmd=None, nice_level=None,
- log_file=None):
- self.results_dir = os.path.abspath(results_dir)
- self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
- self.lost_process = False
- self.start_time = time.time()
- self._state = self.PidfileState()
- super(PidfileRunMonitor, self).__init__(cmd, nice_level, log_file)
-
-
- def has_pid(self):
+ def has_process(self):
+ """
+ Refresh the pidfile state and return whether it names a process.
+ """
self._get_pidfile_info()
- return self._state.pid is not None
+ return self._state.process is not None
- def get_pid(self):
+ def get_process(self):
+ """
+ Return the process from the pidfile state; asserts that one is known.
+ """
self._get_pidfile_info()
- assert self._state.pid is not None
- return self._state.pid
+ # has_process() refreshes the pidfile state a second time
+ assert self.has_process()
+ return self._state.process
- def _check_command_line(self, command_line, spacer=' ',
- print_error=False):
- results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
- match = results_dir_arg in command_line
- if print_error and not match:
- print '%s not found in %s' % (repr(results_dir_arg),
- repr(command_line))
- return match
-
-
- def _check_proc_fs(self):
- cmdline_path = os.path.join('/proc', str(self._state.pid), 'cmdline')
- try:
- cmdline_file = open(cmdline_path, 'r')
- cmdline = cmdline_file.read().strip()
- cmdline_file.close()
- except IOError:
- return False
- # /proc/.../cmdline has \x00 separating args
- return self._check_command_line(cmdline, spacer='\x00',
- print_error=True)
-
-
- def _read_pidfile(self):
- self._state.reset()
- if not os.path.exists(self.pid_file):
- return
- file_obj = open(self.pid_file, 'r')
- lines = file_obj.readlines()
- file_obj.close()
- if not lines:
- return
- if len(lines) > 3:
- raise PidfileException('Corrupt pid file (%d lines) at %s:\n%s' %
- (len(lines), self.pid_file, lines))
- try:
- self._state.pid = int(lines[0])
- if len(lines) > 1:
- self._state.exit_status = int(lines[1])
- if len(lines) == 3:
- self._state.num_tests_failed = int(lines[2])
- else:
- # maintain backwards-compatibility with two-line pidfiles
- self._state.num_tests_failed = 0
- except ValueError, exc:
- raise PidfileException('Corrupt pid file: ' + str(exc.args))
-
-
- def _find_autoserv_proc(self):
- autoserv_procs = Dispatcher.find_autoservs()
- for pid, args in autoserv_procs.iteritems():
- if self._check_command_line(args):
- return pid, args
- return None, None
+ def _read_pidfile(self, use_second_read=False):
+ """
+ Fetch the pidfile contents for self.pidfile_id from the drone manager
+ and store them in self._state.
+
+ When the contents report themselves invalid, self._state is reset to a
+ fresh PidfileContents() and _PidfileException is raised.
+ use_second_read is passed through to get_pidfile_contents().
+ """
+ assert self.pidfile_id is not None, (
+ 'You must call run() or attach_to_existing_process()')
+ contents = _drone_manager.get_pidfile_contents(
+ self.pidfile_id, use_second_read=use_second_read)
+ if contents.is_invalid():
+ self._state = drone_manager.PidfileContents()
+ raise self._PidfileException(contents)
+ self._state = contents
def _handle_pidfile_error(self, error, message=''):
+ """
+ Print and queue a notification email describing the pidfile problem,
+ then mark the process as lost.  The process from the current state is
+ used when there is one, otherwise the drone manager's dummy process.
+ """
- message = error + '\nPid: %s\nPidfile: %s\n%s' % (self._state.pid,
- self.pid_file,
- message)
+ message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
+ self._state.process, self.pidfile_id, message)
print message
- email_manager.enqueue_notify_email(error, message)
- if self._state.pid is not None:
- pid = self._state.pid
+ email_manager.manager.enqueue_notify_email(error, message)
+ if self._state.process is not None:
+ process = self._state.process
else:
- pid = 0
- self.on_lost_process(pid)
+ process = _drone_manager.get_dummy_process()
+ self.on_lost_process(process)
def _get_pidfile_info_helper(self):
- if self.lost_process:
+ if self._lost_process:
return
self._read_pidfile()
- if self._state.pid is None:
- self._handle_no_pid()
+ if self._state.process is None:
+ self._handle_no_process()
return
if self._state.exit_status is None:
# double check whether or not autoserv is running
- proc_running = self._check_proc_fs()
- if proc_running:
+ if _drone_manager.is_process_running(self._state.process):
return
- # pid but no process - maybe process *just* exited
- self._read_pidfile()
+ # pid but no running process - maybe process *just* exited
+ self._read_pidfile(use_second_read=True)
if self._state.exit_status is None:
# autoserv exited without writing an exit code
# to the pidfile
@@ -1043,46 +880,33 @@
"""
try:
self._get_pidfile_info_helper()
- except PidfileException, exc:
+ except self._PidfileException, exc:
self._handle_pidfile_error('Pidfile error', traceback.format_exc())
- def _handle_no_pid(self):
+ def _handle_no_process(self):
"""\
Called when no pidfile is found or no pid is in the pidfile.
"""
- # is autoserv running?
- pid, args = self._find_autoserv_proc()
- if pid is None:
- # no autoserv process running
- message = 'No pid found at ' + self.pid_file
- else:
- message = ("Process %d (%s) hasn't written pidfile %s" %
- (pid, args, self.pid_file))
-
+ message = 'No pid found at %s' % self.pidfile_id
print message
+ # only give up once PIDFILE_TIMEOUT seconds have passed since the start
+ # time; before that the message above is all that happens
- if time.time() - self.start_time > PIDFILE_TIMEOUT:
- email_manager.enqueue_notify_email(
+ if time.time() - self._start_time > PIDFILE_TIMEOUT:
+ email_manager.manager.enqueue_notify_email(
'Process has failed to write pidfile', message)
- if pid is not None:
- kill_autoserv(pid)
- else:
- pid = 0
- self.on_lost_process(pid)
- return
+ self.on_lost_process(_drone_manager.get_dummy_process())
- def on_lost_process(self, pid):
+    def on_lost_process(self, process):
         """\
         Called when autoserv has exited without writing an exit status,
         or we've timed out waiting for autoserv to write a pid to the
         pidfile. In either case, we just return failure and the caller
         should signal some kind of warning.
-        pid is unimportant here, as it shouldn't be used by anyone.
+        process is unimportant here, as it shouldn't be used by anyone.
         """
-        self.lost_process = True
+        # this has to be the same attribute that __init__ creates and that
+        # _get_pidfile_info_helper() checks; otherwise the pidfile keeps
+        # being re-read and the failure state recorded below is overwritten
+        self._lost_process = True
-        self._state.pid = pid
+        self._state.process = process
         self._state.exit_status = 1
         self._state.num_tests_failed = 0
@@ -1099,17 +923,24 @@
class Agent(object):
- def __init__(self, tasks, queue_entry_ids=[], num_processes=1):
+ def __init__(self, tasks, num_processes=1):
+ """
+ tasks is iterated more than once below, so it has to be a list or
+ another re-iterable sequence.  The agent's queue_entry_ids and host_ids
+ are the unions of the ids of its tasks.
+ """
self.active_task = None
self.queue = Queue.Queue(0)
self.dispatcher = None
- self.queue_entry_ids = queue_entry_ids
self.num_processes = num_processes
+ self.queue_entry_ids = self._union_ids(task.queue_entry_ids
+ for task in tasks)
+ self.host_ids = self._union_ids(task.host_ids for task in tasks)
+
for task in tasks:
self.add_task(task)
+    def _union_ids(self, id_lists):
+        """
+        Return a set holding every id found in any of the iterables in
+        id_lists.
+        """
+        merged_ids = set()
+        for ids in id_lists:
+            merged_ids.update(ids)
+        return merged_ids
+
+
def add_task(self, task):
self.queue.put_nowait(task)
task.agent = self
@@ -1160,19 +991,31 @@
class AgentTask(object):
- def __init__(self, cmd, failure_tasks = []):
+ def __init__(self, cmd, working_directory=None, failure_tasks=[]):
+ # NOTE(review): failure_tasks=[] is one list shared by every call that
+ # relies on the default; confirm that nothing mutates it
self.done = False
self.failure_tasks = failure_tasks
self.started = False
self.cmd = cmd
+ # passed to the run monitor as the working directory in run()
+ self._working_directory = working_directory
self.task = None
self.agent = None
self.monitor = None
self.success = None
+ # filled in by _set_ids()
+ self.queue_entry_ids = []
+ self.host_ids = []
+ # set by set_host_log_file()
+ self.log_file = None
+
+
+    def _set_ids(self, host=None, queue_entries=None):
+        """
+        Record the host ids and queue entry ids this task works on.
+
+        With real queue entries both lists are taken from the entries.
+        Otherwise (no entries, or the placeholder list [None]) host is
+        required and only host_ids is set.
+        """
+        has_entries = bool(queue_entries) and queue_entries != [None]
+        if not has_entries:
+            assert host
+            self.host_ids = [host.id]
+            return
+
+        self.host_ids = [entry.host.id for entry in queue_entries]
+        self.queue_entry_ids = [entry.id for entry in queue_entries]
def poll(self):
- print "poll"
if self.monitor:
self.tick(self.monitor.exit_code())
else:
@@ -1180,9 +1023,8 @@
def tick(self, exit_code):
- if exit_code==None:
+ if exit_code is None:
return
-# print "exit_code was %d" % exit_code
if exit_code == 0:
success = True
else:
@@ -1206,13 +1048,13 @@
def create_temp_resultsdir(self, suffix=''):
- self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)
+ # NOTE(review): suffix is still accepted but is not used now that the
+ # path comes from the drone manager
+ self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
def cleanup(self):
- if (hasattr(self, 'temp_results_dir') and
- os.path.exists(self.temp_results_dir)):
- shutil.rmtree(self.temp_results_dir)
+ if self.monitor and self.log_file:
+ _drone_manager.copy_to_results_repository(
+ self.monitor.get_process(), self.log_file)
def epilog(self):
@@ -1236,35 +1078,40 @@
self.cleanup()
+ def set_host_log_file(self, base_name, host):
+ filename = '%s.%s' % (time.time(), base_name)
+ self.log_file = os.path.join('hosts', host.hostname, filename)
+
+
def run(self):
if self.cmd:
- print "agent starting monitor"
- log_file = None
- if hasattr(self, 'log_file'):
- log_file = self.log_file
- elif hasattr(self, 'host'):
- log_file = os.path.join(RESULTS_DIR, 'hosts',
- self.host.hostname)
- self.monitor = RunMonitor(
- self.cmd, nice_level=AUTOSERV_NICE_LEVEL, log_file=log_file)
- self.monitor.run()
+ self.monitor = PidfileRunMonitor()
+ self.monitor.run(self.cmd, self._working_directory,
+ nice_level=AUTOSERV_NICE_LEVEL,
+ log_file=self.log_file)
class RepairTask(AgentTask):
def __init__(self, host, queue_entry=None):
"""\
- fail_queue_entry: queue entry to mark failed if this repair
- fails.
+ queue_entry: queue entry to mark failed if this repair fails.
"""
protection = host_protections.Protection.get_string(host.protection)
# normalize the protection name
protection = host_protections.Protection.get_attr_name(protection)
- self.create_temp_resultsdir('.repair')
- cmd = [_autoserv_path , '-R', '-m', host.hostname,
- '-r', self.temp_results_dir, '--host-protection', protection]
+
self.host = host
self.queue_entry = queue_entry
- super(RepairTask, self).__init__(cmd)
+ self._set_ids(host=host, queue_entries=[queue_entry])
+
+ self.create_temp_resultsdir('.repair')
+ cmd = [_autoserv_path , '-p', '-R', '-m', host.hostname,
+ '-r', _drone_manager.absolute_path(self.temp_results_dir),
+ '--host-protection', protection]
+ super(RepairTask, self).__init__(cmd, self.temp_results_dir)
+
+ self._set_ids(host=host, queue_entries=[queue_entry])
+ self.set_host_log_file('repair', self.host)
def prolog(self):
@@ -1285,66 +1132,34 @@
class PreJobTask(AgentTask):
- def prolog(self):
- super(PreJobTask, self).prolog()
- if self.queue_entry:
- # clear any possibly existing results, could be a previously failed
- # verify or a previous execution that crashed
- self.queue_entry.clear_results_dir()
-
-
- def cleanup(self):
- if not os.path.exists(self.temp_results_dir):
- return
+ def epilog(self):
+ super(PreJobTask, self).epilog()
should_copy_results = (self.queue_entry and not self.success
and not self.queue_entry.meta_host)
if should_copy_results:
self.queue_entry.set_execution_subdir()
- self._move_results()
- super(PreJobTask, self).cleanup()
-
-
- def _move_results(self):
- assert self.queue_entry is not None
- target_dir = self.queue_entry.results_dir()
- ensure_directory_exists(target_dir)
- files = os.listdir(self.temp_results_dir)
- for filename in files:
- if filename == AUTOSERV_PID_FILE:
- continue
- self._force_move(os.path.join(self.temp_results_dir, filename),
- os.path.join(target_dir, filename))
-
-
- @staticmethod
- def _force_move(source, dest):
- """\
- Replacement for shutil.move() that will delete the destination
- if it exists, even if it's a directory.
- """
- if os.path.exists(dest):
- warning = 'Warning: removing existing destination file ' + dest
- print warning
- email_manager.enqueue_notify_email(warning, warning)
- remove_file_or_dir(dest)
- shutil.move(source, dest)
+ destination = os.path.join(self.queue_entry.execution_tag(),
+ os.path.basename(self.log_file))
+ _drone_manager.copy_to_results_repository(
+ self.monitor.get_process(), self.log_file,
+ destination_path=destination)
class VerifyTask(PreJobTask):
def __init__(self, queue_entry=None, host=None):
assert bool(queue_entry) != bool(host)
-
self.host = host or queue_entry.host
self.queue_entry = queue_entry
self.create_temp_resultsdir('.verify')
-
- cmd = [_autoserv_path, '-v', '-m', self.host.hostname, '-r',
- self.temp_results_dir]
-
+ cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname, '-r',
+ _drone_manager.absolute_path(self.temp_results_dir)]
failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
+ super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
+ failure_tasks=failure_tasks)
- super(VerifyTask, self).__init__(cmd, failure_tasks=failure_tasks)
+ self.set_host_log_file('verify', self.host)
+ self._set_ids(host=host, queue_entries=[queue_entry])
def prolog(self):
@@ -1368,56 +1183,44 @@
class QueueTask(AgentTask):
def __init__(self, job, queue_entries, cmd):
- super(QueueTask, self).__init__(cmd)
self.job = job
self.queue_entries = queue_entries
+ super(QueueTask, self).__init__(cmd, self._execution_tag())
+ self._set_ids(queue_entries=queue_entries)
- @staticmethod
- def _write_keyval(keyval_dir, field, value, keyval_filename='keyval'):
- key_path = os.path.join(keyval_dir, keyval_filename)
- keyval_file = open(key_path, 'a')
- print >> keyval_file, '%s=%s' % (field, str(value))
- keyval_file.close()
+ def _format_keyval(self, key, value):
+ return '%s=%s' % (key, value)
- def _host_keyval_dir(self):
- return os.path.join(self.results_dir(), 'host_keyvals')
+ def _write_keyval(self, field, value):
+ keyval_path = os.path.join(self._execution_tag(), 'keyval')
+ assert self.monitor and self.monitor.has_process()
+ paired_with_pidfile = self.monitor.pidfile_id
+ _drone_manager.write_lines_to_file(
+ keyval_path, [self._format_keyval(field, value)],
+ paired_with_pidfile=paired_with_pidfile)
- def _write_host_keyval(self, host):
- labels = ','.join(host.labels())
- self._write_keyval(self._host_keyval_dir(), 'labels', labels,
- keyval_filename=host.hostname)
-
- def _create_host_keyval_dir(self):
- directory = self._host_keyval_dir()
- ensure_directory_exists(directory)
+ def _write_host_keyvals(self, host):
+ keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
+ host.hostname)
+ platform, all_labels = host.platform_and_labels()
+ keyvals = dict(platform=platform, labels=','.join(all_labels))
+ keyval_content = '\n'.join(self._format_keyval(key, value)
+ for key, value in keyvals.iteritems())
+ _drone_manager.attach_file_to_execution(self._execution_tag(),
+ keyval_content,
+ file_path=keyval_path)
- def results_dir(self):
- return self.queue_entries[0].results_dir()
-
-
- def run(self):
- """\
- Override AgentTask.run() so we can use a PidfileRunMonitor.
- """
- self.monitor = PidfileRunMonitor(self.results_dir(),
- cmd=self.cmd,
- nice_level=AUTOSERV_NICE_LEVEL)
- self.monitor.run()
+ def _execution_tag(self):
+ return self.queue_entries[0].execution_tag()
def prolog(self):
- # write some job timestamps into the job keyval file
- queued = time.mktime(self.job.created_on.timetuple())
- started = time.time()
- self._write_keyval(self.results_dir(), "job_queued", int(queued))
- self._write_keyval(self.results_dir(), "job_started", int(started))
- self._create_host_keyval_dir()
for queue_entry in self.queue_entries:
- self._write_host_keyval(queue_entry.host)
+ self._write_host_keyvals(queue_entry.host)
queue_entry.set_status('Running')
queue_entry.host.set_status('Running')
queue_entry.host.update_field('dirty', 1)
@@ -1427,22 +1230,30 @@
def _finish_task(self, success):
- # write out the finished time into the results keyval
+ queued = time.mktime(self.job.created_on.timetuple())
finished = time.time()
- self._write_keyval(self.results_dir(), "job_finished", int(finished))
+ self._write_keyval("job_queued", int(queued))
+ self._write_keyval("job_finished", int(finished))
+
+ _drone_manager.copy_to_results_repository(self.monitor.get_process(),
+ self._execution_tag() + '/')
# parse the results of the job
reparse_task = FinalReparseTask(self.queue_entries)
- self.agent.dispatcher.add_agent(Agent([reparse_task]))
+ self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
def _write_status_comment(self, comment):
- status_log = open(os.path.join(self.results_dir(), 'status.log'), 'a')
- status_log.write('INFO\t----\t----\t' + comment)
- status_log.close()
+ _drone_manager.write_lines_to_file(
+ os.path.join(self._execution_tag(), 'status.log'),
+ ['INFO\t----\t----\t' + comment],
+ paired_with_pidfile=self.monitor.pidfile_id)
def _log_abort(self):
+ if not self.monitor or not self.monitor.has_process():
+ return
+
# build up sets of all the aborted_by and aborted_on values
aborted_by, aborted_on = set(), set()
for queue_entry in self.queue_entries:
@@ -1459,9 +1270,10 @@
else:
aborted_by_value = 'autotest_system'
aborted_on_value = int(time.time())
- results_dir = self.results_dir()
- self._write_keyval(results_dir, "aborted_by", aborted_by_value)
- self._write_keyval(results_dir, "aborted_on", aborted_on_value)
+
+ self._write_keyval("aborted_by", aborted_by_value)
+ self._write_keyval("aborted_on", aborted_on_value)
+
aborted_on_string = str(datetime.datetime.fromtimestamp(
aborted_on_value))
self._write_status_comment('Job aborted by %s on %s' %
@@ -1495,10 +1307,6 @@
def epilog(self):
super(QueueTask, self).epilog()
- for queue_entry in self.queue_entries:
- # set status to PARSING here so queue entry is marked complete
- queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
-
self._finish_task(self.success)
self._reboot_hosts()
@@ -1507,8 +1315,7 @@
class RecoveryQueueTask(QueueTask):
def __init__(self, job, queue_entries, run_monitor):
- super(RecoveryQueueTask, self).__init__(job,
- queue_entries, cmd=None)
+ super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
self.run_monitor = run_monitor
@@ -1526,14 +1333,18 @@
assert bool(host) ^ bool(queue_entry)
if queue_entry:
host = queue_entry.get_host()
-
- self.create_temp_resultsdir('.cleanup')
- self.cmd = [_autoserv_path, '--cleanup', '-m', host.hostname,
- '-r', self.temp_results_dir]
self.queue_entry = queue_entry
self.host = host
+
+ self.create_temp_resultsdir('.cleanup')
+ self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
+ '-r', _drone_manager.absolute_path(self.temp_results_dir)]
repair_task = RepairTask(host, queue_entry=queue_entry)
- super(CleanupTask, self).__init__(self.cmd, failure_tasks=[repair_task])
+ super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
+ failure_tasks=[repair_task])
+
+ self._set_ids(host=host, queue_entries=[queue_entry])
+ self.set_host_log_file('cleanup', self.host)
def prolog(self):
@@ -1551,9 +1362,11 @@
class AbortTask(AgentTask):
def __init__(self, queue_entry, agents_to_abort):
- self.queue_entry = queue_entry
- self.agents_to_abort = agents_to_abort
super(AbortTask, self).__init__('')
+ self.queue_entry = queue_entry
+ # don't use _set_ids, since we don't want to set the host_ids
+ self.queue_entry_ids = [queue_entry.id]
+ self.agents_to_abort = agents_to_abort
def prolog(self):
@@ -1574,27 +1387,33 @@
class FinalReparseTask(AgentTask):
- MAX_PARSE_PROCESSES = (
- global_config.global_config.get_config_value(
- _global_config_section, 'max_parse_processes', type=int))
+ MAX_PARSE_PROCESSES = global_config.global_config.get_config_value(
+ CONFIG_SECTION, 'max_parse_processes', type=int)
_num_running_parses = 0
def __init__(self, queue_entries):
self._queue_entries = queue_entries
+ # don't use _set_ids, since we don't want to set the host_ids
+ self.queue_entry_ids = [entry.id for entry in queue_entries]
self._parse_started = False
assert len(queue_entries) > 0
queue_entry = queue_entries[0]
+ self._execution_tag = queue_entry.execution_tag()
+ self._results_dir = _drone_manager.absolute_path(self._execution_tag)
+ self._autoserv_monitor = PidfileRunMonitor()
+ self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
+ self._final_status = self._determine_final_status()
+
if _testing_mode:
self.cmd = 'true'
- return
+ else:
+ super(FinalReparseTask, self).__init__(
+ cmd=self._generate_parse_command(),
+ working_directory=self._execution_tag)
- self._results_dir = queue_entry.results_dir()
- self.log_file = os.path.abspath(os.path.join(self._results_dir,
- '.parse.log'))
- super(FinalReparseTask, self).__init__(
- cmd=self._generate_parse_command())
+ self.log_file = os.path.join(self._execution_tag, '.parse.log')
@classmethod
@@ -1612,6 +1431,13 @@
return cls._num_running_parses < cls.MAX_PARSE_PROCESSES
+ def _determine_final_status(self):
+ # we'll use a PidfileRunMonitor to read the autoserv exit status
+ if self._autoserv_monitor.exit_code() == 0:
+ return models.HostQueueEntry.Status.COMPLETED
+ return models.HostQueueEntry.Status.FAILED
+
+
def prolog(self):
super(FinalReparseTask, self).prolog()
for queue_entry in self._queue_entries:
@@ -1620,22 +1446,13 @@
def epilog(self):
super(FinalReparseTask, self).epilog()
- final_status = self._determine_final_status()
for queue_entry in self._queue_entries:
- queue_entry.set_status(final_status)
-
-
- def _determine_final_status(self):
- # use a PidfileRunMonitor to read the autoserv exit status
- monitor = PidfileRunMonitor(self._results_dir)
- if monitor.exit_code() == 0:
- return models.HostQueueEntry.Status.COMPLETED
- return models.HostQueueEntry.Status.FAILED
+ queue_entry.set_status(self._final_status)
def _generate_parse_command(self):
- parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
- return [parse, '-l', '2', '-r', '-o', self._results_dir]
+ return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
+ self._results_dir]
def poll(self):
@@ -1655,8 +1472,14 @@
def _try_starting_parse(self):
if not self._can_run_new_parse():
return
+
# actually run the parse command
- super(FinalReparseTask, self).run()
+ self.monitor = PidfileRunMonitor()
+ self.monitor.run(self.cmd, self._working_directory,
+ log_file=self.log_file,
+ pidfile_name='.parser_execute',
+ paired_with_pidfile=self._autoserv_monitor.pidfile_id)
+
self._increment_running_parses()
self._parse_started = True
@@ -1846,19 +1669,32 @@
self.update_field('status',status)
- def labels(self):
+ def platform_and_labels(self):
"""
- Fetch a list of names of all non-platform labels associated with this
- host.
+ Returns a tuple (platform_name or None, list_of_all_label_names).
"""
rows = _db.execute("""
- SELECT labels.name
+ SELECT labels.name, labels.platform
FROM labels
INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
- WHERE NOT labels.platform AND hosts_labels.host_id = %s
+ WHERE hosts_labels.host_id = %s
ORDER BY labels.name
""", (self.id,))
- return [row[0] for row in rows]
+ platform = None
+ all_labels = []
+ for label_name, is_platform in rows:
+ if is_platform:
+ platform = label_name
+ all_labels.append(label_name)
+ return platform, all_labels
+
+
+ def reverify_tasks(self):
+ cleanup_task = CleanupTask(host=self)
+ verify_task = VerifyTask(host=self)
+ # just to make sure this host does not get taken away
+ self.set_status('Cleaning')
+ return [cleanup_task, verify_task]
class HostQueueEntry(DBObject):
@@ -1872,7 +1708,7 @@
else:
self.host = None
- self.queue_log_path = os.path.join(self.job.results_dir(),
+ self.queue_log_path = os.path.join(self.job.tag(),
'queue.log.' + str(self.id))
@@ -1911,9 +1747,8 @@
def queue_log_record(self, log_line):
now = str(datetime.datetime.now())
- queue_log = open(self.queue_log_path, 'a', 0)
- queue_log.write(now + ' ' + log_line + '\n')
- queue_log.close()
+ _drone_manager.write_lines_to_file(self.queue_log_path,
+ [now + ' ' + log_line])
def block_host(self, host_id):
@@ -1931,10 +1766,6 @@
block.delete()
- def results_dir(self):
- return os.path.join(self.job.job_dir, self.execution_subdir)
-
-
def set_execution_subdir(self, subdir=None):
if subdir is None:
assert self.get_host()
@@ -1948,6 +1779,10 @@
return 'no host'
+ def __str__(self):
+ return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
+
+
def set_status(self, status):
abort_statuses = ['Abort', 'Aborting', 'Aborted']
if status not in abort_statuses:
@@ -1957,8 +1792,7 @@
condition = ''
self.update_field('status', status, condition=condition)
- print "%s/%d (%d) -> %s" % (self._get_hostname(), self.job.id, self.id,
- self.status)
+ print "%s -> %s" % (self, self.status)
if status in ['Queued', 'Parsing']:
self.update_field('complete', False)
@@ -1989,7 +1823,7 @@
body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
self.job.id, self.job.name, hostname, status,
self._view_job_url())
- send_email(self.job.email_list, subject, body)
+ email_manager.manager.send_email(self.job.email_list, subject, body)
def _email_on_job_complete(self):
@@ -2014,14 +1848,13 @@
body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
self.job.id, self.job.name, status, self._view_job_url(),
summary_text)
- send_email(self.job.email_list, subject, body)
+ email_manager.manager.send_email(self.job.email_list, subject, body)
def run(self,assigned_host=None):
if self.meta_host:
assert assigned_host
# ensure results dir exists for the queue log
- self.job.create_results_dir()
self.set_host(assigned_host)
print "%s/%s scheduled on %s, status=%s" % (self.job.name,
@@ -2031,7 +1864,6 @@
def requeue(self):
self.set_status('Queued')
-
if self.meta_host:
self.set_host(None)
@@ -2046,24 +1878,6 @@
self.job.stop_if_necessary()
- def clear_results_dir(self, dont_delete_files=False):
- if not self.execution_subdir:
- return
- results_dir = self.results_dir()
- if not os.path.exists(results_dir):
- return
- if dont_delete_files:
- temp_dir = tempfile.mkdtemp(suffix='.clear_results')
- print 'Moving results from %s to %s' % (results_dir, temp_dir)
- for filename in os.listdir(results_dir):
- path = os.path.join(results_dir, filename)
- if dont_delete_files:
- shutil.move(path, os.path.join(temp_dir, filename))
- else:
- remove_file_or_dir(path)
- remove_file_or_dir(results_dir)
-
-
@property
def aborted_by(self):
self._load_abort_info()
@@ -2107,20 +1921,18 @@
return None
- def abort(self, agents_to_abort=[]):
- abort_task = AbortTask(self, agents_to_abort)
- tasks = [abort_task]
-
+ def abort(self, dispatcher, agents_to_abort=[]):
host = self.get_host()
if self.active and host:
- cleanup_task = CleanupTask(host=host)
- verify_task = VerifyTask(host=host)
- # just to make sure this host does not get taken away
- host.set_status('Cleaning')
- tasks += [cleanup_task, verify_task]
+ dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
+ abort_task = AbortTask(self, agents_to_abort)
self.set_status('Aborting')
- return Agent(tasks=tasks, queue_entry_ids=[self.id])
+ dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
+
+ def execution_tag(self):
+ assert self.execution_subdir
+ return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
class Job(DBObject):
@@ -2128,9 +1940,6 @@
assert id or row
super(Job, self).__init__(id=id, row=row)
- self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
- self.owner))
-
@classmethod
def _get_table(cls):
@@ -2148,6 +1957,10 @@
return self.control_type != 2
+ def tag(self):
+ return "%s-%s" % (self.id, self.owner)
+
+
def get_host_queue_entries(self):
rows = _db.execute("""
SELECT * FROM host_queue_entries
@@ -2174,9 +1987,6 @@
return (pending_entries.count() >= self.synch_count)
- def results_dir(self):
- return self.job_dir
-
def num_machines(self, clause = None):
sql = "job_id=%s" % self.id
if clause:
@@ -2224,25 +2034,8 @@
def write_to_machines_file(self, queue_entry):
hostname = queue_entry.get_host().hostname
- print "writing %s to job %s machines file" % (hostname, self.id)
- file_path = os.path.join(self.job_dir, '.machines')
- mf = open(file_path, 'a')
- mf.write(hostname + '\n')
- mf.close()
-
-
- def create_results_dir(self, queue_entry=None):
- ensure_directory_exists(self.job_dir)
-
- if queue_entry:
- results_dir = queue_entry.results_dir()
- if os.path.exists(results_dir):
- warning = 'QE results dir ' + results_dir + ' already exists'
- print warning
- email_manager.enqueue_notify_email(warning, warning)
- ensure_directory_exists(results_dir)
- return results_dir
- return self.job_dir
+ file_path = os.path.join(self.tag(), '.machines')
+ _drone_manager.write_lines_to_file(file_path, [hostname])
def _next_group_name(self):
@@ -2258,14 +2051,10 @@
return "group%d" % next_id
- def _write_control_file(self):
- 'Writes control file out to disk, returns a filename'
- control_fd, control_filename = tempfile.mkstemp(suffix='.control_file')
- control_file = os.fdopen(control_fd, 'w')
- if self.control_file:
- control_file.write(self.control_file)
- control_file.close()
- return control_filename
+ def _write_control_file(self, execution_tag):
+ control_path = _drone_manager.attach_file_to_execution(
+ execution_tag, self.control_file)
+ return control_path
def get_group_entries(self, queue_entry_from_group):
@@ -2275,23 +2064,17 @@
params=(self.id, execution_subdir)))
- def get_job_tag(self, queue_entries):
- assert len(queue_entries) > 0
- execution_subdir = queue_entries[0].execution_subdir
- assert execution_subdir
- return "%s-%s/%s" % (self.id, self.owner, execution_subdir)
-
-
def _get_autoserv_params(self, queue_entries):
- results_dir = self.create_results_dir(queue_entries[0])
- control_filename = self._write_control_file()
+ assert queue_entries
+ execution_tag = queue_entries[0].execution_tag()
+ control_path = self._write_control_file(execution_tag)
hostnames = ','.join([entry.get_host().hostname
for entry in queue_entries])
- job_tag = self.get_job_tag(queue_entries)
- params = [_autoserv_path, '-P', job_tag, '-p', '-n',
- '-r', os.path.abspath(results_dir), '-u', self.owner,
- '-l', self.name, '-m', hostnames, control_filename]
+ params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
+ '-r', _drone_manager.absolute_path(execution_tag),
+ '-u', self.owner, '-l', self.name, '-m', hostnames,
+ _drone_manager.absolute_path(control_path)]
if not self.is_server_job():
params.append('-c')
@@ -2344,8 +2127,7 @@
if not self.is_ready():
if self.run_verify:
queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
- return Agent(self._get_pre_job_tasks(queue_entry),
- [queue_entry.id])
+ return Agent(self._get_pre_job_tasks(queue_entry))
else:
return queue_entry.on_pending()
@@ -2362,7 +2144,7 @@
tasks = initial_tasks + [queue_task]
entry_ids = [entry.id for entry in queue_entries]
- return Agent(tasks, entry_ids, num_processes=len(queue_entries))
+ return Agent(tasks, num_processes=len(queue_entries))
if __name__ == '__main__':
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 059d1ee..2dff024 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -10,12 +10,32 @@
from autotest_lib.database import database_connection, migrate
from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import models
-from autotest_lib.scheduler import monitor_db
+from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
_DEBUG = False
-class Dummy(object):
- 'Dummy object that can have attribute assigned to it'
+class DummyAgent(object):
+ _is_running = False
+ _is_done = False
+ num_processes = 1
+ host_ids = []
+ queue_entry_ids = []
+
+ def is_running(self):
+ return self._is_running
+
+
+ def tick(self):
+ self._is_running = True
+
+
+ def is_done(self):
+ return self._is_done
+
+
+ def set_done(self, done):
+ self._is_done = done
+ self._is_running = not done
class IsRow(mock.argument_comparator):
@@ -67,6 +87,8 @@
def _set_monitor_stubs(self):
monitor_db._db = self._database
+ monitor_db._drone_manager._results_dir = '/test/path'
+ monitor_db._drone_manager._temporary_directory = '/test/path/tmp'
def _fill_in_test_data(self):
@@ -158,7 +180,7 @@
else:
host = hqe_self.host
self._record_job_scheduled(hqe_self.job.id, host.id)
- return Dummy()
+ return DummyAgent()
monitor_db.HostQueueEntry.run = run_stub
@@ -401,30 +423,8 @@
self._dispatcher.max_processes_started_per_cycle = self._MAX_STARTED
- class DummyAgent(object):
- _is_running = False
- _is_done = False
- num_processes = 1
-
- def is_running(self):
- return self._is_running
-
-
- def tick(self):
- self._is_running = True
-
-
- def is_done(self):
- return self._is_done
-
-
- def set_done(self, done):
- self._is_done = done
- self._is_running = not done
-
-
def _setup_some_agents(self, num_agents):
- self._agents = [self.DummyAgent() for i in xrange(num_agents)]
+ self._agents = [DummyAgent() for i in xrange(num_agents)]
self._dispatcher._agents = list(self._agents)
@@ -494,33 +494,39 @@
"""
Test the dispatcher abort functionality.
"""
- def _check_agent(self, agent, entry_and_host_id, include_host_tasks):
+ def _check_abort_agent(self, agent, entry_id):
self.assert_(isinstance(agent, monitor_db.Agent))
tasks = list(agent.queue.queue)
- self.assert_(len(tasks) > 0)
-
+ self.assertEquals(len(tasks), 1)
abort = tasks[0]
+
self.assert_(isinstance(abort, monitor_db.AbortTask))
- self.assertEquals(abort.queue_entry.id, entry_and_host_id)
+ self.assertEquals(abort.queue_entry.id, entry_id)
- if not include_host_tasks:
- self.assertEquals(len(tasks), 1)
- return
- self.assertEquals(len(tasks), 3)
- cleanup, verify = tasks[1:]
+ def _check_host_agent(self, agent, host_id):
+ self.assert_(isinstance(agent, monitor_db.Agent))
+ tasks = list(agent.queue.queue)
+ self.assertEquals(len(tasks), 2)
+ cleanup, verify = tasks
self.assert_(isinstance(cleanup, monitor_db.CleanupTask))
- self.assertEquals(cleanup.host.id, entry_and_host_id)
+ self.assertEquals(cleanup.host.id, host_id)
self.assert_(isinstance(verify, monitor_db.VerifyTask))
- self.assertEquals(verify.host.id, entry_and_host_id)
+ self.assertEquals(verify.host.id, host_id)
def _check_agents(self, agents, include_host_tasks):
+ agents = list(agents)
+ if include_host_tasks:
+ self.assertEquals(len(agents), 4)
+ self._check_host_agent(agents.pop(0), 1)
+ self._check_host_agent(agents.pop(1), 2)
+
self.assertEquals(len(agents), 2)
- for index, agent in enumerate(agents):
- self._check_agent(agent, index + 1, include_host_tasks)
+ self._check_abort_agent(agents[0], 1)
+ self._check_abort_agent(agents[1], 2)
def test_find_aborting_inactive(self):
@@ -538,7 +544,7 @@
self._update_hqe(set='status="Abort", active=1')
# have to make an Agent for the active HQEs
agent = self.god.create_mock_class(monitor_db.Agent, 'old_agent')
- agent.queue_entry_ids = [1, 2]
+ agent.host_ids = agent.queue_entry_ids = [1, 2]
self._dispatcher.add_agent(agent)
self._dispatcher._find_aborting()
@@ -547,9 +553,9 @@
self.god.check_playback()
# ensure agent gets aborted
- abort1 = self._dispatcher._agents[0].queue.queue[0]
+ abort1 = self._dispatcher._agents[1].queue.queue[0]
self.assertEquals(abort1.agents_to_abort, [agent])
- abort2 = self._dispatcher._agents[1].queue.queue[0]
+ abort2 = self._dispatcher._agents[3].queue.queue[0]
self.assertEquals(abort2.agents_to_abort, [])
@@ -596,61 +602,81 @@
class PidfileRunMonitorTest(unittest.TestCase):
- results_dir = '/test/path'
- pidfile_path = os.path.join(results_dir, monitor_db.AUTOSERV_PID_FILE)
+ execution_tag = 'test_tag'
pid = 12345
+ process = drone_manager.Process('myhost', pid)
num_tests_failed = 1
- args = ('nice -n 10 autoserv -P 123-myuser/myhost -p -n '
- '-r ' + results_dir + ' -b -u myuser -l my-job-name '
- '-m myhost /tmp/filejx43Zi -c')
- bad_args = args.replace(results_dir, '/random/results/dir')
def setUp(self):
self.god = mock.mock_god()
- self.god.stub_function(monitor_db, 'open')
- self.god.stub_function(os.path, 'exists')
- self.god.stub_function(monitor_db.email_manager,
- 'enqueue_notify_email')
- self.monitor = monitor_db.PidfileRunMonitor(self.results_dir)
+ self.mock_drone_manager = self.god.create_mock_class(
+ drone_manager.DroneManager, 'drone_manager')
+ self.god.stub_with(monitor_db, '_drone_manager',
+ self.mock_drone_manager)
+ self.god.stub_function(email_manager.manager, 'enqueue_notify_email')
+
+ self.pidfile_id = object()
+
+ self.mock_drone_manager.get_pidfile_id_from.expect_call(
+ self.execution_tag).and_return(self.pidfile_id)
+ self.mock_drone_manager.register_pidfile.expect_call(self.pidfile_id)
+
+ self.monitor = monitor_db.PidfileRunMonitor()
+ self.monitor.attach_to_existing_process(self.execution_tag)
def tearDown(self):
self.god.unstub_all()
+ def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
+ use_second_read=False):
+ contents = drone_manager.PidfileContents()
+ if pid is not None:
+ contents.process = drone_manager.Process('myhost', pid)
+ contents.exit_status = exit_code
+ contents.num_tests_failed = tests_failed
+ self.mock_drone_manager.get_pidfile_contents.expect_call(
+ self.pidfile_id, use_second_read=use_second_read).and_return(
+ contents)
+
+
def set_not_yet_run(self):
- os.path.exists.expect_call(self.pidfile_path).and_return(False)
-
-
- def setup_pidfile(self, pidfile_contents):
- os.path.exists.expect_call(self.pidfile_path).and_return(True)
- pidfile = StringIO.StringIO(pidfile_contents)
- monitor_db.open.expect_call(
- self.pidfile_path, 'r').and_return(pidfile)
+ self.setup_pidfile()
def set_empty_pidfile(self):
- self.setup_pidfile('')
+ self.setup_pidfile()
- def set_running(self):
- self.setup_pidfile(str(self.pid) + '\n')
+ def set_running(self, use_second_read=False):
+ self.setup_pidfile(self.pid, use_second_read=use_second_read)
- def set_complete(self, error_code):
- self.setup_pidfile(str(self.pid) + '\n' +
- str(error_code) + '\n' +
- str(self.num_tests_failed))
+ def set_complete(self, error_code, use_second_read=False):
+ self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
+ use_second_read=use_second_read)
+
+
+ def _check_monitor(self, expected_pid, expected_exit_status,
+ expected_num_tests_failed):
+ if expected_pid is None:
+ self.assertEquals(self.monitor._state.process, None)
+ else:
+ self.assertEquals(self.monitor._state.process.pid, expected_pid)
+ self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
+ self.assertEquals(self.monitor._state.num_tests_failed,
+ expected_num_tests_failed)
+
+
+ self.god.check_playback()
def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
expected_num_tests_failed):
self.monitor._read_pidfile()
- self.assertEquals(self.monitor._state.pid, expected_pid)
- self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
- self.assertEquals(self.monitor._state.num_tests_failed,
- expected_num_tests_failed)
- self.god.check_playback()
+ self._check_monitor(expected_pid, expected_exit_status,
+ expected_num_tests_failed)
def _get_expected_tests_failed(self, expected_exit_status):
@@ -676,34 +702,24 @@
def test_read_pidfile_error(self):
- self.setup_pidfile('asdf')
- self.assertRaises(monitor_db.PidfileException,
+ self.mock_drone_manager.get_pidfile_contents.expect_call(
+ self.pidfile_id, use_second_read=False).and_return(
+ drone_manager.InvalidPidfile('error'))
+ self.assertRaises(monitor_db.PidfileRunMonitor._PidfileException,
self.monitor._read_pidfile)
self.god.check_playback()
- def setup_proc_cmdline(self, args):
- proc_cmdline = args.replace(' ', '\x00')
- proc_file = StringIO.StringIO(proc_cmdline)
- monitor_db.open.expect_call(
- '/proc/%d/cmdline' % self.pid, 'r').and_return(proc_file)
-
-
- def setup_find_autoservs(self, process_dict):
- self.god.stub_class_method(monitor_db.Dispatcher,
- 'find_autoservs')
- monitor_db.Dispatcher.find_autoservs.expect_call().and_return(
- process_dict)
+ def setup_is_running(self, is_running):
+ self.mock_drone_manager.is_process_running.expect_call(
+ self.process).and_return(is_running)
def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
expected_num_tests_failed):
self.monitor._get_pidfile_info()
- self.assertEquals(self.monitor._state.pid, expected_pid)
- self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
- self.assertEquals(self.monitor._state.num_tests_failed,
- expected_num_tests_failed)
- self.god.check_playback()
+ self._check_monitor(expected_pid, expected_exit_status,
+ expected_num_tests_failed)
def test_get_pidfile_info(self):
@@ -712,14 +728,13 @@
"""
# running
self.set_running()
- self.setup_proc_cmdline(self.args)
+ self.setup_is_running(True)
self._test_get_pidfile_info_helper(self.pid, None, None)
# exited during check
self.set_running()
- monitor_db.open.expect_call(
- '/proc/%d/cmdline' % self.pid, 'r').and_raises(IOError)
- self.set_complete(123) # pidfile gets read again
+ self.setup_is_running(False)
+ self.set_complete(123, use_second_read=True) # pidfile gets read again
self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)
# completed
@@ -733,10 +748,9 @@
"""
# running but no proc
self.set_running()
- monitor_db.open.expect_call(
- '/proc/%d/cmdline' % self.pid, 'r').and_raises(IOError)
- self.set_running()
- monitor_db.email_manager.enqueue_notify_email.expect_call(
+ self.setup_is_running(False)
+ self.set_running(use_second_read=True)
+ email_manager.manager.enqueue_notify_email.expect_call(
mock.is_string_comparator(), mock.is_string_comparator())
self._test_get_pidfile_info_helper(self.pid, 1, 0)
self.assertTrue(self.monitor.lost_process)
@@ -746,20 +760,19 @@
"""
pidfile hasn't been written yet
"""
- # process not running
self.set_not_yet_run()
- self.setup_find_autoservs({})
self._test_get_pidfile_info_helper(None, None, None)
- # process running
- self.set_not_yet_run()
- self.setup_find_autoservs({self.pid : self.args})
- self._test_get_pidfile_info_helper(None, None, None)
- # another process running under same pid
+ def test_process_failed_to_write_pidfile(self):
self.set_not_yet_run()
- self.setup_find_autoservs({self.pid : self.bad_args})
- self._test_get_pidfile_info_helper(None, None, None)
+ email_manager.manager.enqueue_notify_email.expect_call(
+ mock.is_string_comparator(), mock.is_string_comparator())
+ dummy_process = drone_manager.Process('dummy', 12345)
+ self.mock_drone_manager.get_dummy_process.expect_call().and_return(
+ dummy_process)
+ self.monitor._start_time = time.time() - monitor_db.PIDFILE_TIMEOUT - 1
+ self._test_get_pidfile_info_helper(12345, 1, 0)
class AgentTest(unittest.TestCase):
@@ -771,13 +784,16 @@
self.god.unstub_all()
+    def _create_mock_task(self, name):
+        """Build a mock AgentTask called `name` with no hosts or queue entries.
+
+        host_ids and queue_entry_ids each get their own empty list, so code
+        that mutates one of them cannot silently change the other.
+        """
+        task = self.god.create_mock_class(monitor_db.AgentTask, name)
+        task.host_ids = []
+        task.queue_entry_ids = []
+        return task
+
+
def test_agent(self):
- task1 = self.god.create_mock_class(monitor_db.AgentTask,
- 'task1')
- task2 = self.god.create_mock_class(monitor_db.AgentTask,
- 'task2')
- task3 = self.god.create_mock_class(monitor_db.AgentTask,
- 'task3')
+ task1 = self._create_mock_task('task1')
+ task2 = self._create_mock_task('task2')
+ task3 = self._create_mock_task('task3')
task1.start.expect_call()
task1.is_done.expect_call().and_return(False)
@@ -806,27 +822,39 @@
class AgentTasksTest(unittest.TestCase):
- TEMP_DIR = '/temp/dir'
+ TEMP_DIR = '/abspath/tempdir'
RESULTS_DIR = '/results/dir'
HOSTNAME = 'myhost'
+ DUMMY_PROCESS = object()
HOST_PROTECTION = host_protections.default
+ PIDFILE_ID = object()
def setUp(self):
self.god = mock.mock_god()
- self.god.stub_with(tempfile, 'mkdtemp',
- mock.mock_function('mkdtemp', self.TEMP_DIR))
- self.god.stub_with(os.path, 'exists',
- mock.mock_function('exists', True))
- self.god.stub_with(shutil, 'rmtree', mock.mock_function('rmtree', None))
- self.god.stub_class_method(monitor_db.RunMonitor, 'run')
- self.god.stub_class_method(monitor_db.RunMonitor, 'exit_code')
- self.god.stub_class_method(monitor_db.PreJobTask, '_move_results')
+ self.god.stub_with(drone_manager.DroneManager, 'get_temporary_path',
+ mock.mock_function('get_temporary_path',
+ default_return_val='tempdir'))
+ self.god.stub_function(drone_manager.DroneManager,
+ 'copy_to_results_repository')
+ self.god.stub_function(drone_manager.DroneManager,
+ 'get_pidfile_id_from')
+
+ def dummy_absolute_path(self, path):
+ return '/abspath/' + path
+ self.god.stub_with(drone_manager.DroneManager, 'absolute_path',
+ dummy_absolute_path)
+
+ self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'run')
+ self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'exit_code')
+ self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'get_process')
self.host = self.god.create_mock_class(monitor_db.Host, 'host')
+ self.host.id = 1
self.host.hostname = self.HOSTNAME
self.host.protection = self.HOST_PROTECTION
self.queue_entry = self.god.create_mock_class(
monitor_db.HostQueueEntry, 'queue_entry')
self.job = self.god.create_mock_class(monitor_db.Job, 'job')
+ self.queue_entry.id = 1
self.queue_entry.job = self.job
self.queue_entry.host = self.host
self.queue_entry.meta_host = None
@@ -857,12 +885,31 @@
self.assertEquals(task.success, success)
- def setup_run_monitor(self, exit_status):
- monitor_db.RunMonitor.run.expect_call()
- monitor_db.RunMonitor.exit_code.expect_call()
- monitor_db.RunMonitor.exit_code.expect_call().and_return(
+ def setup_run_monitor(self, exit_status, copy_log_file=True):
+ monitor_db.PidfileRunMonitor.run.expect_call(
+ mock.is_instance_comparator(list),
+ 'tempdir',
+ nice_level=monitor_db.AUTOSERV_NICE_LEVEL,
+ log_file=mock.anything_comparator())
+ monitor_db.PidfileRunMonitor.exit_code.expect_call()
+ monitor_db.PidfileRunMonitor.exit_code.expect_call().and_return(
exit_status)
+ if copy_log_file:
+ self._setup_move_logfile()
+
+
+    def _setup_move_logfile(self, include_destination=False):
+        """Expect a log file copy to the results repository.
+
+        Records a get_process() call that returns DUMMY_PROCESS, followed by
+        one copy_to_results_repository() call for that process with a string
+        source path.  When include_destination is true, the copy must also be
+        given a string destination_path keyword argument.
+        """
+        monitor_db.PidfileRunMonitor.get_process.expect_call().and_return(
+            self.DUMMY_PROCESS)
+
+        source_matcher = mock.is_string_comparator()
+        expected_kwargs = {}
+        if include_destination:
+            expected_kwargs['destination_path'] = mock.is_string_comparator()
+
+        copy_stub = drone_manager.DroneManager.copy_to_results_repository
+        copy_stub.expect_call(self.DUMMY_PROCESS, source_matcher,
+                              **expected_kwargs)
+
def _test_repair_task_helper(self, success):
self.host.set_status.expect_call('Repairing')
@@ -882,10 +929,10 @@
expected_protection = host_protections.Protection.get_attr_name(
expected_protection)
- self.assertTrue(set(task.monitor.cmd) >=
- set(['autoserv', '-R', '-m', self.HOSTNAME, '-r',
- self.TEMP_DIR, '--host-protection',
- expected_protection]))
+ self.assertTrue(set(task.cmd) >=
+ set([monitor_db._autoserv_path, '-p', '-R', '-m',
+ self.HOSTNAME, '-r', self.TEMP_DIR,
+ '--host-protection', expected_protection]))
self.god.check_playback()
@@ -908,7 +955,6 @@
def setup_verify_expects(self, success, use_queue_entry):
if use_queue_entry:
- self.queue_entry.clear_results_dir.expect_call()
self.queue_entry.set_status.expect_call('Verifying')
self.host.set_status.expect_call('Verifying')
if success:
@@ -920,7 +966,8 @@
self.setup_run_monitor(1)
if use_queue_entry and not self.queue_entry.meta_host:
self.queue_entry.set_execution_subdir.expect_call()
- monitor_db.VerifyTask._move_results.expect_call()
+ self.queue_entry.execution_tag.expect_call().and_return('tag')
+ self._setup_move_logfile(include_destination=True)
def _check_verify_failure_tasks(self, verify_task):
@@ -939,15 +986,14 @@
self.setup_verify_expects(success, use_queue_entry)
if use_queue_entry:
- task = monitor_db.VerifyTask(
- queue_entry=self.queue_entry)
+ task = monitor_db.VerifyTask(queue_entry=self.queue_entry)
else:
task = monitor_db.VerifyTask(host=self.host)
self._check_verify_failure_tasks(task)
self.run_task(task, success)
- self.assertTrue(set(task.monitor.cmd) >=
- set(['autoserv', '-v', '-m', self.HOSTNAME, '-r',
- self.TEMP_DIR]))
+ self.assertTrue(set(task.cmd) >=
+ set([monitor_db._autoserv_path, '-p', '-v', '-m',
+ self.HOSTNAME, '-r', self.TEMP_DIR]))
self.god.check_playback()
@@ -969,6 +1015,7 @@
def test_abort_task(self):
queue_entry = self.god.create_mock_class(monitor_db.HostQueueEntry,
'queue_entry')
+ queue_entry.id = 1
queue_entry.host_id, queue_entry.job_id = 1, 2
task = self.god.create_mock_class(monitor_db.AgentTask, 'task')
agent = self.god.create_mock_class(monitor_db.Agent, 'agent')
@@ -982,45 +1029,61 @@
self.god.check_playback()
- def _setup_pre_parse_expects(self, is_synch, num_machines):
- self.queue_entry.results_dir.expect_call().and_return(self.RESULTS_DIR)
+ def _setup_pre_parse_expects(self, autoserv_success):
+ self.queue_entry.execution_tag.expect_call().and_return('tag')
+ self.pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new()
+ self.pidfile_monitor.pidfile_id = self.PIDFILE_ID
+ self.pidfile_monitor.attach_to_existing_process.expect_call('tag')
+ if autoserv_success:
+ code = 0
+ else:
+ code = 1
+ self.pidfile_monitor.exit_code.expect_call().and_return(code)
+
self.queue_entry.set_status.expect_call('Parsing')
def _setup_post_parse_expects(self, autoserv_success):
- pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new(
- self.RESULTS_DIR)
if autoserv_success:
- code, status = 0, 'Completed'
+ status = 'Completed'
else:
- code, status = 1, 'Failed'
- pidfile_monitor.exit_code.expect_call().and_return(code)
+ status = 'Failed'
self.queue_entry.set_status.expect_call(status)
- def _test_final_reparse_task_helper(self, is_synch=False, num_machines=1,
- autoserv_success=True):
- tko_dir = '/tko/dir'
- monitor_db.AUTOTEST_TKO_DIR = tko_dir
- parse_path = os.path.join(tko_dir, 'parse')
+    def setup_reparse_run_monitor(self):
+        """Record the monitor calls expected while the parser is run.
+
+        In order: a new PidfileRunMonitor is created; run() is called with a
+        list command, 'tag', any log_file, the '.parser_execute' pidfile name
+        and PIDFILE_ID as the paired pidfile; exit_code() is polled twice;
+        get_process() returns DUMMY_PROCESS; and the log is copied to the
+        results repository for that process.
+        """
+        monitor = monitor_db.PidfileRunMonitor.expect_new()
+        monitor.run.expect_call(
+            mock.is_instance_comparator(list),
+            'tag',
+            log_file=mock.anything_comparator(),
+            pidfile_name='.parser_execute',
+            paired_with_pidfile=self.PIDFILE_ID)
+        # the first exit_code() has no return value configured; the second
+        # reports an exit status of 0
+        monitor.exit_code.expect_call()
+        monitor.exit_code.expect_call().and_return(0)
+        monitor.get_process.expect_call().and_return(self.DUMMY_PROCESS)
+        drone_manager.DroneManager.copy_to_results_repository.expect_call(
+            self.DUMMY_PROCESS, mock.is_string_comparator())
- self._setup_pre_parse_expects(is_synch, num_machines)
- self.setup_run_monitor(0)
+
+ def _test_final_reparse_task_helper(self, autoserv_success=True):
+ self._setup_pre_parse_expects(autoserv_success)
+ self.setup_reparse_run_monitor()
self._setup_post_parse_expects(autoserv_success)
task = monitor_db.FinalReparseTask([self.queue_entry])
self.run_task(task, True)
self.god.check_playback()
- cmd = [parse_path, '-l', '2', '-r', '-o', self.RESULTS_DIR]
+ cmd = [monitor_db._parser_path, '--write-pidfile', '-l', '2', '-r',
+ '-o', '/abspath/tag']
self.assertEquals(task.cmd, cmd)
def test_final_reparse_task(self):
self.god.stub_class(monitor_db, 'PidfileRunMonitor')
self._test_final_reparse_task_helper()
- self._test_final_reparse_task_helper(num_machines=2)
- self._test_final_reparse_task_helper(is_synch=True)
self._test_final_reparse_task_helper(autoserv_success=False)
@@ -1029,12 +1092,12 @@
self.god.stub_function(monitor_db.FinalReparseTask,
'_can_run_new_parse')
- self._setup_pre_parse_expects(False, 1)
+ self._setup_pre_parse_expects(True)
monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
False)
monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
True)
- self.setup_run_monitor(0)
+ self.setup_reparse_run_monitor()
self._setup_post_parse_expects(True)
task = monitor_db.FinalReparseTask([self.queue_entry])
@@ -1045,7 +1108,6 @@
def _test_cleanup_task_helper(self, success, use_queue_entry=False):
if use_queue_entry:
self.queue_entry.get_host.expect_call().and_return(self.host)
- self.queue_entry.clear_results_dir.expect_call()
self.host.set_status.expect_call('Cleaning')
if success:
self.setup_run_monitor(0)
@@ -1055,7 +1117,8 @@
self.setup_run_monitor(1)
if use_queue_entry and not self.queue_entry.meta_host:
self.queue_entry.set_execution_subdir.expect_call()
- monitor_db.VerifyTask._move_results.expect_call()
+ self.queue_entry.execution_tag.expect_call().and_return('tag')
+ self._setup_move_logfile(include_destination=True)
if use_queue_entry:
task = monitor_db.CleanupTask(queue_entry=self.queue_entry)
@@ -1070,9 +1133,9 @@
self.run_task(task, success)
self.god.check_playback()
- self.assert_(set(task.monitor.cmd) >=
- set(['autoserv', '--cleanup', '-m', self.HOSTNAME,
- '-r', self.TEMP_DIR]))
+ self.assert_(set(task.cmd) >=
+ set([monitor_db._autoserv_path, '-p', '--cleanup', '-m',
+ self.HOSTNAME, '-r', self.TEMP_DIR]))
def test_cleanup_task(self):
self._test_cleanup_task_helper(True)
@@ -1086,16 +1149,15 @@
class JobTest(BaseSchedulerTest):
def setUp(self):
super(JobTest, self).setUp()
- self.god.stub_function(os.path, 'exists')
- self.god.stub_function(monitor_db, 'ensure_directory_exists')
+ self.god.stub_with(
+ drone_manager.DroneManager, 'attach_file_to_execution',
+ mock.mock_function('attach_file_to_execution',
+ default_return_val='/test/path/tmp/foo'))
def _setup_directory_expects(self, execution_subdir):
job_path = os.path.join('.', '1-my_user')
results_dir = os.path.join(job_path, execution_subdir)
- monitor_db.ensure_directory_exists.expect_call(job_path)
- os.path.exists.expect_call(results_dir)
- monitor_db.ensure_directory_exists.expect_call(results_dir)
def _test_run_helper(self, expect_agent=True, expect_starting=False,
diff --git a/server/hosts/abstract_ssh.py b/server/hosts/abstract_ssh.py
index 9a5b38f..c6707d1 100644
--- a/server/hosts/abstract_ssh.py
+++ b/server/hosts/abstract_ssh.py
@@ -1,5 +1,5 @@
import os, sys, time, types, socket, traceback, shutil
-from autotest_lib.client.common_lib import error
+from autotest_lib.client.common_lib import error, debug
from autotest_lib.server import utils, autotest
from autotest_lib.server.hosts import site_host
@@ -41,7 +41,8 @@
format (%s@%s:%s).
"""
- print '_copy_files: copying %s to %s' % (sources, dest)
+ debug.get_logger().debug('_copy_files: copying %s to %s' %
+ (sources, dest))
try:
ssh = make_ssh_command(self.user, self.port)
if delete_dest:
@@ -289,3 +290,12 @@
self.machine_install()
except NotImplementedError, e:
sys.stderr.write(str(e) + "\n\n")
+
+
+class LoggerFile(object):
+    """Write-only file-like adapter that forwards text to the debug logger.
+
+    Gives code that expects a stream (such as the stdout/stderr tee of a
+    remote command) an object with write() and flush(), so the output goes
+    to debug.get_logger() instead of a real file.
+    """
+
+    def write(self, data):
+        """Emit `data` as a single debug-level record.
+
+        NOTE(review): data is passed through unmodified, so a writer that
+        sends a bare newline on its own (as `print >>f` does after its text)
+        yields a record holding only that newline -- confirm this is
+        acceptable for the configured logger.
+        """
+        logger = debug.get_logger()
+        logger.debug(data)
+
+
+    def flush(self):
+        """Do nothing; write() never holds data back."""
diff --git a/server/hosts/paramiko_host.py b/server/hosts/paramiko_host.py
index 68643a5..4912881 100644
--- a/server/hosts/paramiko_host.py
+++ b/server/hosts/paramiko_host.py
@@ -101,8 +101,8 @@
"""
# tee to std* if no tees are provided
- stdout = stdout_tee or sys.stdout
- stderr = stderr_tee or sys.stdout
+ stdout = stdout_tee or abstract_ssh.LoggerFile()
+ stderr = stderr_tee or abstract_ssh.LoggerFile()
self.host_log.debug("ssh-paramiko: %s" % command)
# start up the command
diff --git a/server/hosts/ssh_host.py b/server/hosts/ssh_host.py
index e839b3e..4887512 100644
--- a/server/hosts/ssh_host.py
+++ b/server/hosts/ssh_host.py
@@ -63,7 +63,7 @@
def _run(self, command, timeout, ignore_status, stdout, stderr,
- connect_timeout, env, options):
+ connect_timeout, env, options, stdin=None):
"""Helper function for run()."""
ssh_cmd = self.ssh_command(connect_timeout, options)
echo_cmd = "echo \`date '+%m/%d/%y %H:%M:%S'\` Connected. >&2"
@@ -74,7 +74,7 @@
full_cmd = '%s "%s;%s %s"' % (ssh_cmd, echo_cmd, env,
utils.sh_escape(command))
result = utils.run(full_cmd, timeout, True, stdout, stderr,
- verbose=False)
+ verbose=False, stdin=stdin)
# The error messages will show up in band (indistinguishable
# from stuff sent through the SSH connection), so we have the
@@ -96,7 +96,8 @@
def run(self, command, timeout=3600, ignore_status=False,
- stdout_tee=None, stderr_tee=None, connect_timeout=30, options=''):
+ stdout_tee=None, stderr_tee=None, connect_timeout=30, options='',
+ stdin=None):
"""
Run a command on the remote host.
@@ -108,6 +109,7 @@
to complete if it has to kill the process.
ignore_status: do not raise an exception, no matter
what the exit code of the command is.
+ stdin: stdin to pass to the executed process
Returns:
a utils.CmdResult object
@@ -117,20 +119,22 @@
execution was not 0
AutoservSSHTimeout: ssh connection has timed out
"""
- stdout = stdout_tee or sys.stdout
- stderr = stderr_tee or sys.stdout
+ stdout = stdout_tee or abstract_ssh.LoggerFile()
+ stderr = stderr_tee or abstract_ssh.LoggerFile()
self.ssh_host_log.debug("ssh: %s" % command)
env = " ".join("=".join(pair) for pair in self.env.iteritems())
try:
try:
return self._run(command, timeout, ignore_status, stdout,
- stderr, connect_timeout, env, options)
+ stderr, connect_timeout, env, options,
+ stdin=stdin)
except PermissionDeniedError:
- print("Permission denied to ssh; re-running"
- "with increased logging:")
+                print >>stdout, ("Permission denied to ssh; re-running "
+                                 "with increased logging:")
try:
self._run(command, timeout, ignore_status, stdout,
- stderr, connect_timeout, env, '-v -v -v')
+ stderr, connect_timeout, env, '-v -v -v',
+ stdin=stdin)
except Exception:
pass
raise
diff --git a/server/server_job.py b/server/server_job.py
index 51fbe2b..5580b66 100755
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -154,7 +154,8 @@
os.unlink(self.status)
job_data = {'label' : label, 'user' : user,
'hostname' : ','.join(machines),
- 'status_version' : str(self.STATUS_VERSION)}
+ 'status_version' : str(self.STATUS_VERSION),
+ 'job_started' : str(int(time.time()))}
if self.resultdir:
job_data.update(get_site_job_data(self))
utils.write_keyval(self.resultdir, job_data)
diff --git a/server/utils.py b/server/utils.py
index 3e80430..d5991f5 100644
--- a/server/utils.py
+++ b/server/utils.py
@@ -24,9 +24,9 @@
############# we need pass throughs for the methods in client/common_lib/utils
def run(command, timeout=None, ignore_status=False,
- stdout_tee=None, stderr_tee=None, verbose=True):
+ stdout_tee=None, stderr_tee=None, verbose=True, stdin=None):
return utils.run(command, timeout, ignore_status,
- stdout_tee, stderr_tee, verbose)
+ stdout_tee, stderr_tee, verbose, stdin=stdin)
def system(command, timeout=None, ignore_status=False):
diff --git a/tko/parse b/tko/parse
index 8379db3..6854e95 100755
--- a/tko/parse
+++ b/tko/parse
@@ -1,9 +1,5 @@
#!/usr/bin/python -u
-import os, sys
-
-# this is a stub that just execs into parse.py
-mypath = os.path.dirname(os.path.abspath(__file__))
-parse_py = os.path.join(mypath, "parse.py")
-
-os.execv(parse_py, [parse_py] + sys.argv[1:])
+import common
+from autotest_lib.tko import parse
+parse.main()
diff --git a/tko/retrieve_logs.cgi b/tko/retrieve_logs.cgi
index 157bf8a..5cb3a22 100755
--- a/tko/retrieve_logs.cgi
+++ b/tko/retrieve_logs.cgi
@@ -1,6 +1,8 @@
#!/usr/bin/python
-import cgi, os, sys
+import cgi, os, sys, urllib2
+import common
+from autotest_lib.client.common_lib import global_config
page = """\
Status: 302 Found
@@ -16,12 +18,12 @@
# Define function for retrieving logs
try:
- import site_retrieve_logs
- retrieve_logs = site_retrieve_logs.retrieve_logs
- del site_retrieve_logs
+ import site_retrieve_logs
+ retrieve_logs = site_retrieve_logs.retrieve_logs
+ del site_retrieve_logs
except ImportError:
- def retrieve_logs(job_path):
- pass
+ def retrieve_logs(job_path):
+ pass
# Get form fields
form = cgi.FieldStorage(keep_blank_values=True)
@@ -30,12 +32,27 @@
job_path = os.path.join(autodir, job_path)
keyval = retrieve_logs(job_path)
-# Redirect to results page
-testname = ''
-if 'test' in form:
- testname = form['test'].value
- full_path = os.path.join(job_path, form['test'].value)
- if not os.path.exists(full_path):
- testname = ''
-path = "%s%s" % (form['job'].value, testname)
-print page % path
+
+def find_repo(job_path):
+    """Find the machine holding the given logs and return a URL to the logs.
+
+    When SCHEDULER/drones is set and SCHEDULER/results_host is set to
+    something other than 'localhost', the results host is probed first and
+    then each configured drone, in order; the first URL that opens
+    successfully is returned.  If none opens, the results host URL is
+    returned anyway.  Otherwise the value of the 'job' form field is returned
+    unchanged.
+
+    Args:
+        job_path: path of the job's results.  Currently unused: the URL is
+            built from the 'job' CGI form field instead.
+
+    Returns:
+        The string substituted into the redirect page.
+    """
+    job_url_path = form['job'].value
+    config = global_config.global_config
+    drones = config.get_config_value('SCHEDULER', 'drones')
+    results_host = config.get_config_value('SCHEDULER', 'results_host')
+    if not (drones and results_host and results_host != 'localhost'):
+        # Local
+        return job_url_path
+
+    drone_list = [hostname.strip() for hostname in drones.split(',')]
+    for host in [results_host] + drone_list:
+        http_path = 'http://%s%s' % (host, job_url_path)
+        try:
+            # urlopen() itself raises for unreachable hosts and HTTP error
+            # statuses, so a successful open is proof enough; there is no
+            # need to download the whole body.
+            response = urllib2.urlopen(http_path)
+        except urllib2.URLError:
+            continue
+        # release the connection rather than leaving it to garbage collection
+        response.close()
+        return http_path
+    # just return the results repo if we haven't found any
+    return 'http://%s%s' % (results_host, job_url_path)
+
+
+print page % find_repo(job_path)