[autotest] Scheduler refactor.
Break scheduler into simpler modules.
This change also modifies run_pylint to check for undefined variables.
BUG=chromium:312338
TEST=Ran smoke suite against multiple DUTs.
Triggered agents such as repair and verify. Ran pylint and unit tests.
DEPLOY=scheduler
Change-Id: Ibd685a27b5b50abd26cdf2976ac4189c3e9acc0a
Reviewed-on: https://chromium-review.googlesource.com/174080
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
Reviewed-by: Alex Miller <milleral@chromium.org>
diff --git a/scheduler/agent_task.py b/scheduler/agent_task.py
new file mode 100644
index 0000000..7cd28f6
--- /dev/null
+++ b/scheduler/agent_task.py
@@ -0,0 +1,633 @@
+#pylint: disable-msg=C0111
+
+""" This is the module for everything related to the AgentTask.
+
+The BaseAgentTask imposes an interface through which the scheduler can monitor
+a processes; Examples of such processes include Verify, Cleanup and the Queue
+Tasks that run the tests. The scheduler itself only understands Agents.
+Agents:
+    The Agent is the bridge between the scheduler and the AgentTask. The
+    scheduler's tick calls handle_agents, which in turn calls the tick of
+    each agent in the Dispatcher's queue. This leads to the Agent polling
+    its AgentTask. The scheduler will keep polling a task through the
+    associated Agent until the Agent is removed from the dispatcher. (A
+    simplified sketch of this polling loop appears at the end of this
+    docstring.)
+
+    At a high level:
+        agent finished = task done
+        agent polls until finished
+        task polls until done
+        task sets done
+        agent is removed from dispatcher
+AgentTasks:
+    Basic AgentTasks are created when an HQE changes state. Examples of these
+    are the QueueTask, which is created when an HQE goes into the Starting
+    state, and the FinalReparseTask, which is created when the HQE goes into
+    the Parsing state.
+SpecialAgentTasks:
+    Unlike AgentTasks, SpecialAgentTasks are only created when a row is
+    inserted into the afe_special_tasks table. All PrejobTasks are
+    SpecialAgentTasks.
+
+monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
+an AgentTask to an Agent, which the scheduler understands. From this point
+onward, the scheduler manages the task through the Agent's interface, as
+follows:
+At a high level:
+ task poll
+ start
+ prolog
+        tick until we get an exit code
+ finished(exit==0)
+ done=True
+ epilog
+ cleanup
+ set is_active, is_complete, success (checked in scheduler)
+
+The first special task for an HQE is usually Reset.
+- poll: The first poll will start the task; polls thereafter will call the
+    task's tick method. A started task will have the started bit set.
+- start: Call prolog, run the process and set the started bit.
+    - prolog: Usually where one puts any model state changes that happen before
+              the actual task. Different per Task. Examples of things that might
+              happen in a prolog:
+              - set the state of the Host/HQE (to something like Resetting)
+              - delete any unwanted queued special tasks
+              - register a pidfile
+              - set the is_active bit on the special task
+    - run:
+        - create a PidfileRunMonitor
+        - pass the autoserv command, working directory etc. to the drone
+          manager. This will start the actual autoserv process.
+    - set the started bit: so subsequent polls do not 'start' again
+
+- tick: For as long as a started task's done bit is not set, a poll will lead
+        to a tick. The tick monitors the pidfile of the autoserv process
+        running on the drone through the PidfileRunMonitor created in run.
+        If the autoserv process has finished, we call finished with True/False
+        depending on the autoserv exit code.
+
+ - finished: sets the done and success values, then calls epilog. The
+ done bit is important because the Agent polls this bit to
+ measure the success or failure of its task.
+
+ - epilog: Is generally where we set status of the Host/HQE again,
+ requeue any other task that needs to run after this one
+ and perform cleanup. Just like the prolog, this step is
+ different per task.
+
+        - cleanup: Sets the is_active, is_complete and success
+                   states on the task's model. Also uses the
+                   drone_manager to:
+                        unregister the pidfile
+                        copy results of the task
+                   (Note this is not to be confused with the
+                   special task called Cleanup.)
+
+                   The actions we take in the epilog are based on the
+                   success/failure of the autoserv process recorded in
+                   cleanup, e.g. if Reset failed we will enqueue a Repair,
+                   but if all is well the epilog will just return. Prejob
+                   task epilogs also have an on_pending method that changes
+                   the status of the HQE to Pending/Starting, which gets
+                   picked up in the scheduler.
+By this point the done flag is set, which results in the Agent noticing that
+the task has finished and unregistering it from the dispatcher.
+
+Class hierarchy:
+BaseAgentTask
+ |--->SpecialAgentTask (prejob_task.py)
+ |--->RepairTask
+ |--->PreJobTask
+ |--->Verify, Cleanup, Reset, Provision
+
+ |--->AbstractQueueTask (monitor_db.py)
+ |--->QueueTask
+ |--->HostlessQueueTask
+
+ |--->PostJobTask (postjob_task.py)
+ |--->GatherLogsTask
+ |--->SelfThrottledPostJobTask
+ |--->FinalReparseTask
+ |--->ArchiveResultsTask
+
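+Example:
+    A simplified, illustrative sketch of the polling loop described above.
+    The names below are abbreviated for illustration only; the real
+    Dispatcher and Agent code lives in monitor_db.py:
+
+        def handle_agents(dispatcher):
+            # Called once per scheduler tick.
+            for agent in list(dispatcher.get_agents()):
+                agent.tick()            # the Agent polls its AgentTask
+                if agent.is_done():     # True once the task has set done
+                    dispatcher.remove_agent(agent)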
+"""
+
+import logging
+import os
+import urllib
+import time
+
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import drone_manager, pidfile_monitor
+from autotest_lib.client.common_lib import utils
+from autotest_lib.scheduler import email_manager, host_scheduler
+from autotest_lib.scheduler import scheduler_models
+from autotest_lib.server import autoserv_utils
+
+
+_drone_manager = drone_manager.instance()
+
+AUTOSERV_NICE_LEVEL = 10
+
+
+class BaseAgentTask(object):
+ class _NullMonitor(object):
+ pidfile_id = None
+
+ def has_process(self):
+ return True
+
+
+ def __init__(self, log_file_name=None):
+ """
+ @param log_file_name: (optional) name of file to log command output to
+ """
+ self.done = False
+ self.started = False
+ self.success = None
+ self.aborted = False
+ self.monitor = None
+ self.queue_entry_ids = []
+ self.host_ids = []
+ self._log_file_name = log_file_name
+
+
+ def _set_ids(self, host=None, queue_entries=None):
+ if queue_entries and queue_entries != [None]:
+ self.host_ids = [entry.host.id for entry in queue_entries]
+ self.queue_entry_ids = [entry.id for entry in queue_entries]
+ else:
+ assert host
+ self.host_ids = [host.id]
+
+
+ def poll(self):
+ if not self.started:
+ self.start()
+ if not self.done:
+ self.tick()
+
+
+ def tick(self):
+ assert self.monitor
+ exit_code = self.monitor.exit_code()
+ if exit_code is None:
+ return
+
+ success = (exit_code == 0)
+ self.finished(success)
+
+
+ def is_done(self):
+ return self.done
+
+
+ def finished(self, success):
+ if self.done:
+ assert self.started
+ return
+ self.started = True
+ self.done = True
+ self.success = success
+ self.epilog()
+
+
+ def prolog(self):
+ """
+ To be overridden.
+ """
+ assert not self.monitor
+ self.register_necessary_pidfiles()
+
+
+ def _log_file(self):
+ if not self._log_file_name:
+ return None
+ return os.path.join(self._working_directory(), self._log_file_name)
+
+
+ def cleanup(self):
+ log_file = self._log_file()
+ if self.monitor and log_file:
+ self.monitor.try_copy_to_results_repository(log_file)
+
+
+ def epilog(self):
+ """
+ To be overridden.
+ """
+ self.cleanup()
+ logging.info("%s finished with success=%s", type(self).__name__,
+ self.success)
+
+
+ def start(self):
+ if not self.started:
+ self.prolog()
+ self.run()
+
+ self.started = True
+
+
+ def abort(self):
+ if self.monitor:
+ self.monitor.kill()
+ self.done = True
+ self.aborted = True
+ self.cleanup()
+
+
+ def _get_consistent_execution_path(self, execution_entries):
+ first_execution_path = execution_entries[0].execution_path()
+ for execution_entry in execution_entries[1:]:
+ assert execution_entry.execution_path() == first_execution_path, (
+ '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
+ execution_entry,
+ first_execution_path,
+ execution_entries[0]))
+ return first_execution_path
+
+
+ def _copy_results(self, execution_entries, use_monitor=None):
+ """
+ @param execution_entries: list of objects with execution_path() method
+ """
+ if use_monitor is not None and not use_monitor.has_process():
+ return
+
+ assert len(execution_entries) > 0
+ if use_monitor is None:
+ assert self.monitor
+ use_monitor = self.monitor
+ assert use_monitor.has_process()
+ execution_path = self._get_consistent_execution_path(execution_entries)
+ results_path = execution_path + '/'
+ use_monitor.try_copy_to_results_repository(results_path)
+
+
+ def _parse_results(self, queue_entries):
+ for queue_entry in queue_entries:
+ queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
+
+
+ def _archive_results(self, queue_entries):
+ for queue_entry in queue_entries:
+ queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
+
+
+ def _command_line(self):
+ """
+ Return the command line to run. Must be overridden.
+ """
+ raise NotImplementedError
+
+
+ @property
+ def num_processes(self):
+ """
+ Return the number of processes forked by this BaseAgentTask's process.
+ It may only be approximate. To be overridden if necessary.
+ """
+ return 1
+
+
+ def _paired_with_monitor(self):
+ """
+ If this BaseAgentTask's process must run on the same machine as some
+ previous process, this method should be overridden to return a
+ PidfileRunMonitor for that process.
+ """
+ return self._NullMonitor()
+
+
+ @property
+ def owner_username(self):
+ """
+ Return login of user responsible for this task. May be None. Must be
+ overridden.
+ """
+ raise NotImplementedError
+
+
+ def _working_directory(self):
+ """
+ Return the directory where this BaseAgentTask's process executes.
+ Must be overridden.
+ """
+ raise NotImplementedError
+
+
+ def _pidfile_name(self):
+ """
+ Return the name of the pidfile this BaseAgentTask's process uses. To be
+ overridden if necessary.
+ """
+ return drone_manager.AUTOSERV_PID_FILE
+
+
+ def _check_paired_results_exist(self):
+ if not self._paired_with_monitor().has_process():
+ email_manager.manager.enqueue_notify_email(
+ 'No paired results in task',
+ 'No paired results in task %s at %s'
+ % (self, self._paired_with_monitor().pidfile_id))
+ self.finished(False)
+ return False
+ return True
+
+
+ def _create_monitor(self):
+ assert not self.monitor
+ self.monitor = pidfile_monitor.PidfileRunMonitor()
+
+
+ def run(self):
+ if not self._check_paired_results_exist():
+ return
+
+ self._create_monitor()
+ self.monitor.run(
+ self._command_line(), self._working_directory(),
+ num_processes=self.num_processes,
+ nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
+ pidfile_name=self._pidfile_name(),
+ paired_with_pidfile=self._paired_with_monitor().pidfile_id,
+ username=self.owner_username,
+ drone_hostnames_allowed=self.get_drone_hostnames_allowed())
+
+
+ def get_drone_hostnames_allowed(self):
+ if not models.DroneSet.drone_sets_enabled():
+ return None
+
+ hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
+ if not hqes:
+ # Only special tasks could be missing host queue entries
+ assert isinstance(self, SpecialAgentTask)
+ return self._user_or_global_default_drone_set(
+ self.task, self.task.requested_by)
+
+ job_ids = hqes.values_list('job', flat=True).distinct()
+ assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
+ "span multiple jobs")
+
+ job = models.Job.objects.get(id=job_ids[0])
+ drone_set = job.drone_set
+ if not drone_set:
+ return self._user_or_global_default_drone_set(job, job.user())
+
+ return drone_set.get_drone_hostnames()
+
+
+ def _user_or_global_default_drone_set(self, obj_with_owner, user):
+ """
+ Returns the user's default drone set, if present.
+
+ Otherwise, returns the global default drone set.
+ """
+ default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
+ if not user:
+ logging.warn('%s had no owner; using default drone set',
+ obj_with_owner)
+ return default_hostnames
+ if not user.drone_set:
+ logging.warn('User %s has no default drone set, using global '
+ 'default', user.login)
+ return default_hostnames
+ return user.drone_set.get_drone_hostnames()
+
+
+ def register_necessary_pidfiles(self):
+ pidfile_id = _drone_manager.get_pidfile_id_from(
+ self._working_directory(), self._pidfile_name())
+ _drone_manager.register_pidfile(pidfile_id)
+
+ paired_pidfile_id = self._paired_with_monitor().pidfile_id
+ if paired_pidfile_id:
+ _drone_manager.register_pidfile(paired_pidfile_id)
+
+
+ def recover(self):
+ if not self._check_paired_results_exist():
+ return
+
+ self._create_monitor()
+ self.monitor.attach_to_existing_process(
+ self._working_directory(), pidfile_name=self._pidfile_name(),
+ num_processes=self.num_processes)
+ if not self.monitor.has_process():
+ # no process to recover; wait to be started normally
+ self.monitor = None
+ return
+
+ self.started = True
+ logging.info('Recovering process %s for %s at %s',
+ self.monitor.get_process(), type(self).__name__,
+ self._working_directory())
+
+
+ def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
+ allowed_host_statuses=None):
+ class_name = self.__class__.__name__
+ for entry in queue_entries:
+ if entry.status not in allowed_hqe_statuses:
+ raise host_scheduler.SchedulerError(
+ '%s attempting to start entry with invalid status %s: '
+ '%s' % (class_name, entry.status, entry))
+ invalid_host_status = (
+ allowed_host_statuses is not None
+ and entry.host.status not in allowed_host_statuses)
+ if invalid_host_status:
+ raise host_scheduler.SchedulerError(
+ '%s attempting to start on queue entry with invalid '
+ 'host status %s: %s'
+ % (class_name, entry.host.status, entry))
+
+
+SiteAgentTask = utils.import_site_class(
+ __file__, 'autotest_lib.scheduler.site_monitor_db',
+ 'SiteAgentTask', BaseAgentTask)
+
+class AgentTask(SiteAgentTask):
+ pass
+
+
+class TaskWithJobKeyvals(object):
+ """AgentTask mixin providing functionality to help with job keyval files."""
+ _KEYVAL_FILE = 'keyval'
+ def _format_keyval(self, key, value):
+ return '%s=%s' % (key, value)
+
+
+ def _keyval_path(self):
+ """Subclasses must override this"""
+ raise NotImplementedError
+
+
+ def _write_keyval_after_job(self, field, value):
+ assert self.monitor
+ if not self.monitor.has_process():
+ return
+ _drone_manager.write_lines_to_file(
+ self._keyval_path(), [self._format_keyval(field, value)],
+ paired_with_process=self.monitor.get_process())
+
+
+ def _job_queued_keyval(self, job):
+ return 'job_queued', int(time.mktime(job.created_on.timetuple()))
+
+
+ def _write_job_finished(self):
+ self._write_keyval_after_job("job_finished", int(time.time()))
+
+
+ def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
+ keyval_contents = '\n'.join(self._format_keyval(key, value)
+ for key, value in keyval_dict.iteritems())
+ # always end with a newline to allow additional keyvals to be written
+ keyval_contents += '\n'
+ _drone_manager.attach_file_to_execution(self._working_directory(),
+ keyval_contents,
+ file_path=keyval_path)
+
+
+ def _write_keyvals_before_job(self, keyval_dict):
+ self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
+
+
+ def _write_host_keyvals(self, host):
+ keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
+ host.hostname)
+ platform, all_labels = host.platform_and_labels()
+ all_labels = [ urllib.quote(label) for label in all_labels ]
+ keyval_dict = dict(platform=platform, labels=','.join(all_labels))
+ self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
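+
+    # Illustrative note: the helpers above write plain 'key=value' lines. As a
+    # hypothetical example, a host keyval file produced by _write_host_keyvals
+    # might contain:
+    #     platform=some_platform
+    #     labels=label1,label2
+    # while _write_job_finished appends a line such as:
+    #     job_finished=1380000000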
+
+
+class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
+ """
+ Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
+ """
+
+ TASK_TYPE = None
+ host = None
+ queue_entry = None
+
+ def __init__(self, task, extra_command_args):
+ super(SpecialAgentTask, self).__init__()
+
+ assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
+
+ self.host = scheduler_models.Host(id=task.host.id)
+ self.queue_entry = None
+ if task.queue_entry:
+ self.queue_entry = scheduler_models.HostQueueEntry(
+ id=task.queue_entry.id)
+
+ self.task = task
+ self._extra_command_args = extra_command_args
+
+
+ def _keyval_path(self):
+ return os.path.join(self._working_directory(), self._KEYVAL_FILE)
+
+
+ def _command_line(self):
+ return autoserv_utils._autoserv_command_line(self.host.hostname,
+ self._extra_command_args,
+ queue_entry=self.queue_entry)
+
+
+ def _working_directory(self):
+ return self.task.execution_path()
+
+
+ @property
+ def owner_username(self):
+ if self.task.requested_by:
+ return self.task.requested_by.login
+ return None
+
+
+ def prolog(self):
+ super(SpecialAgentTask, self).prolog()
+ self.task.activate()
+ self._write_host_keyvals(self.host)
+
+
+ def _fail_queue_entry(self):
+ assert self.queue_entry
+
+ if self.queue_entry.meta_host:
+ return # don't fail metahost entries, they'll be reassigned
+
+ self.queue_entry.update_from_database()
+ if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
+ return # entry has been aborted
+
+ self._actually_fail_queue_entry()
+
+
+ # TODO(milleral): http://crbug.com/268607
+ # All this used to be a part of _fail_queue_entry. The
+ # exact semantics of when one should and should not be failing a queue
+ # entry need to be worked out, because provisioning has placed us in a
+ # case where we want to fail a queue entry that could be requeued,
+ # which makes us fail the two above if statements, and thus
+ # _fail_queue_entry() would exit early and have no effect.
+ # What's left here with _actually_fail_queue_entry is a hack to be able to
+ # bypass the checks and unconditionally execute the code.
+ def _actually_fail_queue_entry(self):
+ self.queue_entry.set_execution_subdir()
+ queued_key, queued_time = self._job_queued_keyval(
+ self.queue_entry.job)
+ self._write_keyval_after_job(queued_key, queued_time)
+ self._write_job_finished()
+
+ # copy results logs into the normal place for job results
+ self.monitor.try_copy_results_on_drone(
+ source_path=self._working_directory() + '/',
+ destination_path=self.queue_entry.execution_path() + '/')
+
+ pidfile_id = _drone_manager.get_pidfile_id_from(
+ self.queue_entry.execution_path(),
+ pidfile_name=drone_manager.AUTOSERV_PID_FILE)
+ _drone_manager.register_pidfile(pidfile_id)
+
+ if self.queue_entry.job.parse_failed_repair:
+ self._parse_results([self.queue_entry])
+ else:
+ self._archive_results([self.queue_entry])
+
+ # Also fail all other special tasks that have not yet run for this HQE
+ pending_tasks = models.SpecialTask.objects.filter(
+ queue_entry__id=self.queue_entry.id,
+ is_complete=0)
+ for task in pending_tasks:
+ task.finish(False)
+
+
+ def cleanup(self):
+ super(SpecialAgentTask, self).cleanup()
+
+ # We will consider an aborted task to be "Failed"
+ self.task.finish(bool(self.success))
+
+ if self.monitor:
+ if self.monitor.has_process():
+ self._copy_results([self.task])
+ if self.monitor.pidfile_id is not None:
+ _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
+
+
+ def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
+        """Remove queued special tasks of a given type for this host.
+
+        @param special_task_to_remove: type of special task to be removed, e.g.
+            models.SpecialTask.Task.VERIFY.
+        @param keep_last_one: True to keep the last such special task (i.e.
+            self.task) if its type is the same as special_task_to_remove.
+
+ """
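+        # Illustrative usage (simplified from the prejob tasks): a VerifyTask
+        # prolog drops redundant queued manual verifies for this host with
+        #     self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
+        #                               keep_last_one=True)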
+ queued_special_tasks = models.SpecialTask.objects.filter(
+ host__id=self.host.id,
+ task=special_task_to_remove,
+ is_active=False, is_complete=False, queue_entry=None)
+ if keep_last_one:
+ queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
+ queued_special_tasks.delete()
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 22f087b..29dc2c1 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -5,9 +5,8 @@
Autotest scheduler
"""
-
import datetime, optparse, os, signal
-import sys, time, traceback, urllib
+import sys, time
import logging, gc
import common
@@ -16,24 +15,22 @@
import django.db
from autotest_lib.client.common_lib import global_config, logging_manager
-from autotest_lib.client.common_lib import host_protections, utils
+from autotest_lib.client.common_lib import utils
from autotest_lib.database import database_connection
-from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
-from autotest_lib.scheduler import drone_manager, drones, email_manager
-from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
-from autotest_lib.scheduler import scheduler_logging_config
+from autotest_lib.scheduler import agent_task, drone_manager, drones
+from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
+from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
+from autotest_lib.scheduler import postjob_task, scheduler_logging_config
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
-from autotest_lib.server.cros import provision
from autotest_lib.site_utils.graphite import stats
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'
RESULTS_DIR = '.'
-AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
@@ -54,8 +51,10 @@
_db = None
_shutdown = False
-_autoserv_directory = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server')
-_autoserv_path = os.path.join(_autoserv_directory, 'autoserv')
+
+# These 2 globals are replaced for testing
+_autoserv_directory = autoserv_utils.autoserv_directory
+_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
@@ -67,13 +66,6 @@
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
-def _get_pidfile_timeout_secs():
- """@returns How long to wait for autoserv to write pidfile."""
- pidfile_timeout_mins = global_config.global_config.get_config_value(
- scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
- return pidfile_timeout_mins * 60
-
-
def _site_init_monitor_db_dummy():
return {}
@@ -502,11 +494,11 @@
return HostlessQueueTask(queue_entry=queue_entry)
return QueueTask(queue_entries=task_entries)
if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
- return GatherLogsTask(queue_entries=task_entries)
+ return postjob_task.GatherLogsTask(queue_entries=task_entries)
if queue_entry.status == models.HostQueueEntry.Status.PARSING:
- return FinalReparseTask(queue_entries=task_entries)
+ return postjob_task.FinalReparseTask(queue_entries=task_entries)
if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
- return ArchiveResultsTask(queue_entries=task_entries)
+ return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
raise host_scheduler.SchedulerError(
'_get_agent_task_for_queue_entry got entry with '
@@ -552,8 +544,12 @@
"""
self._assert_host_has_no_agent(special_task)
- special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
- ResetTask, ProvisionTask)
+ special_agent_task_classes = (prejob_task.CleanupTask,
+ prejob_task.VerifyTask,
+ prejob_task.RepairTask,
+ prejob_task.ResetTask,
+ prejob_task.ProvisionTask)
+
for agent_task_class in special_agent_task_classes:
if agent_task_class.TASK_TYPE == special_task.task:
return agent_task_class(task=special_task)
@@ -943,7 +939,7 @@
# through the host and through the hqe. A special task
# always needs a host, but doesn't always need a hqe.
for agent in self._host_agents.get(task.host.id, []):
- if isinstance(agent.task, SpecialAgentTask):
+ if isinstance(agent.task, agent_task.SpecialAgentTask):
# The epilog preforms critical actions such as
# queueing the next SpecialTask, requeuing the
@@ -1093,182 +1089,6 @@
pass
-class PidfileRunMonitor(object):
- """
- Client must call either run() to start a new process or
- attach_to_existing_process().
- """
-
- class _PidfileException(Exception):
- """
- Raised when there's some unexpected behavior with the pid file, but only
- used internally (never allowed to escape this class).
- """
-
-
- def __init__(self):
- self.lost_process = False
- self._start_time = None
- self.pidfile_id = None
- self._state = drone_manager.PidfileContents()
-
-
- def _add_nice_command(self, command, nice_level):
- if not nice_level:
- return command
- return ['nice', '-n', str(nice_level)] + command
-
-
- def _set_start_time(self):
- self._start_time = time.time()
-
-
- def run(self, command, working_directory, num_processes, nice_level=None,
- log_file=None, pidfile_name=None, paired_with_pidfile=None,
- username=None, drone_hostnames_allowed=None):
- assert command is not None
- if nice_level is not None:
- command = ['nice', '-n', str(nice_level)] + command
- self._set_start_time()
- self.pidfile_id = _drone_manager.execute_command(
- command, working_directory, pidfile_name=pidfile_name,
- num_processes=num_processes, log_file=log_file,
- paired_with_pidfile=paired_with_pidfile, username=username,
- drone_hostnames_allowed=drone_hostnames_allowed)
-
-
- def attach_to_existing_process(self, execution_path,
- pidfile_name=drone_manager.AUTOSERV_PID_FILE,
- num_processes=None):
- self._set_start_time()
- self.pidfile_id = _drone_manager.get_pidfile_id_from(
- execution_path, pidfile_name=pidfile_name)
- if num_processes is not None:
- _drone_manager.declare_process_count(self.pidfile_id, num_processes)
-
-
- def kill(self):
- if self.has_process():
- _drone_manager.kill_process(self.get_process())
-
-
- def has_process(self):
- self._get_pidfile_info()
- return self._state.process is not None
-
-
- def get_process(self):
- self._get_pidfile_info()
- assert self._state.process is not None
- return self._state.process
-
-
- def _read_pidfile(self, use_second_read=False):
- assert self.pidfile_id is not None, (
- 'You must call run() or attach_to_existing_process()')
- contents = _drone_manager.get_pidfile_contents(
- self.pidfile_id, use_second_read=use_second_read)
- if contents.is_invalid():
- self._state = drone_manager.PidfileContents()
- raise self._PidfileException(contents)
- self._state = contents
-
-
- def _handle_pidfile_error(self, error, message=''):
- message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
- self._state.process, self.pidfile_id, message)
- email_manager.manager.enqueue_notify_email(error, message)
- self.on_lost_process(self._state.process)
-
-
- def _get_pidfile_info_helper(self):
- if self.lost_process:
- return
-
- self._read_pidfile()
-
- if self._state.process is None:
- self._handle_no_process()
- return
-
- if self._state.exit_status is None:
- # double check whether or not autoserv is running
- if _drone_manager.is_process_running(self._state.process):
- return
-
- # pid but no running process - maybe process *just* exited
- self._read_pidfile(use_second_read=True)
- if self._state.exit_status is None:
- # autoserv exited without writing an exit code
- # to the pidfile
- self._handle_pidfile_error(
- 'autoserv died without writing exit code')
-
-
- def _get_pidfile_info(self):
- """\
- After completion, self._state will contain:
- pid=None, exit_status=None if autoserv has not yet run
- pid!=None, exit_status=None if autoserv is running
- pid!=None, exit_status!=None if autoserv has completed
- """
- try:
- self._get_pidfile_info_helper()
- except self._PidfileException, exc:
- self._handle_pidfile_error('Pidfile error', traceback.format_exc())
-
-
- def _handle_no_process(self):
- """\
- Called when no pidfile is found or no pid is in the pidfile.
- """
- message = 'No pid found at %s' % self.pidfile_id
- if time.time() - self._start_time > _get_pidfile_timeout_secs():
- email_manager.manager.enqueue_notify_email(
- 'Process has failed to write pidfile', message)
- self.on_lost_process()
-
-
- def on_lost_process(self, process=None):
- """\
- Called when autoserv has exited without writing an exit status,
- or we've timed out waiting for autoserv to write a pid to the
- pidfile. In either case, we just return failure and the caller
- should signal some kind of warning.
-
- process is unimportant here, as it shouldn't be used by anyone.
- """
- self.lost_process = True
- self._state.process = process
- self._state.exit_status = 1
- self._state.num_tests_failed = 0
-
-
- def exit_code(self):
- self._get_pidfile_info()
- return self._state.exit_status
-
-
- def num_tests_failed(self):
- """@returns The number of tests that failed or -1 if unknown."""
- self._get_pidfile_info()
- if self._state.num_tests_failed is None:
- return -1
- return self._state.num_tests_failed
-
-
- def try_copy_results_on_drone(self, **kwargs):
- if self.has_process():
- # copy results logs into the normal place for job results
- _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
-
-
- def try_copy_to_results_repository(self, source, **kwargs):
- if self.has_process():
- _drone_manager.copy_to_results_repository(self.get_process(),
- source, **kwargs)
-
-
class Agent(object):
"""
An agent for use by the Dispatcher class to perform a task. An agent wraps
@@ -1326,837 +1146,7 @@
self.finished = True
-class BaseAgentTask(object):
- class _NullMonitor(object):
- pidfile_id = None
-
- def has_process(self):
- return True
-
-
- def __init__(self, log_file_name=None):
- """
- @param log_file_name: (optional) name of file to log command output to
- """
- self.done = False
- self.started = False
- self.success = None
- self.aborted = False
- self.monitor = None
- self.queue_entry_ids = []
- self.host_ids = []
- self._log_file_name = log_file_name
-
-
- def _set_ids(self, host=None, queue_entries=None):
- if queue_entries and queue_entries != [None]:
- self.host_ids = [entry.host.id for entry in queue_entries]
- self.queue_entry_ids = [entry.id for entry in queue_entries]
- else:
- assert host
- self.host_ids = [host.id]
-
-
- def poll(self):
- if not self.started:
- self.start()
- if not self.done:
- self.tick()
-
-
- def tick(self):
- assert self.monitor
- exit_code = self.monitor.exit_code()
- if exit_code is None:
- return
-
- success = (exit_code == 0)
- self.finished(success)
-
-
- def is_done(self):
- return self.done
-
-
- def finished(self, success):
- if self.done:
- assert self.started
- return
- self.started = True
- self.done = True
- self.success = success
- self.epilog()
-
-
- def prolog(self):
- """
- To be overridden.
- """
- assert not self.monitor
- self.register_necessary_pidfiles()
-
-
- def _log_file(self):
- if not self._log_file_name:
- return None
- return os.path.join(self._working_directory(), self._log_file_name)
-
-
- def cleanup(self):
- log_file = self._log_file()
- if self.monitor and log_file:
- self.monitor.try_copy_to_results_repository(log_file)
-
-
- def epilog(self):
- """
- To be overridden.
- """
- self.cleanup()
- logging.info("%s finished with success=%s", type(self).__name__,
- self.success)
-
-
- def start(self):
- if not self.started:
- self.prolog()
- self.run()
-
- self.started = True
-
-
- def abort(self):
- if self.monitor:
- self.monitor.kill()
- self.done = True
- self.aborted = True
- self.cleanup()
-
-
- def _get_consistent_execution_path(self, execution_entries):
- first_execution_path = execution_entries[0].execution_path()
- for execution_entry in execution_entries[1:]:
- assert execution_entry.execution_path() == first_execution_path, (
- '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
- execution_entry,
- first_execution_path,
- execution_entries[0]))
- return first_execution_path
-
-
- def _copy_results(self, execution_entries, use_monitor=None):
- """
- @param execution_entries: list of objects with execution_path() method
- """
- if use_monitor is not None and not use_monitor.has_process():
- return
-
- assert len(execution_entries) > 0
- if use_monitor is None:
- assert self.monitor
- use_monitor = self.monitor
- assert use_monitor.has_process()
- execution_path = self._get_consistent_execution_path(execution_entries)
- results_path = execution_path + '/'
- use_monitor.try_copy_to_results_repository(results_path)
-
-
- def _parse_results(self, queue_entries):
- for queue_entry in queue_entries:
- queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
-
-
- def _archive_results(self, queue_entries):
- for queue_entry in queue_entries:
- queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
-
-
- def _command_line(self):
- """
- Return the command line to run. Must be overridden.
- """
- raise NotImplementedError
-
-
- @property
- def num_processes(self):
- """
- Return the number of processes forked by this BaseAgentTask's process.
- It may only be approximate. To be overridden if necessary.
- """
- return 1
-
-
- def _paired_with_monitor(self):
- """
- If this BaseAgentTask's process must run on the same machine as some
- previous process, this method should be overridden to return a
- PidfileRunMonitor for that process.
- """
- return self._NullMonitor()
-
-
- @property
- def owner_username(self):
- """
- Return login of user responsible for this task. May be None. Must be
- overridden.
- """
- raise NotImplementedError
-
-
- def _working_directory(self):
- """
- Return the directory where this BaseAgentTask's process executes.
- Must be overridden.
- """
- raise NotImplementedError
-
-
- def _pidfile_name(self):
- """
- Return the name of the pidfile this BaseAgentTask's process uses. To be
- overridden if necessary.
- """
- return drone_manager.AUTOSERV_PID_FILE
-
-
- def _check_paired_results_exist(self):
- if not self._paired_with_monitor().has_process():
- email_manager.manager.enqueue_notify_email(
- 'No paired results in task',
- 'No paired results in task %s at %s'
- % (self, self._paired_with_monitor().pidfile_id))
- self.finished(False)
- return False
- return True
-
-
- def _create_monitor(self):
- assert not self.monitor
- self.monitor = PidfileRunMonitor()
-
-
- def run(self):
- if not self._check_paired_results_exist():
- return
-
- self._create_monitor()
- self.monitor.run(
- self._command_line(), self._working_directory(),
- num_processes=self.num_processes,
- nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
- pidfile_name=self._pidfile_name(),
- paired_with_pidfile=self._paired_with_monitor().pidfile_id,
- username=self.owner_username,
- drone_hostnames_allowed=self.get_drone_hostnames_allowed())
-
-
- def get_drone_hostnames_allowed(self):
- if not models.DroneSet.drone_sets_enabled():
- return None
-
- hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
- if not hqes:
- # Only special tasks could be missing host queue entries
- assert isinstance(self, SpecialAgentTask)
- return self._user_or_global_default_drone_set(
- self.task, self.task.requested_by)
-
- job_ids = hqes.values_list('job', flat=True).distinct()
- assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
- "span multiple jobs")
-
- job = models.Job.objects.get(id=job_ids[0])
- drone_set = job.drone_set
- if not drone_set:
- return self._user_or_global_default_drone_set(job, job.user())
-
- return drone_set.get_drone_hostnames()
-
-
- def _user_or_global_default_drone_set(self, obj_with_owner, user):
- """
- Returns the user's default drone set, if present.
-
- Otherwise, returns the global default drone set.
- """
- default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
- if not user:
- logging.warn('%s had no owner; using default drone set',
- obj_with_owner)
- return default_hostnames
- if not user.drone_set:
- logging.warn('User %s has no default drone set, using global '
- 'default', user.login)
- return default_hostnames
- return user.drone_set.get_drone_hostnames()
-
-
- def register_necessary_pidfiles(self):
- pidfile_id = _drone_manager.get_pidfile_id_from(
- self._working_directory(), self._pidfile_name())
- _drone_manager.register_pidfile(pidfile_id)
-
- paired_pidfile_id = self._paired_with_monitor().pidfile_id
- if paired_pidfile_id:
- _drone_manager.register_pidfile(paired_pidfile_id)
-
-
- def recover(self):
- if not self._check_paired_results_exist():
- return
-
- self._create_monitor()
- self.monitor.attach_to_existing_process(
- self._working_directory(), pidfile_name=self._pidfile_name(),
- num_processes=self.num_processes)
- if not self.monitor.has_process():
- # no process to recover; wait to be started normally
- self.monitor = None
- return
-
- self.started = True
- logging.info('Recovering process %s for %s at %s',
- self.monitor.get_process(), type(self).__name__,
- self._working_directory())
-
-
- def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
- allowed_host_statuses=None):
- class_name = self.__class__.__name__
- for entry in queue_entries:
- if entry.status not in allowed_hqe_statuses:
- raise host_scheduler.SchedulerError(
- '%s attempting to start entry with invalid status %s: '
- '%s' % (class_name, entry.status, entry))
- invalid_host_status = (
- allowed_host_statuses is not None
- and entry.host.status not in allowed_host_statuses)
- if invalid_host_status:
- raise host_scheduler.SchedulerError(
- '%s attempting to start on queue entry with invalid '
- 'host status %s: %s'
- % (class_name, entry.host.status, entry))
-
-
-SiteAgentTask = utils.import_site_class(
- __file__, 'autotest_lib.scheduler.site_monitor_db',
- 'SiteAgentTask', BaseAgentTask)
-
-class AgentTask(SiteAgentTask):
- pass
-
-
-class TaskWithJobKeyvals(object):
- """AgentTask mixin providing functionality to help with job keyval files."""
- _KEYVAL_FILE = 'keyval'
- def _format_keyval(self, key, value):
- return '%s=%s' % (key, value)
-
-
- def _keyval_path(self):
- """Subclasses must override this"""
- raise NotImplementedError
-
-
- def _write_keyval_after_job(self, field, value):
- assert self.monitor
- if not self.monitor.has_process():
- return
- _drone_manager.write_lines_to_file(
- self._keyval_path(), [self._format_keyval(field, value)],
- paired_with_process=self.monitor.get_process())
-
-
- def _job_queued_keyval(self, job):
- return 'job_queued', int(time.mktime(job.created_on.timetuple()))
-
-
- def _write_job_finished(self):
- self._write_keyval_after_job("job_finished", int(time.time()))
-
-
- def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
- keyval_contents = '\n'.join(self._format_keyval(key, value)
- for key, value in keyval_dict.iteritems())
- # always end with a newline to allow additional keyvals to be written
- keyval_contents += '\n'
- _drone_manager.attach_file_to_execution(self._working_directory(),
- keyval_contents,
- file_path=keyval_path)
-
-
- def _write_keyvals_before_job(self, keyval_dict):
- self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
-
-
- def _write_host_keyvals(self, host):
- keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
- host.hostname)
- platform, all_labels = host.platform_and_labels()
- all_labels = [ urllib.quote(label) for label in all_labels ]
- keyval_dict = dict(platform=platform, labels=','.join(all_labels))
- self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
-
-
-class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
- """
- Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
- """
-
- TASK_TYPE = None
- host = None
- queue_entry = None
-
- def __init__(self, task, extra_command_args):
- super(SpecialAgentTask, self).__init__()
-
- assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
-
- self.host = scheduler_models.Host(id=task.host.id)
- self.queue_entry = None
- if task.queue_entry:
- self.queue_entry = scheduler_models.HostQueueEntry(
- id=task.queue_entry.id)
-
- self.task = task
- self._extra_command_args = extra_command_args
-
-
- def _keyval_path(self):
- return os.path.join(self._working_directory(), self._KEYVAL_FILE)
-
-
- def _command_line(self):
- return _autoserv_command_line(self.host.hostname,
- self._extra_command_args,
- queue_entry=self.queue_entry)
-
-
- def _working_directory(self):
- return self.task.execution_path()
-
-
- @property
- def owner_username(self):
- if self.task.requested_by:
- return self.task.requested_by.login
- return None
-
-
- def prolog(self):
- super(SpecialAgentTask, self).prolog()
- self.task.activate()
- self._write_host_keyvals(self.host)
-
-
- def _fail_queue_entry(self):
- assert self.queue_entry
-
- if self.queue_entry.meta_host:
- return # don't fail metahost entries, they'll be reassigned
-
- self.queue_entry.update_from_database()
- if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
- return # entry has been aborted
-
- self._actually_fail_queue_entry()
-
-
- # TODO(milleral): http://crbug.com/268607
- # All this used to be a part of _fail_queue_entry. The
- # exact semantics of when one should and should not be failing a queue
- # entry need to be worked out, because provisioning has placed us in a
- # case where we want to fail a queue entry that could be requeued,
- # which makes us fail the two above if statements, and thus
- # _fail_queue_entry() would exit early and have no effect.
- # What's left here with _actually_fail_queue_entry is a hack to be able to
- # bypass the checks and unconditionally execute the code.
- def _actually_fail_queue_entry(self):
- self.queue_entry.set_execution_subdir()
- queued_key, queued_time = self._job_queued_keyval(
- self.queue_entry.job)
- self._write_keyval_after_job(queued_key, queued_time)
- self._write_job_finished()
-
- # copy results logs into the normal place for job results
- self.monitor.try_copy_results_on_drone(
- source_path=self._working_directory() + '/',
- destination_path=self.queue_entry.execution_path() + '/')
-
- pidfile_id = _drone_manager.get_pidfile_id_from(
- self.queue_entry.execution_path(),
- pidfile_name=drone_manager.AUTOSERV_PID_FILE)
- _drone_manager.register_pidfile(pidfile_id)
-
- if self.queue_entry.job.parse_failed_repair:
- self._parse_results([self.queue_entry])
- else:
- self._archive_results([self.queue_entry])
-
- # Also fail all other special tasks that have not yet run for this HQE
- pending_tasks = models.SpecialTask.objects.filter(
- queue_entry__id=self.queue_entry.id,
- is_complete=0)
- for task in pending_tasks:
- task.finish(False)
-
-
- def cleanup(self):
- super(SpecialAgentTask, self).cleanup()
-
- # We will consider an aborted task to be "Failed"
- self.task.finish(bool(self.success))
-
- if self.monitor:
- if self.monitor.has_process():
- self._copy_results([self.task])
- if self.monitor.pidfile_id is not None:
- _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
-
-
- def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
- """Remove a type of special task in all tasks, keep last one if needed.
-
- @param special_task_to_remove: type of special task to be removed, e.g.,
- models.SpecialTask.Task.VERIFY.
- @param keep_last_one: True to keep the last special task if its type is
- the same as of special_task_to_remove.
-
- """
- queued_special_tasks = models.SpecialTask.objects.filter(
- host__id=self.host.id,
- task=special_task_to_remove,
- is_active=False, is_complete=False, queue_entry=None)
- if keep_last_one:
- queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
- queued_special_tasks.delete()
-
-
-class RepairTask(SpecialAgentTask):
- TASK_TYPE = models.SpecialTask.Task.REPAIR
-
-
- def __init__(self, task):
- """\
- queue_entry: queue entry to mark failed if this repair fails.
- """
- protection = host_protections.Protection.get_string(
- task.host.protection)
- # normalize the protection name
- protection = host_protections.Protection.get_attr_name(protection)
-
- super(RepairTask, self).__init__(
- task, ['-R', '--host-protection', protection])
-
- # *don't* include the queue entry in IDs -- if the queue entry is
- # aborted, we want to leave the repair task running
- self._set_ids(host=self.host)
-
-
- def prolog(self):
- super(RepairTask, self).prolog()
- logging.info("repair_task starting")
- self.host.set_status(models.Host.Status.REPAIRING)
-
-
- def epilog(self):
- super(RepairTask, self).epilog()
-
- if self.success:
- self.host.set_status(models.Host.Status.READY)
- else:
- self.host.set_status(models.Host.Status.REPAIR_FAILED)
- if self.queue_entry:
- self._fail_queue_entry()
-
-
-class PreJobTask(SpecialAgentTask):
- def _copy_to_results_repository(self):
- if not self.queue_entry or self.queue_entry.meta_host:
- return
-
- self.queue_entry.set_execution_subdir()
- log_name = os.path.basename(self.task.execution_path())
- source = os.path.join(self.task.execution_path(), 'debug',
- 'autoserv.DEBUG')
- destination = os.path.join(
- self.queue_entry.execution_path(), log_name)
-
- self.monitor.try_copy_to_results_repository(
- source, destination_path=destination)
-
-
- def epilog(self):
- super(PreJobTask, self).epilog()
-
- if self.success:
- return
-
- if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
- # effectively ignore failure for these hosts
- self.success = True
- return
-
- if self.queue_entry:
- # If we requeue a HQE, we should cancel any remaining pre-job
- # tasks against this host, otherwise we'll be left in a state
- # where a queued HQE has special tasks to run against a host.
- models.SpecialTask.objects.filter(
- queue_entry__id=self.queue_entry.id,
- host__id=self.host.id,
- is_complete=0).update(is_complete=1, success=0)
-
- previous_provisions = models.SpecialTask.objects.filter(
- task=models.SpecialTask.Task.PROVISION,
- queue_entry_id=self.queue_entry.id).count()
- if (previous_provisions >
- scheduler_config.config.max_provision_retries):
- self._actually_fail_queue_entry()
- # This abort will mark the aborted bit on the HQE itself, to
- # signify that we're killing it. Technically it also will do
- # the recursive aborting of all child jobs, but that shouldn't
- # matter here, as only suites have children, and those are
- # hostless and thus don't have provisioning.
- # TODO(milleral) http://crbug.com/188217
- # However, we can't actually do this yet, as if we set the
- # abort bit the FinalReparseTask will set the status of the HQE
- # to ABORTED, which then means that we don't show the status in
- # run_suite. So in the meantime, don't mark the HQE as
- # aborted.
- # queue_entry.abort()
- else:
- # requeue() must come after handling provision retries, since
- # _actually_fail_queue_entry needs an execution subdir.
- # We also don't want to requeue if we hit the provision retry
- # limit, since then we overwrite the PARSING state of the HQE.
- self.queue_entry.requeue()
-
- previous_repairs = models.SpecialTask.objects.filter(
- task=models.SpecialTask.Task.REPAIR,
- queue_entry_id=self.queue_entry.id).count()
- if previous_repairs >= scheduler_config.config.max_repair_limit:
- self.host.set_status(models.Host.Status.REPAIR_FAILED)
- self._fail_queue_entry()
- return
-
- queue_entry = models.HostQueueEntry.objects.get(
- id=self.queue_entry.id)
- else:
- queue_entry = None
-
- models.SpecialTask.objects.create(
- host=models.Host.objects.get(id=self.host.id),
- task=models.SpecialTask.Task.REPAIR,
- queue_entry=queue_entry,
- requested_by=self.task.requested_by)
-
-
- def _should_pending(self):
- """
- Decide if we should call the host queue entry's on_pending method.
- We should if:
- 1) There exists an associated host queue entry.
- 2) The current special task completed successfully.
- 3) There do not exist any more special tasks to be run before the
- host queue entry starts.
-
- @returns: True if we should call pending, false if not.
-
- """
- if not self.queue_entry or not self.success:
- return False
-
- # We know if this is the last one when we create it, so we could add
- # another column to the database to keep track of this information, but
- # I expect the overhead of querying here to be minimal.
- queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
- queued = models.SpecialTask.objects.filter(
- host__id=self.host.id, is_active=False,
- is_complete=False, queue_entry=queue_entry)
- queued = queued.exclude(id=self.task.id)
- return queued.count() == 0
-
-
-class VerifyTask(PreJobTask):
- TASK_TYPE = models.SpecialTask.Task.VERIFY
-
-
- def __init__(self, task):
- super(VerifyTask, self).__init__(task, ['-v'])
- self._set_ids(host=self.host, queue_entries=[self.queue_entry])
-
-
- def prolog(self):
- super(VerifyTask, self).prolog()
-
- logging.info("starting verify on %s", self.host.hostname)
- if self.queue_entry:
- self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
- self.host.set_status(models.Host.Status.VERIFYING)
-
- # Delete any queued manual reverifies for this host. One verify will do
- # and there's no need to keep records of other requests.
- self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
- keep_last_one=True)
-
-
- def epilog(self):
- super(VerifyTask, self).epilog()
- if self.success:
- if self._should_pending():
- self.queue_entry.on_pending()
- else:
- self.host.set_status(models.Host.Status.READY)
-
-
-class CleanupTask(PreJobTask):
- # note this can also run post-job, but when it does, it's running standalone
- # against the host (not related to the job), so it's not considered a
- # PostJobTask
-
- TASK_TYPE = models.SpecialTask.Task.CLEANUP
-
-
- def __init__(self, task, recover_run_monitor=None):
- super(CleanupTask, self).__init__(task, ['--cleanup'])
- self._set_ids(host=self.host, queue_entries=[self.queue_entry])
-
-
- def prolog(self):
- super(CleanupTask, self).prolog()
- logging.info("starting cleanup task for host: %s", self.host.hostname)
- self.host.set_status(models.Host.Status.CLEANING)
- if self.queue_entry:
- self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
-
-
- def _finish_epilog(self):
- if not self.queue_entry or not self.success:
- return
-
- do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
- should_run_verify = (
- self.queue_entry.job.run_verify
- and self.host.protection != do_not_verify_protection)
- if should_run_verify:
- entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
- models.SpecialTask.objects.create(
- host=models.Host.objects.get(id=self.host.id),
- queue_entry=entry,
- task=models.SpecialTask.Task.VERIFY)
- else:
- if self._should_pending():
- self.queue_entry.on_pending()
-
-
- def epilog(self):
- super(CleanupTask, self).epilog()
-
- if self.success:
- self.host.update_field('dirty', 0)
- self.host.set_status(models.Host.Status.READY)
-
- self._finish_epilog()
-
-
-class ResetTask(PreJobTask):
- """Task to reset a DUT, including cleanup and verify."""
- # note this can also run post-job, but when it does, it's running standalone
- # against the host (not related to the job), so it's not considered a
- # PostJobTask
-
- TASK_TYPE = models.SpecialTask.Task.RESET
-
-
- def __init__(self, task, recover_run_monitor=None):
- super(ResetTask, self).__init__(task, ['--reset'])
- self._set_ids(host=self.host, queue_entries=[self.queue_entry])
-
-
- def prolog(self):
- super(ResetTask, self).prolog()
- logging.info('starting reset task for host: %s',
- self.host.hostname)
- self.host.set_status(models.Host.Status.RESETTING)
- if self.queue_entry:
- self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
-
- # Delete any queued cleanups for this host.
- self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
- keep_last_one=False)
-
- # Delete any queued reverifies for this host.
- self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
- keep_last_one=False)
-
- # Only one reset is needed.
- self.remove_special_tasks(models.SpecialTask.Task.RESET,
- keep_last_one=True)
-
-
- def epilog(self):
- super(ResetTask, self).epilog()
-
- if self.success:
- self.host.update_field('dirty', 0)
-
- if self._should_pending():
- self.queue_entry.on_pending()
- else:
- self.host.set_status(models.Host.Status.READY)
-
-
-class ProvisionTask(PreJobTask):
- TASK_TYPE = models.SpecialTask.Task.PROVISION
-
- def __init__(self, task):
- # Provisioning requires that we be associated with a job/queue entry
- assert task.queue_entry, "No HQE associated with provision task!"
- # task.queue_entry is an afe model HostQueueEntry object.
- # self.queue_entry is a scheduler models HostQueueEntry object, but
- # it gets constructed and assigned in __init__, so it's not available
- # yet. Therefore, we're stuck pulling labels off of the afe model
- # so that we can pass the --provision args into the __init__ call.
- labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
- _, provisionable = provision.filter_labels(labels)
- extra_command_args = ['--provision', ','.join(provisionable)]
- super(ProvisionTask, self).__init__(task, extra_command_args)
- self._set_ids(host=self.host, queue_entries=[self.queue_entry])
-
-
- def _command_line(self):
- # If we give queue_entry to _autoserv_command_line, then it will append
- # -c for this invocation if the queue_entry is a client side test. We
- # don't want that, as it messes with provisioning, so we just drop it
- # from the arguments here.
- # Note that we also don't verify job_repo_url as provisioining tasks are
- # required to stage whatever content we need, and the job itself will
- # force autotest to be staged if it isn't already.
- return _autoserv_command_line(self.host.hostname,
- self._extra_command_args)
-
-
- def prolog(self):
- super(ProvisionTask, self).prolog()
- # add check for previous provision task and abort if exist.
- logging.info("starting provision task for host: %s", self.host.hostname)
- self.queue_entry.set_status(
- models.HostQueueEntry.Status.PROVISIONING)
- self.host.set_status(models.Host.Status.PROVISIONING)
-
-
- def epilog(self):
- super(ProvisionTask, self).epilog()
-
- if self._should_pending():
- self.queue_entry.on_pending()
- else:
- self.host.set_status(models.Host.Status.READY)
-
-
-class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
+class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
"""
Common functionality for QueueTask and HostlessQueueTask
"""
@@ -2341,368 +1331,5 @@
self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
-class PostJobTask(AgentTask):
- def __init__(self, queue_entries, log_file_name):
- super(PostJobTask, self).__init__(log_file_name=log_file_name)
-
- self.queue_entries = queue_entries
-
- self._autoserv_monitor = PidfileRunMonitor()
- self._autoserv_monitor.attach_to_existing_process(
- self._working_directory())
-
-
- def _command_line(self):
- if _testing_mode:
- return 'true'
- return self._generate_command(
- _drone_manager.absolute_path(self._working_directory()))
-
-
- def _generate_command(self, results_dir):
- raise NotImplementedError('Subclasses must override this')
-
-
- @property
- def owner_username(self):
- return self.queue_entries[0].job.owner
-
-
- def _working_directory(self):
- return self._get_consistent_execution_path(self.queue_entries)
-
-
- def _paired_with_monitor(self):
- return self._autoserv_monitor
-
-
- def _job_was_aborted(self):
- was_aborted = None
- for queue_entry in self.queue_entries:
- queue_entry.update_from_database()
- if was_aborted is None: # first queue entry
- was_aborted = bool(queue_entry.aborted)
- elif was_aborted != bool(queue_entry.aborted): # subsequent entries
- entries = ['%s (aborted: %s)' % (entry, entry.aborted)
- for entry in self.queue_entries]
- email_manager.manager.enqueue_notify_email(
- 'Inconsistent abort state',
- 'Queue entries have inconsistent abort state:\n' +
- '\n'.join(entries))
- # don't crash here, just assume true
- return True
- return was_aborted
-
-
- def _final_status(self):
- if self._job_was_aborted():
- return models.HostQueueEntry.Status.ABORTED
-
- # we'll use a PidfileRunMonitor to read the autoserv exit status
- if self._autoserv_monitor.exit_code() == 0:
- return models.HostQueueEntry.Status.COMPLETED
- return models.HostQueueEntry.Status.FAILED
-
-
- def _set_all_statuses(self, status):
- for queue_entry in self.queue_entries:
- queue_entry.set_status(status)
-
-
- def abort(self):
- # override AgentTask.abort() to avoid killing the process and ending
- # the task. post-job tasks continue when the job is aborted.
- pass
-
-
- def _pidfile_label(self):
- # '.autoserv_execute' -> 'autoserv'
- return self._pidfile_name()[1:-len('_execute')]
-
-
-class GatherLogsTask(PostJobTask):
- """
- Task responsible for
- * gathering uncollected logs (if Autoserv crashed hard or was killed)
- * copying logs to the results repository
- * spawning CleanupTasks for hosts, if necessary
- * spawning a FinalReparseTask for the job
- """
- def __init__(self, queue_entries, recover_run_monitor=None):
- self._job = queue_entries[0].job
- super(GatherLogsTask, self).__init__(
- queue_entries, log_file_name='.collect_crashinfo.log')
- self._set_ids(queue_entries=queue_entries)
-
-
- # TODO: Refactor into autoserv_utils. crbug.com/243090
- def _generate_command(self, results_dir):
- host_list = ','.join(queue_entry.host.hostname
- for queue_entry in self.queue_entries)
- return [_autoserv_path , '-p',
- '--pidfile-label=%s' % self._pidfile_label(),
- '--use-existing-results', '--collect-crashinfo',
- '-m', host_list, '-r', results_dir]
-
-
- @property
- def num_processes(self):
- return len(self.queue_entries)
-
-
- def _pidfile_name(self):
- return drone_manager.CRASHINFO_PID_FILE
-
-
- def prolog(self):
- self._check_queue_entry_statuses(
- self.queue_entries,
- allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
- allowed_host_statuses=(models.Host.Status.RUNNING,))
-
- super(GatherLogsTask, self).prolog()
-
-
- def epilog(self):
- super(GatherLogsTask, self).epilog()
- self._parse_results(self.queue_entries)
- self._reboot_hosts()
-
-
- def _reboot_hosts(self):
- if self._autoserv_monitor.has_process():
- final_success = (self._final_status() ==
- models.HostQueueEntry.Status.COMPLETED)
- num_tests_failed = self._autoserv_monitor.num_tests_failed()
- else:
- final_success = False
- num_tests_failed = 0
- reboot_after = self._job.reboot_after
- do_reboot = (
- # always reboot after aborted jobs
- self._final_status() == models.HostQueueEntry.Status.ABORTED
- or reboot_after == model_attributes.RebootAfter.ALWAYS
- or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
- and final_success and num_tests_failed == 0)
- or num_tests_failed > 0)
-
- for queue_entry in self.queue_entries:
- if do_reboot:
- # don't pass the queue entry to the CleanupTask. if the cleanup
- # fails, the job doesn't care -- it's over.
- models.SpecialTask.objects.create(
- host=models.Host.objects.get(id=queue_entry.host.id),
- task=models.SpecialTask.Task.CLEANUP,
- requested_by=self._job.owner_model())
- else:
- queue_entry.host.set_status(models.Host.Status.READY)
-
-
- def run(self):
- autoserv_exit_code = self._autoserv_monitor.exit_code()
- # only run if Autoserv exited due to some signal. if we have no exit
- # code, assume something bad (and signal-like) happened.
- if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
- super(GatherLogsTask, self).run()
- else:
- self.finished(True)
-
-
-class SelfThrottledPostJobTask(PostJobTask):
- """
- Special AgentTask subclass that maintains its own global process limit.
- """
- _num_running_processes = 0
- # Last known limit of max processes, used to check whether
- # max processes config has been changed.
- _last_known_max_processes = 0
- # Whether an email should be sent to notifiy process limit being hit.
- _notification_on = True
- # Once process limit is hit, an email will be sent.
- # To prevent spams, do not send another email until
- # it drops to lower than the following level.
- REVIVE_NOTIFICATION_THRESHOLD = 0.80
-
-
- @classmethod
- def _increment_running_processes(cls):
- cls._num_running_processes += 1
- stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
- cls._num_running_processes)
-
-
- @classmethod
- def _decrement_running_processes(cls):
- cls._num_running_processes -= 1
- stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
- cls._num_running_processes)
-
-
- @classmethod
- def _max_processes(cls):
- raise NotImplementedError
-
-
- @classmethod
- def _can_run_new_process(cls):
- return cls._num_running_processes < cls._max_processes()
-
-
- def _process_started(self):
- return bool(self.monitor)
-
-
- def tick(self):
- # override tick to keep trying to start until the process count goes
- # down and we can, at which point we revert to default behavior
- if self._process_started():
- super(SelfThrottledPostJobTask, self).tick()
- else:
- self._try_starting_process()
-
-
- def run(self):
- # override run() to not actually run unless we can
- self._try_starting_process()
-
-
- @classmethod
- def _notify_process_limit_hit(cls):
- """Send an email to notify that process limit is hit."""
- if cls._notification_on:
- subject = '%s: hitting max process limit.' % cls.__name__
- message = ('Running processes/Max processes: %d/%d'
- % (cls._num_running_processes, cls._max_processes()))
- email_manager.manager.enqueue_notify_email(subject, message)
- cls._notification_on = False
-
-
- @classmethod
- def _reset_notification_switch_if_necessary(cls):
- """Reset _notification_on if necessary.
-
- Set _notification_on to True on the following cases:
- 1) If the limit of max processes configuration changes;
- 2) If _notification_on is False and the number of running processes
- drops to lower than a level defined in REVIVE_NOTIFICATION_THRESHOLD.
-
- """
- if cls._last_known_max_processes != cls._max_processes():
- cls._notification_on = True
- cls._last_known_max_processes = cls._max_processes()
- return
- percentage = float(cls._num_running_processes) / cls._max_processes()
- if (not cls._notification_on and
- percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
- cls._notification_on = True
-
-
- def _try_starting_process(self):
- self._reset_notification_switch_if_necessary()
- if not self._can_run_new_process():
- self._notify_process_limit_hit()
- return
-
- # actually run the command
- super(SelfThrottledPostJobTask, self).run()
- if self._process_started():
- self._increment_running_processes()
-
-
- def finished(self, success):
- super(SelfThrottledPostJobTask, self).finished(success)
- if self._process_started():
- self._decrement_running_processes()
-
-
-class FinalReparseTask(SelfThrottledPostJobTask):
- def __init__(self, queue_entries):
- super(FinalReparseTask, self).__init__(queue_entries,
- log_file_name='.parse.log')
- # don't use _set_ids, since we don't want to set the host_ids
- self.queue_entry_ids = [entry.id for entry in queue_entries]
-
-
- def _generate_command(self, results_dir):
- return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
- results_dir]
-
-
- @property
- def num_processes(self):
- return 0 # don't include parser processes in accounting
-
-
- def _pidfile_name(self):
- return drone_manager.PARSER_PID_FILE
-
-
- @classmethod
- def _max_processes(cls):
- return scheduler_config.config.max_parse_processes
-
-
- def prolog(self):
- self._check_queue_entry_statuses(
- self.queue_entries,
- allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
-
- super(FinalReparseTask, self).prolog()
-
-
- def epilog(self):
- super(FinalReparseTask, self).epilog()
- self._archive_results(self.queue_entries)
-
-
-class ArchiveResultsTask(SelfThrottledPostJobTask):
- _ARCHIVING_FAILED_FILE = '.archiver_failed'
-
- def __init__(self, queue_entries):
- super(ArchiveResultsTask, self).__init__(queue_entries,
- log_file_name='.archiving.log')
- # don't use _set_ids, since we don't want to set the host_ids
- self.queue_entry_ids = [entry.id for entry in queue_entries]
-
-
- def _pidfile_name(self):
- return drone_manager.ARCHIVER_PID_FILE
-
-
- # TODO: Refactor into autoserv_utils. crbug.com/243090
- def _generate_command(self, results_dir):
- return [_autoserv_path , '-p',
- '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
- '--use-existing-results', '--control-filename=control.archive',
- os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
- 'archive_results.control.srv')]
-
-
- @classmethod
- def _max_processes(cls):
- return scheduler_config.config.max_transfer_processes
-
-
- def prolog(self):
- self._check_queue_entry_statuses(
- self.queue_entries,
- allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
-
- super(ArchiveResultsTask, self).prolog()
-
-
- def epilog(self):
- super(ArchiveResultsTask, self).epilog()
- if not self.success and self._paired_with_monitor().has_process():
- failed_file = os.path.join(self._working_directory(),
- self._ARCHIVING_FAILED_FILE)
- paired_process = self._paired_with_monitor().get_process()
- _drone_manager.write_lines_to_file(
- failed_file, ['Archiving failed with exit code %s'
- % self.monitor.exit_code()],
- paired_with_process=paired_process)
- self._set_all_statuses(self._final_status())
-
-
if __name__ == '__main__':
main()
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 016f264..b4cfcda 100755
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -9,7 +9,9 @@
from autotest_lib.client.common_lib.test_utils import unittest
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import agent_task
from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
+from autotest_lib.scheduler import pidfile_monitor
from autotest_lib.scheduler import scheduler_config, gc_stats, host_scheduler
from autotest_lib.scheduler import monitor_db_functional_test
from autotest_lib.scheduler import scheduler_models
@@ -831,10 +833,10 @@
self.god = mock.mock_god()
self.mock_drone_manager = self.god.create_mock_class(
drone_manager.DroneManager, 'drone_manager')
- self.god.stub_with(monitor_db, '_drone_manager',
+ self.god.stub_with(pidfile_monitor, '_drone_manager',
self.mock_drone_manager)
self.god.stub_function(email_manager.manager, 'enqueue_notify_email')
- self.god.stub_with(monitor_db, '_get_pidfile_timeout_secs',
+ self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
self._mock_get_pidfile_timeout_secs)
self.pidfile_id = object()
@@ -844,7 +846,7 @@
pidfile_name=drone_manager.AUTOSERV_PID_FILE)
.and_return(self.pidfile_id))
- self.monitor = monitor_db.PidfileRunMonitor()
+ self.monitor = pidfile_monitor.PidfileRunMonitor()
self.monitor.attach_to_existing_process(self.execution_tag)
def tearDown(self):
@@ -931,7 +933,7 @@
self.mock_drone_manager.get_pidfile_contents.expect_call(
self.pidfile_id, use_second_read=False).and_return(
drone_manager.InvalidPidfile('error'))
- self.assertRaises(monitor_db.PidfileRunMonitor._PidfileException,
+ self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
self.monitor._read_pidfile)
self.god.check_playback()
@@ -995,7 +997,7 @@
email_manager.manager.enqueue_notify_email.expect_call(
mock.is_string_comparator(), mock.is_string_comparator())
self.monitor._start_time = (time.time() -
- monitor_db._get_pidfile_timeout_secs() - 1)
+ pidfile_monitor._get_pidfile_timeout_secs() - 1)
self._test_get_pidfile_info_helper(None, 1, 0)
self.assertTrue(self.monitor.lost_process)
@@ -1012,7 +1014,7 @@
def _create_mock_task(self, name):
- task = self.god.create_mock_class(monitor_db.AgentTask, name)
+ task = self.god.create_mock_class(agent_task.AgentTask, name)
task.num_processes = 1
_set_host_and_qe_ids(task)
return task
@@ -1414,7 +1416,7 @@
hqe_3 = job_3.hostqueueentry_set.all()[0]
hqe_4 = job_4.hostqueueentry_set.all()[0]
- return (hqe_1, hqe_2, hqe_3, hqe_4), monitor_db.AgentTask()
+ return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()
def test_get_drone_hostnames_allowed_no_drones_in_set(self):
@@ -1458,7 +1460,7 @@
class MockSpecialTask(object):
requested_by = object()
- class MockSpecialAgentTask(monitor_db.SpecialAgentTask):
+ class MockSpecialAgentTask(agent_task.SpecialAgentTask):
task = MockSpecialTask()
queue_entry_ids = []
def __init__(self, *args, **kwargs):
@@ -1498,7 +1500,7 @@
self._setup_test_user_or_global_default_drone_set()
- actual = monitor_db.AgentTask()._user_or_global_default_drone_set(
+ actual = agent_task.AgentTask()._user_or_global_default_drone_set(
None, MockUser())
self.assertEqual(expected, actual)
@@ -1507,7 +1509,7 @@
def test_user_or_global_default_drone_set_no_user(self):
expected = self._setup_test_user_or_global_default_drone_set()
- actual = monitor_db.AgentTask()._user_or_global_default_drone_set(
+ actual = agent_task.AgentTask()._user_or_global_default_drone_set(
None, None)
self.assertEqual(expected, actual)
@@ -1520,7 +1522,7 @@
login = None
expected = self._setup_test_user_or_global_default_drone_set()
- actual = monitor_db.AgentTask()._user_or_global_default_drone_set(
+ actual = agent_task.AgentTask()._user_or_global_default_drone_set(
None, MockUser())
self.assertEqual(expected, actual)
diff --git a/scheduler/pidfile_monitor.py b/scheduler/pidfile_monitor.py
new file mode 100644
index 0000000..72d4c79
--- /dev/null
+++ b/scheduler/pidfile_monitor.py
@@ -0,0 +1,196 @@
+#pylint: disable-msg=C0111
+
+"""
+Pidfile monitor.
+"""
+
+import time, traceback
+from autotest_lib.client.common_lib import global_config
+from autotest_lib.scheduler import drone_manager, email_manager
+from autotest_lib.scheduler import scheduler_config
+
+
+_drone_manager = drone_manager.instance()
+
+def _get_pidfile_timeout_secs():
+ """@returns How long to wait for autoserv to write pidfile."""
+ pidfile_timeout_mins = global_config.global_config.get_config_value(
+ scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
+ return pidfile_timeout_mins * 60
+
+
+class PidfileRunMonitor(object):
+ """
+ Client must call either run() to start a new process or
+ attach_to_existing_process().
+ """
+
+ class _PidfileException(Exception):
+ """
+ Raised when there's some unexpected behavior with the pid file, but only
+ used internally (never allowed to escape this class).
+ """
+
+
+ def __init__(self):
+ self.lost_process = False
+ self._start_time = None
+ self.pidfile_id = None
+ self._state = drone_manager.PidfileContents()
+
+
+ def _add_nice_command(self, command, nice_level):
+ if not nice_level:
+ return command
+ return ['nice', '-n', str(nice_level)] + command
+
+
+ def _set_start_time(self):
+ self._start_time = time.time()
+
+
+ def run(self, command, working_directory, num_processes, nice_level=None,
+ log_file=None, pidfile_name=None, paired_with_pidfile=None,
+ username=None, drone_hostnames_allowed=None):
+ assert command is not None
+ if nice_level is not None:
+ command = ['nice', '-n', str(nice_level)] + command
+ self._set_start_time()
+ self.pidfile_id = _drone_manager.execute_command(
+ command, working_directory, pidfile_name=pidfile_name,
+ num_processes=num_processes, log_file=log_file,
+ paired_with_pidfile=paired_with_pidfile, username=username,
+ drone_hostnames_allowed=drone_hostnames_allowed)
+
+
+ def attach_to_existing_process(self, execution_path,
+ pidfile_name=drone_manager.AUTOSERV_PID_FILE,
+ num_processes=None):
+ self._set_start_time()
+ self.pidfile_id = _drone_manager.get_pidfile_id_from(
+ execution_path, pidfile_name=pidfile_name)
+ if num_processes is not None:
+ _drone_manager.declare_process_count(self.pidfile_id, num_processes)
+
+
+ def kill(self):
+ if self.has_process():
+ _drone_manager.kill_process(self.get_process())
+
+
+ def has_process(self):
+ self._get_pidfile_info()
+ return self._state.process is not None
+
+
+ def get_process(self):
+ self._get_pidfile_info()
+ assert self._state.process is not None
+ return self._state.process
+
+
+ def _read_pidfile(self, use_second_read=False):
+ assert self.pidfile_id is not None, (
+ 'You must call run() or attach_to_existing_process()')
+ contents = _drone_manager.get_pidfile_contents(
+ self.pidfile_id, use_second_read=use_second_read)
+ if contents.is_invalid():
+ self._state = drone_manager.PidfileContents()
+ raise self._PidfileException(contents)
+ self._state = contents
+
+
+ def _handle_pidfile_error(self, error, message=''):
+ message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
+ self._state.process, self.pidfile_id, message)
+ email_manager.manager.enqueue_notify_email(error, message)
+ self.on_lost_process(self._state.process)
+
+
+ def _get_pidfile_info_helper(self):
+ if self.lost_process:
+ return
+
+ self._read_pidfile()
+
+ if self._state.process is None:
+ self._handle_no_process()
+ return
+
+ if self._state.exit_status is None:
+ # double check whether or not autoserv is running
+ if _drone_manager.is_process_running(self._state.process):
+ return
+
+ # pid but no running process - maybe process *just* exited
+ self._read_pidfile(use_second_read=True)
+ if self._state.exit_status is None:
+ # autoserv exited without writing an exit code
+ # to the pidfile
+ self._handle_pidfile_error(
+ 'autoserv died without writing exit code')
+
+
+ def _get_pidfile_info(self):
+ """\
+ After completion, self._state will contain:
+ pid=None, exit_status=None if autoserv has not yet run
+ pid!=None, exit_status=None if autoserv is running
+ pid!=None, exit_status!=None if autoserv has completed
+ """
+ try:
+ self._get_pidfile_info_helper()
+ except self._PidfileException, exc:
+ self._handle_pidfile_error('Pidfile error', traceback.format_exc())
+
+
+ def _handle_no_process(self):
+ """\
+ Called when no pidfile is found or no pid is in the pidfile.
+ """
+ message = 'No pid found at %s' % self.pidfile_id
+ if time.time() - self._start_time > _get_pidfile_timeout_secs():
+ email_manager.manager.enqueue_notify_email(
+ 'Process has failed to write pidfile', message)
+ self.on_lost_process()
+
+
+ def on_lost_process(self, process=None):
+ """\
+ Called when autoserv has exited without writing an exit status,
+ or we've timed out waiting for autoserv to write a pid to the
+ pidfile. In either case, we just return failure and the caller
+ should signal some kind of warning.
+
+ process is unimportant here, as it shouldn't be used by anyone.
+ """
+ self.lost_process = True
+ self._state.process = process
+ self._state.exit_status = 1
+ self._state.num_tests_failed = 0
+
+
+ def exit_code(self):
+ self._get_pidfile_info()
+ return self._state.exit_status
+
+
+ def num_tests_failed(self):
+ """@returns The number of tests that failed or -1 if unknown."""
+ self._get_pidfile_info()
+ if self._state.num_tests_failed is None:
+ return -1
+ return self._state.num_tests_failed
+
+
+ def try_copy_results_on_drone(self, **kwargs):
+ if self.has_process():
+ # copy results logs into the normal place for job results
+ _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
+
+
+ def try_copy_to_results_repository(self, source, **kwargs):
+ if self.has_process():
+ _drone_manager.copy_to_results_repository(self.get_process(),
+ source, **kwargs)
+
diff --git a/scheduler/postjob_task.py b/scheduler/postjob_task.py
new file mode 100644
index 0000000..4382093
--- /dev/null
+++ b/scheduler/postjob_task.py
@@ -0,0 +1,390 @@
+#pylint: disable-msg=C0111
+
+"""
+Postjob task.
+
+Postjob tasks are responsible for setting the final status of the HQE
+and Host, and scheduling additional special agents such as cleanup,
+if necessary.
+"""
+
+import os
+
+from autotest_lib.frontend.afe import models, model_attributes
+from autotest_lib.scheduler import agent_task, drones, drone_manager
+from autotest_lib.scheduler import email_manager, pidfile_monitor
+from autotest_lib.scheduler import scheduler_config
+from autotest_lib.server import autoserv_utils
+from autotest_lib.site_utils.graphite import stats
+
+
+_drone_manager = drone_manager.instance()
+_parser_path = autoserv_utils._parser_path_func(
+ autoserv_utils.AUTOTEST_INSTALL_DIR)
+
+
+class PostJobTask(agent_task.AgentTask):
+ def __init__(self, queue_entries, log_file_name):
+ super(PostJobTask, self).__init__(log_file_name=log_file_name)
+
+ self.queue_entries = queue_entries
+
+ self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
+ self._autoserv_monitor.attach_to_existing_process(
+ self._working_directory())
+
+
+ def _command_line(self):
+ # Do we need testing_mode?
+ return self._generate_command(
+ _drone_manager.absolute_path(self._working_directory()))
+
+
+ def _generate_command(self, results_dir):
+ raise NotImplementedError('Subclasses must override this')
+
+
+ @property
+ def owner_username(self):
+ return self.queue_entries[0].job.owner
+
+
+ def _working_directory(self):
+ return self._get_consistent_execution_path(self.queue_entries)
+
+
+ def _paired_with_monitor(self):
+ return self._autoserv_monitor
+
+
+ def _job_was_aborted(self):
+ was_aborted = None
+ for queue_entry in self.queue_entries:
+ queue_entry.update_from_database()
+ if was_aborted is None: # first queue entry
+ was_aborted = bool(queue_entry.aborted)
+ elif was_aborted != bool(queue_entry.aborted): # subsequent entries
+ entries = ['%s (aborted: %s)' % (entry, entry.aborted)
+ for entry in self.queue_entries]
+ email_manager.manager.enqueue_notify_email(
+ 'Inconsistent abort state',
+ 'Queue entries have inconsistent abort state:\n' +
+ '\n'.join(entries))
+ # don't crash here, just assume true
+ return True
+ return was_aborted
+
+
+ def _final_status(self):
+ if self._job_was_aborted():
+ return models.HostQueueEntry.Status.ABORTED
+
+ # we'll use a PidfileRunMonitor to read the autoserv exit status
+ if self._autoserv_monitor.exit_code() == 0:
+ return models.HostQueueEntry.Status.COMPLETED
+ return models.HostQueueEntry.Status.FAILED
+
+
+ def _set_all_statuses(self, status):
+ for queue_entry in self.queue_entries:
+ queue_entry.set_status(status)
+
+
+ def abort(self):
+ # override AgentTask.abort() to avoid killing the process and ending
+ # the task. post-job tasks continue when the job is aborted.
+ pass
+
+
+ def _pidfile_label(self):
+ # '.autoserv_execute' -> 'autoserv'
+ return self._pidfile_name()[1:-len('_execute')]
+
+
+class SelfThrottledPostJobTask(PostJobTask):
+ """
+ PostJobTask that maintains its own process limit.
+
+    We throttle tasks like parsing because we don't want them to
+    hold up tests. At the same time we don't want to build up a
+    backlog of results that will take forever to parse.
+ """
+ _num_running_processes = 0
+ # Last known limit of max processes, used to check whether
+ # max processes config has been changed.
+ _last_known_max_processes = 0
+    # Whether an email should be sent to notify that the process limit is hit.
+ _notification_on = True
+    # Once the process limit is hit, an email will be sent.
+    # To prevent spam, do not send another email until the process
+    # count drops below the following fraction of the limit.
+ REVIVE_NOTIFICATION_THRESHOLD = 0.80
+
+
+ @classmethod
+ def _increment_running_processes(cls):
+ cls._num_running_processes += 1
+ stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
+ cls._num_running_processes)
+
+
+ @classmethod
+ def _decrement_running_processes(cls):
+ cls._num_running_processes -= 1
+ stats.Gauge('scheduler').send('%s.num_running_processes' % cls.__name__,
+ cls._num_running_processes)
+
+
+ @classmethod
+ def _max_processes(cls):
+ raise NotImplementedError
+
+
+ @classmethod
+ def _can_run_new_process(cls):
+ return cls._num_running_processes < cls._max_processes()
+
+
+ def _process_started(self):
+ return bool(self.monitor)
+
+
+ def tick(self):
+ # override tick to keep trying to start until the process count goes
+ # down and we can, at which point we revert to default behavior
+ if self._process_started():
+ super(SelfThrottledPostJobTask, self).tick()
+ else:
+ self._try_starting_process()
+
+
+ def run(self):
+ # override run() to not actually run unless we can
+ self._try_starting_process()
+
+
+ @classmethod
+ def _notify_process_limit_hit(cls):
+ """Send an email to notify that process limit is hit."""
+ if cls._notification_on:
+ subject = '%s: hitting max process limit.' % cls.__name__
+ message = ('Running processes/Max processes: %d/%d'
+ % (cls._num_running_processes, cls._max_processes()))
+ email_manager.manager.enqueue_notify_email(subject, message)
+ cls._notification_on = False
+
+
+ @classmethod
+ def _reset_notification_switch_if_necessary(cls):
+ """Reset _notification_on if necessary.
+
+        Set _notification_on to True in the following cases:
+        1) The max processes limit in the configuration changes;
+        2) _notification_on is False and the number of running processes
+        drops below the level defined by REVIVE_NOTIFICATION_THRESHOLD.
+
+ """
+ if cls._last_known_max_processes != cls._max_processes():
+ cls._notification_on = True
+ cls._last_known_max_processes = cls._max_processes()
+ return
+ percentage = float(cls._num_running_processes) / cls._max_processes()
+ if (not cls._notification_on and
+ percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
+ cls._notification_on = True
+
+
+ def _try_starting_process(self):
+ self._reset_notification_switch_if_necessary()
+ if not self._can_run_new_process():
+ self._notify_process_limit_hit()
+ return
+
+ # actually run the command
+ super(SelfThrottledPostJobTask, self).run()
+ if self._process_started():
+ self._increment_running_processes()
+
+
+ def finished(self, success):
+ super(SelfThrottledPostJobTask, self).finished(success)
+ if self._process_started():
+ self._decrement_running_processes()
+
+
+class GatherLogsTask(PostJobTask):
+ """
+ Task responsible for
+ * gathering uncollected logs (if Autoserv crashed hard or was killed)
+ * copying logs to the results repository
+ * spawning CleanupTasks for hosts, if necessary
+ * spawning a FinalReparseTask for the job
+ * setting the final status of the host, directly or through a cleanup
+ """
+ def __init__(self, queue_entries, recover_run_monitor=None):
+ self._job = queue_entries[0].job
+ super(GatherLogsTask, self).__init__(
+ queue_entries, log_file_name='.collect_crashinfo.log')
+ self._set_ids(queue_entries=queue_entries)
+
+
+ # TODO: Refactor into autoserv_utils. crbug.com/243090
+ def _generate_command(self, results_dir):
+ host_list = ','.join(queue_entry.host.hostname
+ for queue_entry in self.queue_entries)
+        return [autoserv_utils.autoserv_path, '-p',
+ '--pidfile-label=%s' % self._pidfile_label(),
+ '--use-existing-results', '--collect-crashinfo',
+ '-m', host_list, '-r', results_dir]
+
+
+ @property
+ def num_processes(self):
+ return len(self.queue_entries)
+
+
+ def _pidfile_name(self):
+ return drone_manager.CRASHINFO_PID_FILE
+
+
+ def prolog(self):
+ self._check_queue_entry_statuses(
+ self.queue_entries,
+ allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
+ allowed_host_statuses=(models.Host.Status.RUNNING,))
+
+ super(GatherLogsTask, self).prolog()
+
+
+ def epilog(self):
+ super(GatherLogsTask, self).epilog()
+ self._parse_results(self.queue_entries)
+ self._reboot_hosts()
+
+
+ def _reboot_hosts(self):
+ if self._autoserv_monitor.has_process():
+ final_success = (self._final_status() ==
+ models.HostQueueEntry.Status.COMPLETED)
+ num_tests_failed = self._autoserv_monitor.num_tests_failed()
+ else:
+ final_success = False
+ num_tests_failed = 0
+ reboot_after = self._job.reboot_after
+ do_reboot = (
+ # always reboot after aborted jobs
+ self._final_status() == models.HostQueueEntry.Status.ABORTED
+ or reboot_after == model_attributes.RebootAfter.ALWAYS
+ or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
+ and final_success and num_tests_failed == 0)
+ or num_tests_failed > 0)
+
+ for queue_entry in self.queue_entries:
+ if do_reboot:
+ # don't pass the queue entry to the CleanupTask. if the cleanup
+ # fails, the job doesn't care -- it's over.
+ models.SpecialTask.objects.create(
+ host=models.Host.objects.get(id=queue_entry.host.id),
+ task=models.SpecialTask.Task.CLEANUP,
+ requested_by=self._job.owner_model())
+ else:
+ queue_entry.host.set_status(models.Host.Status.READY)
+
+
+ def run(self):
+ autoserv_exit_code = self._autoserv_monitor.exit_code()
+ # only run if Autoserv exited due to some signal. if we have no exit
+ # code, assume something bad (and signal-like) happened.
+ if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
+ super(GatherLogsTask, self).run()
+ else:
+ self.finished(True)
+
+
+class FinalReparseTask(SelfThrottledPostJobTask):
+ def __init__(self, queue_entries):
+ super(FinalReparseTask, self).__init__(queue_entries,
+ log_file_name='.parse.log')
+ # don't use _set_ids, since we don't want to set the host_ids
+ self.queue_entry_ids = [entry.id for entry in queue_entries]
+
+
+ def _generate_command(self, results_dir):
+ return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
+ results_dir]
+
+
+ @property
+ def num_processes(self):
+ return 0 # don't include parser processes in accounting
+
+
+ def _pidfile_name(self):
+ return drone_manager.PARSER_PID_FILE
+
+
+ @classmethod
+ def _max_processes(cls):
+ return scheduler_config.config.max_parse_processes
+
+
+ def prolog(self):
+ self._check_queue_entry_statuses(
+ self.queue_entries,
+ allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
+
+ super(FinalReparseTask, self).prolog()
+
+
+ def epilog(self):
+ super(FinalReparseTask, self).epilog()
+ self._archive_results(self.queue_entries)
+
+
+class ArchiveResultsTask(SelfThrottledPostJobTask):
+ _ARCHIVING_FAILED_FILE = '.archiver_failed'
+
+ def __init__(self, queue_entries):
+ super(ArchiveResultsTask, self).__init__(queue_entries,
+ log_file_name='.archiving.log')
+ # don't use _set_ids, since we don't want to set the host_ids
+ self.queue_entry_ids = [entry.id for entry in queue_entries]
+
+
+ def _pidfile_name(self):
+ return drone_manager.ARCHIVER_PID_FILE
+
+
+ # TODO: Refactor into autoserv_utils. crbug.com/243090
+ def _generate_command(self, results_dir):
+        return [autoserv_utils.autoserv_path, '-p',
+ '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
+ '--use-existing-results', '--control-filename=control.archive',
+ os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
+ 'archive_results.control.srv')]
+
+
+ @classmethod
+ def _max_processes(cls):
+ return scheduler_config.config.max_transfer_processes
+
+
+ def prolog(self):
+ self._check_queue_entry_statuses(
+ self.queue_entries,
+ allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
+
+ super(ArchiveResultsTask, self).prolog()
+
+
+ def epilog(self):
+ super(ArchiveResultsTask, self).epilog()
+ if not self.success and self._paired_with_monitor().has_process():
+ failed_file = os.path.join(self._working_directory(),
+ self._ARCHIVING_FAILED_FILE)
+ paired_process = self._paired_with_monitor().get_process()
+ _drone_manager.write_lines_to_file(
+ failed_file, ['Archiving failed with exit code %s'
+ % self.monitor.exit_code()],
+ paired_with_process=paired_process)
+ self._set_all_statuses(self._final_status())
diff --git a/scheduler/prejob_task.py b/scheduler/prejob_task.py
new file mode 100644
index 0000000..37a5fff
--- /dev/null
+++ b/scheduler/prejob_task.py
@@ -0,0 +1,368 @@
+#pylint: disable-msg=C0111
+
+"""
+Prejob tasks.
+
+Prejob tasks _usually_ run before a job and verify the state of a machine.
+Cleanup and repair are exceptions: cleanup can run after a job too, while
+repair will run whenever the host needs it, which could be before or after a
+job. Most of the work specific to this module is achieved through the prolog
+and epilog of each task.
+
+All prejob tasks must have a host, though they may not have an HQE. If a
+prejob task has an hqe, it will activate the hqe through its on_pending
+method on successful completion. A row in afe_special_tasks with values:
+ host=C1, unlocked, is_active=0, is_complete=0, type=Verify
+will indicate to the scheduler that it needs to schedule a new special task
+of type=Verify, against the C1 host. While the special task is running
+the scheduler only monitors it through the Agent, and its is_active bit=1.
+Once a special task finishes, we set its is_active=0, is_complete=1 and
+success bits, so the scheduler ignores it.
+HQE.on_pending:
+ Host, HQE -> Pending, Starting
+ This status is acted upon in the scheduler, to assign an AgentTask.
+PreJobTask:
+ epilog:
+ failure:
+ requeue hqe
+ repair the host
+Children PreJobTasks:
+ prolog:
+ set Host, HQE status
+ epilog:
+ success:
+ on_pending
+ failure:
+            repair through PreJobTask
+ set Host, HQE status
+"""
+import logging
+import os
+
+from autotest_lib.client.common_lib import host_protections
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import agent_task, scheduler_config
+from autotest_lib.server import autoserv_utils
+from autotest_lib.server.cros import provision
+
+
+class PreJobTask(agent_task.SpecialAgentTask):
+ def _copy_to_results_repository(self):
+ if not self.queue_entry or self.queue_entry.meta_host:
+ return
+
+ self.queue_entry.set_execution_subdir()
+ log_name = os.path.basename(self.task.execution_path())
+ source = os.path.join(self.task.execution_path(), 'debug',
+ 'autoserv.DEBUG')
+ destination = os.path.join(
+ self.queue_entry.execution_path(), log_name)
+
+ self.monitor.try_copy_to_results_repository(
+ source, destination_path=destination)
+
+
+ def epilog(self):
+ super(PreJobTask, self).epilog()
+
+ if self.success:
+ return
+
+ if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
+ # effectively ignore failure for these hosts
+ self.success = True
+ return
+
+ if self.queue_entry:
+            # If we requeue an HQE, we should cancel any remaining pre-job
+ # tasks against this host, otherwise we'll be left in a state
+ # where a queued HQE has special tasks to run against a host.
+ models.SpecialTask.objects.filter(
+ queue_entry__id=self.queue_entry.id,
+ host__id=self.host.id,
+ is_complete=0).update(is_complete=1, success=0)
+
+ previous_provisions = models.SpecialTask.objects.filter(
+ task=models.SpecialTask.Task.PROVISION,
+ queue_entry_id=self.queue_entry.id).count()
+ if (previous_provisions >
+ scheduler_config.config.max_provision_retries):
+ self._actually_fail_queue_entry()
+ # This abort will mark the aborted bit on the HQE itself, to
+ # signify that we're killing it. Technically it also will do
+ # the recursive aborting of all child jobs, but that shouldn't
+ # matter here, as only suites have children, and those are
+ # hostless and thus don't have provisioning.
+ # TODO(milleral) http://crbug.com/188217
+ # However, we can't actually do this yet, as if we set the
+ # abort bit the FinalReparseTask will set the status of the HQE
+ # to ABORTED, which then means that we don't show the status in
+ # run_suite. So in the meantime, don't mark the HQE as
+ # aborted.
+ # queue_entry.abort()
+ else:
+ # requeue() must come after handling provision retries, since
+ # _actually_fail_queue_entry needs an execution subdir.
+ # We also don't want to requeue if we hit the provision retry
+ # limit, since then we overwrite the PARSING state of the HQE.
+ self.queue_entry.requeue()
+
+ previous_repairs = models.SpecialTask.objects.filter(
+ task=models.SpecialTask.Task.REPAIR,
+ queue_entry_id=self.queue_entry.id).count()
+ if previous_repairs >= scheduler_config.config.max_repair_limit:
+ self.host.set_status(models.Host.Status.REPAIR_FAILED)
+ self._fail_queue_entry()
+ return
+
+ queue_entry = models.HostQueueEntry.objects.get(
+ id=self.queue_entry.id)
+ else:
+ queue_entry = None
+
+ models.SpecialTask.objects.create(
+ host=models.Host.objects.get(id=self.host.id),
+ task=models.SpecialTask.Task.REPAIR,
+ queue_entry=queue_entry,
+ requested_by=self.task.requested_by)
+
+
+ def _should_pending(self):
+ """
+ Decide if we should call the host queue entry's on_pending method.
+ We should if:
+ 1) There exists an associated host queue entry.
+ 2) The current special task completed successfully.
+ 3) There do not exist any more special tasks to be run before the
+ host queue entry starts.
+
+ @returns: True if we should call pending, false if not.
+
+ """
+ if not self.queue_entry or not self.success:
+ return False
+
+ # We know if this is the last one when we create it, so we could add
+ # another column to the database to keep track of this information, but
+ # I expect the overhead of querying here to be minimal.
+ queue_entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
+ queued = models.SpecialTask.objects.filter(
+ host__id=self.host.id, is_active=False,
+ is_complete=False, queue_entry=queue_entry)
+ queued = queued.exclude(id=self.task.id)
+ return queued.count() == 0
+
+
+class VerifyTask(PreJobTask):
+ TASK_TYPE = models.SpecialTask.Task.VERIFY
+
+
+ def __init__(self, task):
+ super(VerifyTask, self).__init__(task, ['-v'])
+ self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+ def prolog(self):
+ super(VerifyTask, self).prolog()
+
+ logging.info("starting verify on %s", self.host.hostname)
+ if self.queue_entry:
+ self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
+ self.host.set_status(models.Host.Status.VERIFYING)
+
+ # Delete any queued manual reverifies for this host. One verify will do
+ # and there's no need to keep records of other requests.
+ self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
+ keep_last_one=True)
+
+
+ def epilog(self):
+ super(VerifyTask, self).epilog()
+ if self.success:
+ if self._should_pending():
+ self.queue_entry.on_pending()
+ else:
+ self.host.set_status(models.Host.Status.READY)
+
+
+class CleanupTask(PreJobTask):
+ # note this can also run post-job, but when it does, it's running standalone
+ # against the host (not related to the job), so it's not considered a
+ # PostJobTask
+
+ TASK_TYPE = models.SpecialTask.Task.CLEANUP
+
+
+ def __init__(self, task, recover_run_monitor=None):
+ super(CleanupTask, self).__init__(task, ['--cleanup'])
+ self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+ def prolog(self):
+ super(CleanupTask, self).prolog()
+ logging.info("starting cleanup task for host: %s", self.host.hostname)
+ self.host.set_status(models.Host.Status.CLEANING)
+ if self.queue_entry:
+ self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
+
+
+ def _finish_epilog(self):
+ if not self.queue_entry or not self.success:
+ return
+
+ do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
+ should_run_verify = (
+ self.queue_entry.job.run_verify
+ and self.host.protection != do_not_verify_protection)
+ if should_run_verify:
+ entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
+ models.SpecialTask.objects.create(
+ host=models.Host.objects.get(id=self.host.id),
+ queue_entry=entry,
+ task=models.SpecialTask.Task.VERIFY)
+ else:
+ if self._should_pending():
+ self.queue_entry.on_pending()
+
+
+ def epilog(self):
+ super(CleanupTask, self).epilog()
+
+ if self.success:
+ self.host.update_field('dirty', 0)
+ self.host.set_status(models.Host.Status.READY)
+
+ self._finish_epilog()
+
+
+class ResetTask(PreJobTask):
+ """Task to reset a DUT, including cleanup and verify."""
+ # note this can also run post-job, but when it does, it's running standalone
+ # against the host (not related to the job), so it's not considered a
+ # PostJobTask
+
+ TASK_TYPE = models.SpecialTask.Task.RESET
+
+
+ def __init__(self, task, recover_run_monitor=None):
+ super(ResetTask, self).__init__(task, ['--reset'])
+ self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+ def prolog(self):
+ super(ResetTask, self).prolog()
+ logging.info('starting reset task for host: %s',
+ self.host.hostname)
+ self.host.set_status(models.Host.Status.RESETTING)
+ if self.queue_entry:
+ self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
+
+ # Delete any queued cleanups for this host.
+ self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
+ keep_last_one=False)
+
+ # Delete any queued reverifies for this host.
+ self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
+ keep_last_one=False)
+
+ # Only one reset is needed.
+ self.remove_special_tasks(models.SpecialTask.Task.RESET,
+ keep_last_one=True)
+
+
+ def epilog(self):
+ super(ResetTask, self).epilog()
+
+ if self.success:
+ self.host.update_field('dirty', 0)
+
+ if self._should_pending():
+ self.queue_entry.on_pending()
+ else:
+ self.host.set_status(models.Host.Status.READY)
+
+
+class ProvisionTask(PreJobTask):
+ TASK_TYPE = models.SpecialTask.Task.PROVISION
+
+ def __init__(self, task):
+ # Provisioning requires that we be associated with a job/queue entry
+ assert task.queue_entry, "No HQE associated with provision task!"
+ # task.queue_entry is an afe model HostQueueEntry object.
+ # self.queue_entry is a scheduler models HostQueueEntry object, but
+ # it gets constructed and assigned in __init__, so it's not available
+ # yet. Therefore, we're stuck pulling labels off of the afe model
+ # so that we can pass the --provision args into the __init__ call.
+ labels = {x.name for x in task.queue_entry.job.dependency_labels.all()}
+ _, provisionable = provision.filter_labels(labels)
+ extra_command_args = ['--provision', ','.join(provisionable)]
+ super(ProvisionTask, self).__init__(task, extra_command_args)
+ self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+ def _command_line(self):
+ # If we give queue_entry to _autoserv_command_line, then it will append
+ # -c for this invocation if the queue_entry is a client side test. We
+ # don't want that, as it messes with provisioning, so we just drop it
+ # from the arguments here.
+        # Note that we also don't verify job_repo_url as provisioning tasks are
+ # required to stage whatever content we need, and the job itself will
+ # force autotest to be staged if it isn't already.
+ return autoserv_utils._autoserv_command_line(self.host.hostname,
+ self._extra_command_args)
+
+
+ def prolog(self):
+ super(ProvisionTask, self).prolog()
+        # add a check for a previous provision task and abort if one exists.
+ logging.info("starting provision task for host: %s", self.host.hostname)
+ self.queue_entry.set_status(
+ models.HostQueueEntry.Status.PROVISIONING)
+ self.host.set_status(models.Host.Status.PROVISIONING)
+
+
+ def epilog(self):
+ super(ProvisionTask, self).epilog()
+
+ if self._should_pending():
+ self.queue_entry.on_pending()
+ else:
+ self.host.set_status(models.Host.Status.READY)
+
+
+class RepairTask(agent_task.SpecialAgentTask):
+ TASK_TYPE = models.SpecialTask.Task.REPAIR
+
+
+ def __init__(self, task):
+ """\
+        task: the special task driving this repair; its queue entry (if
+        any) is marked failed when the repair fails.
+ """
+ protection = host_protections.Protection.get_string(
+ task.host.protection)
+ # normalize the protection name
+ protection = host_protections.Protection.get_attr_name(protection)
+
+ super(RepairTask, self).__init__(
+ task, ['-R', '--host-protection', protection])
+
+ # *don't* include the queue entry in IDs -- if the queue entry is
+ # aborted, we want to leave the repair task running
+ self._set_ids(host=self.host)
+
+
+ def prolog(self):
+ super(RepairTask, self).prolog()
+ logging.info("repair_task starting")
+ self.host.set_status(models.Host.Status.REPAIRING)
+
+
+ def epilog(self):
+ super(RepairTask, self).epilog()
+
+ if self.success:
+ self.host.set_status(models.Host.Status.READY)
+ else:
+ self.host.set_status(models.Host.Status.REPAIR_FAILED)
+ if self.queue_entry:
+ self._fail_queue_entry()