Add a new Archiving stage to the scheduler, which runs after Parsing. This stage copies results to the results server in a drone setup (a task currently performed directly by the scheduler) and allows for site-specific archiving, replacing the site_parse functionality. It works by running autoserv with a special control file (scheduler/archive_results.control.srv), which loads and runs code from the new scheduler.archive_results module; a rough sketch of such a control file appears after the list below. The implementation was mostly straightforward, since the archiving stage is fully analogous to the parsing stage. I did make a couple of refactorings along the way:
* factored the parser throttling code out into a common superclass (SelfThrottledPostJobTask) that the new ArchiveResultsTask can share
* added generic flags to autoserv that duplicate the special-case functionality we'd added for the --collect-crashinfo option -- namely, --pidfile-label to specify a different pidfile name and --use-existing-results to tell autoserv to allow (and even expect) an existing results directory; see the example invocations just before the diff. In the future, I think it'd be more elegant to make crashinfo collection run via a special control file (as archiving does) rather than a hard-coded command-line option.
* moved the call to server_job.init_parser() out of the constructor, since it was an easy source of exceptions that would never get logged.
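
For concreteness, here is a rough sketch of what the archiving control file could look like, assuming the scheduler.archive_results module exposes a single entry point that takes the server job object (the class and method names below are illustrative, not the actual API):

    # scheduler/archive_results.control.srv -- illustrative sketch only.
    # autoserv executes control files with the running server_job bound to `job`.
    from autotest_lib.scheduler import archive_results

    # Hypothetical entry point: copy results to the results server and run any
    # site-specific archiving hooks; the real module may use a different name.
    archive_results.ArchiveResults(job).run()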
Note: I believe some of the functional test changes slipped into my previous change, which is why the functional test portion of this change looks smaller than you'd expect.
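
For reference, a sketch of the two autoserv argument lists the post-job tasks now build; the structure mirrors _generate_command() in the diff below, while the autoserv path, host list, and results directory are placeholders:

    # Crash-info collection (GatherLogsTask): pidfile label derived from
    # '.collect_crashinfo_execute', existing results directory expected.
    crashinfo_cmd = ['/usr/local/autotest/server/autoserv', '-p',
                     '--pidfile-label=collect_crashinfo',
                     '--use-existing-results', '--collect-crashinfo',
                     '-m', 'host1,host2', '-r', '/results/1234-user/group0']

    # Archiving (ArchiveResultsTask): same generic flags, but driven by the
    # special control file instead of a hard-coded option.
    archive_cmd = ['/usr/local/autotest/server/autoserv', '-p',
                   '--pidfile-label=archiver',
                   '-r', '/results/1234-user/group0',
                   '--use-existing-results',
                   '../scheduler/archive_results.control.srv']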
Signed-off-by: Steve Howard <showard@google.com>
==== (deleted) //depot/google_vendor_src_branch/autotest/tko/site_parse.py ====
git-svn-id: http://test.kernel.org/svn/autotest/trunk@4070 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index d4ec92a..9e4a654 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -45,9 +45,10 @@
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'
+_ARCHIVER_PID_FILE = '.archiver_execute'
_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
- _PARSER_PID_FILE)
+ _PARSER_PID_FILE, _ARCHIVER_PID_FILE)
# error message to leave in results dir when an autoserv process disappears
# mysteriously
@@ -766,7 +767,8 @@
statuses = (models.HostQueueEntry.Status.STARTING,
models.HostQueueEntry.Status.RUNNING,
models.HostQueueEntry.Status.GATHERING,
- models.HostQueueEntry.Status.PARSING)
+ models.HostQueueEntry.Status.PARSING,
+ models.HostQueueEntry.Status.ARCHIVING)
status_list = ','.join("'%s'" % status for status in statuses)
queue_entries = HostQueueEntry.fetch(
where='status IN (%s)' % status_list)
@@ -812,16 +814,19 @@
return GatherLogsTask(queue_entries=task_entries)
if queue_entry.status == models.HostQueueEntry.Status.PARSING:
return FinalReparseTask(queue_entries=task_entries)
+ if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
+ return ArchiveResultsTask(queue_entries=task_entries)
raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
'invalid status %s: %s' % (entry.status, entry))
def _check_for_duplicate_host_entries(self, task_entries):
- parsing_status = models.HostQueueEntry.Status.PARSING
+ non_host_statuses = (models.HostQueueEntry.Status.PARSING,
+ models.HostQueueEntry.Status.ARCHIVING)
for task_entry in task_entries:
using_host = (task_entry.host is not None
- and task_entry.status != parsing_status)
+ and task_entry.status not in non_host_statuses)
if using_host:
self._assert_host_has_no_agent(task_entry)
@@ -1607,9 +1612,9 @@
queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
- def _copy_and_parse_results(self, queue_entries, use_monitor=None):
- self._copy_results(queue_entries, use_monitor)
- self._parse_results(queue_entries)
+ def _archive_results(self, queue_entries):
+ for queue_entry in queue_entries:
+ queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
def _command_line(self):
@@ -1721,6 +1726,22 @@
self._working_directory()))
+ def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
+ allowed_host_statuses=None):
+ for entry in queue_entries:
+ if entry.status not in allowed_hqe_statuses:
+ raise SchedulerError('Queue task attempting to start '
+ 'entry with invalid status %s: %s'
+ % (entry.status, entry))
+ invalid_host_status = (
+ allowed_host_statuses is not None
+ and entry.host.status not in allowed_host_statuses)
+ if invalid_host_status:
+ raise SchedulerError('Queue task attempting to start on queue '
+ 'entry with invalid host status %s: %s'
+ % (entry.host.status, entry))
+
+
class TaskWithJobKeyvals(object):
"""AgentTask mixin providing functionality to help with job keyval files."""
_KEYVAL_FILE = 'keyval'
@@ -1844,17 +1865,15 @@
source_path=self._working_directory() + '/',
destination_path=self.queue_entry.execution_path() + '/')
- self._copy_results([self.queue_entry])
-
- if not self.queue_entry.job.parse_failed_repair:
- self.queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
- return
-
pidfile_id = _drone_manager.get_pidfile_id_from(
self.queue_entry.execution_path(),
pidfile_name=_AUTOSERV_PID_FILE)
_drone_manager.register_pidfile(pidfile_id)
- self._parse_results([self.queue_entry])
+
+ if self.queue_entry.job.parse_failed_repair:
+ self._parse_results([self.queue_entry])
+ else:
+ self._archive_results([self.queue_entry])
def cleanup(self):
@@ -1995,6 +2014,55 @@
self.host.set_status(models.Host.Status.READY)
+class CleanupTask(PreJobTask):
+ # note this can also run post-job, but when it does, it's running standalone
+ # against the host (not related to the job), so it's not considered a
+ # PostJobTask
+
+ TASK_TYPE = models.SpecialTask.Task.CLEANUP
+
+
+ def __init__(self, task, recover_run_monitor=None):
+ super(CleanupTask, self).__init__(task, ['--cleanup'])
+ self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+ def prolog(self):
+ super(CleanupTask, self).prolog()
+ logging.info("starting cleanup task for host: %s", self.host.hostname)
+ self.host.set_status(models.Host.Status.CLEANING)
+ if self.queue_entry:
+ self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
+
+
+ def _finish_epilog(self):
+ if not self.queue_entry or not self.success:
+ return
+
+ do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
+ should_run_verify = (
+ self.queue_entry.job.run_verify
+ and self.host.protection != do_not_verify_protection)
+ if should_run_verify:
+ entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
+ models.SpecialTask.objects.create(
+ host=models.Host.objects.get(id=self.host.id),
+ queue_entry=entry,
+ task=models.SpecialTask.Task.VERIFY)
+ else:
+ self.queue_entry.on_pending()
+
+
+ def epilog(self):
+ super(CleanupTask, self).epilog()
+
+ if self.success:
+ self.host.update_field('dirty', 0)
+ self.host.set_status(models.Host.Status.READY)
+
+ self._finish_epilog()
+
+
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
"""
Common functionality for QueueTask and HostlessQueueTask
@@ -2110,17 +2178,12 @@
def prolog(self):
- for entry in self.queue_entries:
- if entry.status not in (models.HostQueueEntry.Status.STARTING,
- models.HostQueueEntry.Status.RUNNING):
- raise SchedulerError('Queue task attempting to start '
- 'entry with invalid status %s: %s'
- % (entry.status, entry))
- if entry.host.status not in (models.Host.Status.PENDING,
- models.Host.Status.RUNNING):
- raise SchedulerError('Queue task attempting to start on queue '
- 'entry with invalid host status %s: %s'
- % (entry.host.status, entry))
+ self._check_queue_entry_statuses(
+ self.queue_entries,
+ allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
+ models.HostQueueEntry.Status.RUNNING),
+ allowed_host_statuses=(models.Host.Status.PENDING,
+ models.Host.Status.RUNNING))
super(QueueTask, self).prolog()
@@ -2141,6 +2204,30 @@
queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
+class HostlessQueueTask(AbstractQueueTask):
+ def __init__(self, queue_entry):
+ super(HostlessQueueTask, self).__init__([queue_entry])
+ self.queue_entry_ids = [queue_entry.id]
+
+
+ def prolog(self):
+ self.queue_entries[0].update_field('execution_subdir', 'hostless')
+ super(HostlessQueueTask, self).prolog()
+
+
+ def _final_status(self):
+ if self.queue_entries[0].aborted:
+ return models.HostQueueEntry.Status.ABORTED
+ if self.monitor.exit_code() == 0:
+ return models.HostQueueEntry.Status.COMPLETED
+ return models.HostQueueEntry.Status.FAILED
+
+
+ def _finish_task(self):
+ super(HostlessQueueTask, self)._finish_task()
+ self.queue_entries[0].set_status(self._final_status())
+
+
class PostJobTask(AgentTask):
def __init__(self, queue_entries, log_file_name):
super(PostJobTask, self).__init__(log_file_name=log_file_name)
@@ -2213,6 +2300,11 @@
pass
+ def _pidfile_label(self):
+ # '.autoserv_execute' -> 'autoserv'
+ return self._pidfile_name()[1:-len('_execute')]
+
+
class GatherLogsTask(PostJobTask):
"""
Task responsible for
@@ -2231,8 +2323,10 @@
def _generate_command(self, results_dir):
host_list = ','.join(queue_entry.host.hostname
for queue_entry in self.queue_entries)
- return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
- '-r', results_dir]
+ return [_autoserv_path , '-p',
+ '--pidfile-label=%s' % self._pidfile_label(),
+ '--use-existing-results', '--collect-crashinfo',
+ '-m', host_list, '-r', results_dir]
@property
@@ -2245,23 +2339,17 @@
def prolog(self):
- for queue_entry in self.queue_entries:
- if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
- raise SchedulerError('Gather task attempting to start on '
- 'non-gathering entry: %s' % queue_entry)
- if queue_entry.host.status != models.Host.Status.RUNNING:
- raise SchedulerError('Gather task attempting to start on queue '
- 'entry with non-running host status %s: %s'
- % (queue_entry.host.status, queue_entry))
+ self._check_queue_entry_statuses(
+ self.queue_entries,
+ allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
+ allowed_host_statuses=(models.Host.Status.RUNNING,))
super(GatherLogsTask, self).prolog()
def epilog(self):
super(GatherLogsTask, self).epilog()
-
- self._copy_and_parse_results(self.queue_entries,
- use_monitor=self._autoserv_monitor)
+ self._parse_results(self.queue_entries)
self._reboot_hosts()
@@ -2304,54 +2392,67 @@
self.finished(True)
-class CleanupTask(PreJobTask):
- TASK_TYPE = models.SpecialTask.Task.CLEANUP
+class SelfThrottledPostJobTask(PostJobTask):
+ """
+ Special AgentTask subclass that maintains its own global process limit.
+ """
+ _num_running_processes = 0
- def __init__(self, task, recover_run_monitor=None):
- super(CleanupTask, self).__init__(task, ['--cleanup'])
- self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+ @classmethod
+ def _increment_running_processes(cls):
+ cls._num_running_processes += 1
- def prolog(self):
- super(CleanupTask, self).prolog()
- logging.info("starting cleanup task for host: %s", self.host.hostname)
- self.host.set_status(models.Host.Status.CLEANING)
- if self.queue_entry:
- self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
+ @classmethod
+ def _decrement_running_processes(cls):
+ cls._num_running_processes -= 1
- def _finish_epilog(self):
- if not self.queue_entry or not self.success:
+ @classmethod
+ def _max_processes(cls):
+ raise NotImplementedError
+
+
+ @classmethod
+ def _can_run_new_process(cls):
+ return cls._num_running_processes < cls._max_processes()
+
+
+ def _process_started(self):
+ return bool(self.monitor)
+
+
+ def tick(self):
+ # override tick to keep trying to start until the process count goes
+ # down and we can, at which point we revert to default behavior
+ if self._process_started():
+ super(SelfThrottledPostJobTask, self).tick()
+ else:
+ self._try_starting_process()
+
+
+ def run(self):
+ # override run() to not actually run unless we can
+ self._try_starting_process()
+
+
+ def _try_starting_process(self):
+ if not self._can_run_new_process():
return
- do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
- should_run_verify = (
- self.queue_entry.job.run_verify
- and self.host.protection != do_not_verify_protection)
- if should_run_verify:
- entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
- models.SpecialTask.objects.create(
- host=models.Host.objects.get(id=self.host.id),
- queue_entry=entry,
- task=models.SpecialTask.Task.VERIFY)
- else:
- self.queue_entry.on_pending()
+ # actually run the command
+ super(SelfThrottledPostJobTask, self).run()
+ self._increment_running_processes()
- def epilog(self):
- super(CleanupTask, self).epilog()
-
- if self.success:
- self.host.update_field('dirty', 0)
- self.host.set_status(models.Host.Status.READY)
-
- self._finish_epilog()
+ def finished(self, success):
+ super(SelfThrottledPostJobTask, self).finished(success)
+ if self._process_started():
+ self._decrement_running_processes()
-class FinalReparseTask(PostJobTask):
- _num_running_parses = 0
-
+class FinalReparseTask(SelfThrottledPostJobTask):
def __init__(self, queue_entries):
super(FinalReparseTask, self).__init__(queue_entries,
log_file_name='.parse.log')
@@ -2360,7 +2461,7 @@
def _generate_command(self, results_dir):
- return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
+ return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
results_dir]
@@ -2373,91 +2474,59 @@
return _PARSER_PID_FILE
- def _parse_started(self):
- return bool(self.monitor)
-
-
@classmethod
- def _increment_running_parses(cls):
- cls._num_running_parses += 1
-
-
- @classmethod
- def _decrement_running_parses(cls):
- cls._num_running_parses -= 1
-
-
- @classmethod
- def _can_run_new_parse(cls):
- return (cls._num_running_parses <
- scheduler_config.config.max_parse_processes)
+ def _max_processes(cls):
+ return scheduler_config.config.max_parse_processes
def prolog(self):
- for queue_entry in self.queue_entries:
- if queue_entry.status != models.HostQueueEntry.Status.PARSING:
- raise SchedulerError('Parse task attempting to start on '
- 'non-parsing entry: %s' % queue_entry)
+ self._check_queue_entry_statuses(
+ self.queue_entries,
+ allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
super(FinalReparseTask, self).prolog()
def epilog(self):
super(FinalReparseTask, self).epilog()
- self._set_all_statuses(self._final_status())
+ self._archive_results(self.queue_entries)
- def tick(self):
- # override tick to keep trying to start until the parse count goes down
- # and we can, at which point we revert to default behavior
- if self._parse_started():
- super(FinalReparseTask, self).tick()
- else:
- self._try_starting_parse()
+class ArchiveResultsTask(SelfThrottledPostJobTask):
+ def __init__(self, queue_entries):
+ super(ArchiveResultsTask, self).__init__(queue_entries,
+ log_file_name='.archiving.log')
+ # don't use _set_ids, since we don't want to set the host_ids
+ self.queue_entry_ids = [entry.id for entry in queue_entries]
- def run(self):
- # override run() to not actually run unless we can
- self._try_starting_parse()
+ def _pidfile_name(self):
+ return _ARCHIVER_PID_FILE
- def _try_starting_parse(self):
- if not self._can_run_new_parse():
- return
-
- # actually run the parse command
- super(FinalReparseTask, self).run()
- self._increment_running_parses()
+ def _generate_command(self, results_dir):
+ return [_autoserv_path , '-p',
+ '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
+ '--use-existing-results',
+ os.path.join('..', 'scheduler', 'archive_results.control.srv')]
- def finished(self, success):
- super(FinalReparseTask, self).finished(success)
- if self._parse_started():
- self._decrement_running_parses()
-
-
-class HostlessQueueTask(AbstractQueueTask):
- def __init__(self, queue_entry):
- super(HostlessQueueTask, self).__init__([queue_entry])
- self.queue_entry_ids = [queue_entry.id]
+ @classmethod
+ def _max_processes(cls):
+ return scheduler_config.config.max_transfer_processes
def prolog(self):
- self.queue_entries[0].update_field('execution_subdir', 'hostless')
- super(HostlessQueueTask, self).prolog()
+ self._check_queue_entry_statuses(
+ self.queue_entries,
+ allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
+
+ super(ArchiveResultsTask, self).prolog()
- def _final_status(self):
- if self.queue_entries[0].aborted:
- return models.HostQueueEntry.Status.ABORTED
- if self.monitor.exit_code() == 0:
- return models.HostQueueEntry.Status.COMPLETED
- return models.HostQueueEntry.Status.FAILED
-
-
- def _finish_task(self):
- super(HostlessQueueTask, self)._finish_task()
- self.queue_entries[0].set_status(self._final_status())
+ def epilog(self):
+ super(ArchiveResultsTask, self).epilog()
+ self._set_all_statuses(self._final_status())
class DBError(Exception):
@@ -2899,34 +2968,22 @@
self.update_field('status', status)
- if status in (models.HostQueueEntry.Status.QUEUED,
- models.HostQueueEntry.Status.PARSING):
- self.update_field('complete', False)
- self.update_field('active', False)
+ active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
+ complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
+ assert not (active and complete)
- if status in (models.HostQueueEntry.Status.PENDING,
- models.HostQueueEntry.Status.RUNNING,
- models.HostQueueEntry.Status.VERIFYING,
- models.HostQueueEntry.Status.STARTING,
- models.HostQueueEntry.Status.GATHERING):
- self.update_field('complete', False)
- self.update_field('active', True)
+ self.update_field('active', active)
+ self.update_field('complete', complete)
- if status in (models.HostQueueEntry.Status.FAILED,
- models.HostQueueEntry.Status.COMPLETED,
- models.HostQueueEntry.Status.STOPPED,
- models.HostQueueEntry.Status.ABORTED):
- self.update_field('complete', True)
- self.update_field('active', False)
+ if complete:
self._on_complete()
+ self._email_on_job_complete()
should_email_status = (status.lower() in _notify_email_statuses or
'all' in _notify_email_statuses)
if should_email_status:
self._email_on_status(status)
- self._email_on_job_complete()
-
def _on_complete(self):
self.job.stop_if_necessary()
@@ -3063,7 +3120,7 @@
assert self.aborted and not self.complete
Status = models.HostQueueEntry.Status
- if self.status in (Status.GATHERING, Status.PARSING):
+ if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
# do nothing; post-job tasks will finish and then mark this entry
# with status "Aborted" and take care of the host
return