[Autotest] merge cleanup and verify

The objective of this CL is to merge cleanup and verify into a single job to
reduce the run time of each test. In the existing design, by default, a cleanup
job is scheduled after a test finishes, and a verify job is scheduled before a
test starts. By merging these two jobs, the total run time of the two jobs
drops from about 47s to 37s, a saving of around 10s. That does not include the
saving on the scheduler from scheduling one job instead of two, which may be
another 5-10s.

The design is to create a new special task, reset, which runs at the beginning
of a job by default. The verify task is changed to no longer run by default
before a job starts. A cleanup job is only run if the job is scheduled to
reboot or any test in the job failed.

BUG=chromium:220679
TEST=tested with run_suite on a local machine
DEPLOY=afe,apache,scheduler,change all users' preference on reboot_after to
Never, sql: |update chromeos_autotest_db.afe_users set reboot_after=0|

Change-Id: Ia38baf6b73897b7e09fdf635eadedc752b5eba2f
Reviewed-on: https://gerrit.chromium.org/gerrit/48685
Commit-Queue: Dan Shi <dshi@chromium.org>
Reviewed-by: Dan Shi <dshi@chromium.org>
Tested-by: Dan Shi <dshi@chromium.org>
diff --git a/client/bin/site_sysinfo.py b/client/bin/site_sysinfo.py
index 2ba5a01..ddbab48 100755
--- a/client/bin/site_sysinfo.py
+++ b/client/bin/site_sysinfo.py
@@ -215,7 +215,7 @@
         if collect_init_status:
             self._get_init_status_of_src_dir(self.dir)
         elif os.path.exists(self.dir):
-                self._log_diff(self.dir, log_dir)
+            self._log_diff(self.dir, log_dir)
 
 
 class purgeable_logdir(logdir):
diff --git a/client/common_lib/host_queue_entry_states.py b/client/common_lib/host_queue_entry_states.py
index e0f94fa..00e5392 100644
--- a/client/common_lib/host_queue_entry_states.py
+++ b/client/common_lib/host_queue_entry_states.py
@@ -7,13 +7,15 @@
 
 from autotest_lib.client.common_lib import enum
 
-Status_list = ['Queued', 'Starting', 'Verifying', 'Pending', 'Waiting',
-               'Running', 'Gathering', 'Parsing', 'Archiving', 'Aborted',
-               'Completed', 'Failed', 'Stopped', 'Template']
+Status_list = ['Queued', 'Starting', 'Resetting', 'Verifying', 'Pending',
+               'Waiting', 'Running', 'Gathering', 'Parsing', 'Archiving',
+               'Aborted', 'Completed', 'Failed', 'Stopped', 'Cleaning',
+               'Template']
 
 Status = enum.Enum(*Status_list, string_values=True)
-ACTIVE_STATUSES = (Status.STARTING, Status.VERIFYING, Status.PENDING,
-                   Status.RUNNING, Status.GATHERING)
+ACTIVE_STATUSES = (Status.STARTING, Status.RESETTING, Status.VERIFYING,
+                   Status.PENDING, Status.RUNNING, Status.GATHERING,
+                   Status.CLEANING)
 COMPLETE_STATUSES = (Status.ABORTED, Status.COMPLETED, Status.FAILED,
                      Status.STOPPED, Status.TEMPLATE)
 
diff --git a/frontend/afe/doctests/001_rpc_test.txt b/frontend/afe/doctests/001_rpc_test.txt
index f5acb7b..39b7743 100644
--- a/frontend/afe/doctests/001_rpc_test.txt
+++ b/frontend/afe/doctests/001_rpc_test.txt
@@ -156,7 +156,8 @@
 ...           'test_type': 'Client',
 ...           'test_class': 'Kernel',
 ...           'test_time': 'SHORT',
-...           'run_verify': 1,
+...           'run_verify': 0,
+...           'run_reset': 1,
 ...           'test_category': 'Functional',
 ...           'path': '/my/path',
 ...           'test_retry': 0}]
@@ -188,7 +189,7 @@
 ...           'login': 'showard',
 ...           'access_level': 1,
 ...           'reboot_before': 'If dirty',
-...           'reboot_after': 'Always',
+...           'reboot_after': 'Never',
 ...           'drone_set': None,
 ...           'show_experimental': False}]
 True
@@ -541,10 +542,11 @@
 ...         'timeout': 24,
 ...         'max_runtime_mins': 1440,
 ...         'max_runtime_hrs' : 72,
-...         'run_verify': True,
+...         'run_verify': False,
+...         'run_reset': True,
 ...         'email_list': '',
 ...         'reboot_before': 'If dirty',
-...         'reboot_after': 'Always',
+...         'reboot_after': 'Never',
 ...         'parse_failed_repair': True,
 ...         'drone_set': drone_set,
 ...         'parameterized_job': None,
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index 6f66d74..2a528e2 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -21,7 +21,7 @@
 
 # job options and user preferences
 DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY
-DEFAULT_REBOOT_AFTER = model_attributes.RebootBefore.ALWAYS
+DEFAULT_REBOOT_AFTER = model_attributes.RebootAfter.NEVER
 
 
 class AclAccessViolation(Exception):
@@ -359,7 +359,7 @@
     dirty: true if the host has been used without being rebooted
     """
     Status = enum.Enum('Verifying', 'Running', 'Ready', 'Repairing',
-                       'Repair Failed', 'Cleaning', 'Pending',
+                       'Repair Failed', 'Cleaning', 'Pending', 'Resetting',
                        string_values=True)
     Protection = host_protections.Protection
 
@@ -576,6 +576,7 @@
                        test dependencies.
     experimental: If this is set to True production servers will ignore the test
     run_verify: Whether or not the scheduler should run the verify stage
+    run_reset: Whether or not the scheduler should run the reset stage
     test_retry: Number of times to retry test if the test did not complete
                 successfully. (optional, default: 0)
     """
@@ -588,7 +589,7 @@
     dependencies = dbmodels.CharField(max_length=255, blank=True)
     description = dbmodels.TextField(blank=True)
     experimental = dbmodels.BooleanField(default=True)
-    run_verify = dbmodels.BooleanField(default=True)
+    run_verify = dbmodels.BooleanField(default=False)
     test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(),
                                            default=TestTime.MEDIUM)
     test_type = dbmodels.SmallIntegerField(
@@ -596,6 +597,7 @@
     sync_count = dbmodels.IntegerField(default=1)
     path = dbmodels.CharField(max_length=255, unique=True)
     test_retry = dbmodels.IntegerField(blank=True, default=0)
+    run_reset = dbmodels.BooleanField(default=True)
 
     dependency_labels = (
         dbmodels.ManyToManyField(Label, blank=True,
@@ -1000,6 +1002,7 @@
     submitted_on: date of job submission
     synch_count: how many hosts should be used per autoserv execution
     run_verify: Whether or not to run the verify phase
+    run_reset: Whether or not to run the reset phase
     timeout: hours from queuing time until job times out
     max_runtime_hrs: DEPRECATED - hours from job starting time until job
                      times out
@@ -1044,7 +1047,7 @@
     created_on = dbmodels.DateTimeField()
     synch_count = dbmodels.IntegerField(null=True, default=1)
     timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)
-    run_verify = dbmodels.BooleanField(default=True)
+    run_verify = dbmodels.BooleanField(default=False)
     email_list = dbmodels.CharField(max_length=250, blank=True)
     dependency_labels = (
             dbmodels.ManyToManyField(Label, blank=True,
@@ -1070,6 +1073,8 @@
 
     test_retry = dbmodels.IntegerField(blank=True, default=0)
 
+    run_reset = dbmodels.BooleanField(default=True)
+
     # custom manager
     objects = JobManager()
 
@@ -1165,7 +1170,8 @@
             drone_set=drone_set,
             parameterized_job=parameterized_job,
             parent_job=options.get('parent_job_id'),
-            test_retry=options.get('test_retry'))
+            test_retry=options.get('test_retry'),
+            run_reset=options.get('run_reset'))
 
         job.dependency_labels = options['dependencies']
 
@@ -1540,7 +1546,8 @@
     queue_entry: Host queue entry waiting on this task (or None, if task was not
                  started in preparation of a job)
     """
-    Task = enum.Enum('Verify', 'Cleanup', 'Repair', string_values=True)
+    Task = enum.Enum('Verify', 'Cleanup', 'Repair', 'Reset',
+                     string_values=True)
 
     host = dbmodels.ForeignKey(Host, blank=False, null=False)
     task = dbmodels.CharField(max_length=64, choices=Task.choices(),
diff --git a/frontend/afe/resources.py b/frontend/afe/resources.py
index 8c9ced3..04171b1 100644
--- a/frontend/afe/resources.py
+++ b/frontend/afe/resources.py
@@ -436,6 +436,7 @@
             'dependencies': [],
             'machines_per_execution': 1,
             'run_verify': bool(_job_fields['run_verify'].default),
+            'run_reset': bool(_job_fields['run_reset'].default),
             'timeout_hrs': _job_fields['timeout'].default,
             'maximum_runtime_mins': _job_fields['max_runtime_mins'].default,
             'cleanup_before_job':
@@ -477,6 +478,7 @@
                                  in job.dependency_labels.all()],
                 'machines_per_execution': job.synch_count,
                 'run_verify': bool(job.run_verify),
+                'run_reset': bool(job.run_reset),
                 'timeout_hrs': job.timeout,
                 'maximum_runtime_mins': job.max_runtime_mins,
                 'cleanup_before_job':
@@ -686,6 +688,7 @@
                 max_runtime_mins=execution_info.get('maximum_runtime_mins'),
                 synch_count=execution_info.get('machines_per_execution'),
                 run_verify=execution_info.get('run_verify'),
+                run_reset=execution_info.get('run_reset'),
                 email_list=input_dict.get('email_list', None),
                 dependencies=execution_info.get('dependencies', ()),
                 reboot_before=execution_info.get('cleanup_before_job'),
diff --git a/frontend/afe/resources_test.py b/frontend/afe/resources_test.py
index 1f679ee..de7b53a 100755
--- a/frontend/afe/resources_test.py
+++ b/frontend/afe/resources_test.py
@@ -343,9 +343,10 @@
         self.assertEquals(info['control_file'], self.CONTROL_FILE_CONTENTS)
         self.assertEquals(info['is_server'], False)
         self.assertEquals(info['cleanup_before_job'], 'Never')
-        self.assertEquals(info['cleanup_after_job'], 'Always')
+        self.assertEquals(info['cleanup_after_job'], 'Never')
         self.assertEquals(info['machines_per_execution'], 1)
-        self.assertEquals(info['run_verify'], True)
+        self.assertEquals(info['run_verify'], False)
+        self.assertEquals(info['run_reset'], True)
 
 
     def test_queue_entries(self):
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index d41d376..e910a61 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -419,10 +419,11 @@
                              meta_hosts=(), one_time_hosts=(),
                              atomic_group_name=None, synch_count=None,
                              is_template=False, timeout=None,
-                             max_runtime_mins=None, run_verify=True,
+                             max_runtime_mins=None, run_verify=False,
                              email_list='', dependencies=(), reboot_before=None,
                              reboot_after=None, parse_failed_repair=None,
-                             hostless=False, keyvals=None, drone_set=None):
+                             hostless=False, keyvals=None, drone_set=None,
+                             run_reset=True):
     """
     Creates and enqueues a parameterized job.
 
@@ -498,11 +499,11 @@
 def create_job(name, priority, control_file, control_type,
                hosts=(), meta_hosts=(), one_time_hosts=(),
                atomic_group_name=None, synch_count=None, is_template=False,
-               timeout=None, max_runtime_mins=None, run_verify=True,
+               timeout=None, max_runtime_mins=None, run_verify=False,
                email_list='', dependencies=(), reboot_before=None,
                reboot_after=None, parse_failed_repair=None, hostless=False,
                keyvals=None, drone_set=None, image=None, parent_job_id=None,
-               test_retry=0):
+               test_retry=0, run_reset=True):
     """\
     Create and enqueue a job.
 
@@ -536,10 +537,10 @@
     @param parent_job_id id of a job considered to be parent of created job.
     @param test_retry: Number of times to retry test if the test did not
                        complete successfully. (optional, default: 0)
+    @param run_reset: Should the host be reset before running the test?
 
     @returns The created Job id number.
     """
-
     stats.Counter('create_job').increment()
     # Force control files to only contain ascii characters.
     try:
@@ -913,7 +914,8 @@
                                    "Gathering": "Gathering log files",
                                    "Template": "Template job for recurring run",
                                    "Waiting": "Waiting for scheduler action",
-                                   "Archiving": "Archiving results"}
+                                   "Archiving": "Archiving results",
+                                   "Resetting": "Resetting hosts"}
     return result
 
 
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 9c4f245..7d370d1 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -8,7 +8,7 @@
 
 import datetime, os, inspect
 import django.http
-from autotest_lib.frontend.afe import models, model_logic, model_attributes
+from autotest_lib.frontend.afe import models, model_logic
 from autotest_lib.client.common_lib import control_data
 
 NULL_DATETIME = datetime.datetime.max
@@ -648,7 +648,7 @@
                       reboot_before=None, reboot_after=None,
                       parse_failed_repair=None, hostless=False, keyvals=None,
                       drone_set=None, parameterized_job=None,
-                      parent_job_id=None, test_retry=0):
+                      parent_job_id=None, test_retry=0, run_reset=True):
     #pylint: disable-msg=C0111
     """
     Common code between creating "standard" jobs and creating parameterized jobs
@@ -753,7 +753,8 @@
                    drone_set=drone_set,
                    parameterized_job=parameterized_job,
                    parent_job_id=parent_job_id,
-                   test_retry=test_retry)
+                   test_retry=test_retry,
+                   run_reset=run_reset)
     return create_new_job(owner=owner,
                           options=options,
                           host_objects=host_objects,
diff --git a/frontend/client/src/autotest/afe/HostDetailView.java b/frontend/client/src/autotest/afe/HostDetailView.java
index be86941..e5fc3d3 100644
--- a/frontend/client/src/autotest/afe/HostDetailView.java
+++ b/frontend/client/src/autotest/afe/HostDetailView.java
@@ -256,7 +256,8 @@
         selectionManager.setSelectableRowFilter(this);
         jobsTable.setWidgetFactory(selectionManager);
         tableDecorator.addTableActionsPanel(this, true);
-        tableDecorator.addControl("Show verifies, repairs and cleanups", showSpecialTasks);
+        tableDecorator.addControl("Show verifies, repairs, cleanups and resets",
+                                  showSpecialTasks);
         addWidget(tableDecorator, "view_host_jobs_table");
 
         showSpecialTasks.addClickHandler(new ClickHandler() {
diff --git a/frontend/client/src/autotest/afe/create/CreateJobViewDisplay.java b/frontend/client/src/autotest/afe/create/CreateJobViewDisplay.java
index 4fd2f79..affcaa7 100644
--- a/frontend/client/src/autotest/afe/create/CreateJobViewDisplay.java
+++ b/frontend/client/src/autotest/afe/create/CreateJobViewDisplay.java
@@ -50,6 +50,7 @@
     private TextBox testRetry = new TextBox();
     private TextBox emailList = new TextBox();
     private CheckBoxImpl skipVerify = new CheckBoxImpl();
+    private CheckBoxImpl skipReset = new CheckBoxImpl();
     private RadioChooserDisplay rebootBefore = new RadioChooserDisplay();
     private RadioChooserDisplay rebootAfter = new RadioChooserDisplay();
     private CheckBox parseFailedRepair = new CheckBox();
@@ -108,6 +109,7 @@
         panel.add(emailList, "create_email_list");
         panel.add(priorityList, "create_priority");
         panel.add(skipVerify, "create_skip_verify");
+        panel.add(skipReset, "create_skip_reset");
         panel.add(rebootBefore, "create_reboot_before");
         panel.add(rebootAfter, "create_reboot_after");
         panel.add(parseFailedRepair, "create_parse_failed_repair");
@@ -218,6 +220,10 @@
         return skipVerify;
     }
 
+    public ICheckBox getSkipReset() {
+      return skipReset;
+    }
+
     public IButton getSubmitJobButton() {
         return submitJobButton;
     }
diff --git a/frontend/client/src/autotest/afe/create/CreateJobViewPresenter.java b/frontend/client/src/autotest/afe/create/CreateJobViewPresenter.java
index f136ac4..e59a1fc 100644
--- a/frontend/client/src/autotest/afe/create/CreateJobViewPresenter.java
+++ b/frontend/client/src/autotest/afe/create/CreateJobViewPresenter.java
@@ -69,6 +69,7 @@
         public HasText getTestRetry();
         public HasText getEmailList();
         public ICheckBox getSkipVerify();
+        public ICheckBox getSkipReset();
         public RadioChooser.Display getRebootBefore();
         public RadioChooser.Display getRebootAfter();
         public HasValue<Boolean> getParseFailedRepair();
@@ -175,6 +176,7 @@
                 jobObject.get("email_list").isString().stringValue());
 
         display.getSkipVerify().setValue(!jobObject.get("run_verify").isBoolean().booleanValue());
+        display.getSkipReset().setValue(!jobObject.get("run_reset").isBoolean().booleanValue());
         rebootBefore.setSelectedChoice(Utils.jsonToString(jobObject.get("reboot_before")));
         rebootAfter.setSelectedChoice(Utils.jsonToString(jobObject.get("reboot_after")));
         display.getParseFailedRepair().setValue(
@@ -384,6 +386,24 @@
         }
     }
 
+    public void handleSkipReset() {
+        boolean shouldSkipReset = false;
+        for (JSONObject test : testSelector.getSelectedTests()) {
+            boolean runReset = test.get("run_reset").isBoolean().booleanValue();
+            if (!runReset) {
+                shouldSkipReset = true;
+                break;
+            }
+        }
+
+        if (shouldSkipReset) {
+            display.getSkipReset().setValue(true);
+            display.getSkipReset().setEnabled(false);
+        } else {
+            display.getSkipReset().setEnabled(true);
+        }
+    }
+
     protected int getMaximumRetriesCount() {
         int maxRetries = 0;
         for (JSONObject test : testSelector.getSelectedTests()) {
@@ -396,6 +416,7 @@
         testSelector.setEnabled(true);
         profilersPanel.setEnabled(true);
         handleSkipVerify();
+        handleSkipReset();
         display.getKernel().setEnabled(true);
         display.getKernelCmdline().setEnabled(true);
         display.getImageUrl().setEnabled(true);
@@ -565,7 +586,8 @@
         display.getTestRetry().setText("");
         display.getEmailList().setText("");
         testSelector.reset();
-        display.getSkipVerify().setValue(false);
+        display.getSkipVerify().setValue(true);
+        display.getSkipReset().setValue(false);
         profilersPanel.reset();
         setInputsEnabled();
         controlTypeSelect.setControlType(TestSelector.CLIENT_TYPE);
@@ -624,6 +646,8 @@
                 args.put("email_list", new JSONString(display.getEmailList().getText()));
                 args.put("run_verify", JSONBoolean.getInstance(
                         !display.getSkipVerify().getValue()));
+                args.put("run_reset", JSONBoolean.getInstance(
+                        !display.getSkipReset().getValue()));
                 args.put("is_template", JSONBoolean.getInstance(isTemplate));
                 args.put("dependencies", getSelectedDependencies());
                 args.put("reboot_before", new JSONString(rebootBefore.getSelectedChoice()));
diff --git a/frontend/client/src/autotest/public/AfeClient.html b/frontend/client/src/autotest/public/AfeClient.html
index 740b2ae..45d29e5 100644
--- a/frontend/client/src/autotest/public/AfeClient.html
+++ b/frontend/client/src/autotest/public/AfeClient.html
@@ -140,6 +140,8 @@
               <td id="create_email_list"></td><td></td></tr>
           <tr><td class="field-name">Skip verify:</td>
               <td id="create_skip_verify"></td><td></td></tr>
+          <tr><td class="field-name">Skip reset:</td>
+              <td id="create_skip_reset"></td><td></td></tr>
           <tr><td class="field-name">Reboot before:</td>
               <td id="create_reboot_before"></td><td></td></tr>
           <tr><td class="field-name">Reboot after:</td>
diff --git a/frontend/migrations/077_add_run_reset.py b/frontend/migrations/077_add_run_reset.py
new file mode 100644
index 0000000..b71ab9a
--- /dev/null
+++ b/frontend/migrations/077_add_run_reset.py
@@ -0,0 +1,9 @@
+UP_SQL = """
+ALTER TABLE afe_autotests ADD COLUMN run_reset SMALLINT NOT NULL DEFAULT '1';
+ALTER TABLE afe_jobs ADD COLUMN run_reset SMALLINT NOT NULL DEFAULT '1';
+"""
+
+DOWN_SQL = """
+ALTER TABLE afe_autotests DROP COLUMN run_reset;
+ALTER TABLE afe_jobs DROP COLUMN run_reset;
+"""
\ No newline at end of file
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 6c62962..dd1ea49 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -516,7 +516,8 @@
         """
         self._assert_host_has_no_agent(special_task)
 
-        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
+        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask,
+                                      ResetTask)
         for agent_task_class in special_agent_task_classes:
             if agent_task_class.TASK_TYPE == special_task.task:
                 return agent_task_class(task=special_task)
@@ -614,7 +615,8 @@
         # reorder tasks by priority
         task_priority_order = [models.SpecialTask.Task.REPAIR,
                                models.SpecialTask.Task.CLEANUP,
-                               models.SpecialTask.Task.VERIFY]
+                               models.SpecialTask.Task.VERIFY,
+                               models.SpecialTask.Task.RESET]
         def task_priority_key(task):
             return task_priority_order.index(task.task)
         return sorted(queued_tasks, key=task_priority_key)
@@ -1595,6 +1597,24 @@
                 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
 
 
+    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
+        """Remove a type of special task in all tasks, keep last one if needed.
+
+        @param special_task_to_remove: type of special task to be removed, e.g.,
+            models.SpecialTask.Task.VERIFY.
+        @param keep_last_one: True to keep the last special task if its type is
+            the same as of special_task_to_remove.
+
+        """
+        queued_special_tasks = models.SpecialTask.objects.filter(
+            host__id=self.host.id,
+            task=special_task_to_remove,
+            is_active=False, is_complete=False, queue_entry=None)
+        if keep_last_one:
+            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
+        queued_special_tasks.delete()
+
+
 class RepairTask(SpecialAgentTask):
     TASK_TYPE = models.SpecialTask.Task.REPAIR
 
@@ -1729,12 +1749,8 @@
 
         # Delete any queued manual reverifies for this host.  One verify will do
         # and there's no need to keep records of other requests.
-        queued_verifies = models.SpecialTask.objects.filter(
-            host__id=self.host.id,
-            task=models.SpecialTask.Task.VERIFY,
-            is_active=False, is_complete=False, queue_entry=None)
-        queued_verifies = queued_verifies.exclude(id=self.task.id)
-        queued_verifies.delete()
+        self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
+                                  keep_last_one=True)
 
 
     def epilog(self):
@@ -1764,7 +1780,7 @@
         logging.info("starting cleanup task for host: %s", self.host.hostname)
         self.host.set_status(models.Host.Status.CLEANING)
         if self.queue_entry:
-            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
+            self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)
 
 
     def _finish_epilog(self):
@@ -1796,6 +1812,52 @@
         self._finish_epilog()
 
 
+class ResetTask(PreJobTask):
+    """Task to reset a DUT, including cleanup and verify."""
+    # note this can also run post-job, but when it does, it's running standalone
+    # against the host (not related to the job), so it's not considered a
+    # PostJobTask
+
+    TASK_TYPE = models.SpecialTask.Task.RESET
+
+
+    def __init__(self, task, recover_run_monitor=None):
+        super(ResetTask, self).__init__(task, ['--reset'])
+        self._set_ids(host=self.host, queue_entries=[self.queue_entry])
+
+
+    def prolog(self):
+        super(ResetTask, self).prolog()
+        logging.info('starting reset task for host: %s',
+                     self.host.hostname)
+        self.host.set_status(models.Host.Status.RESETTING)
+        if self.queue_entry:
+            self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)
+
+        # Delete any queued cleanups for this host.
+        self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,
+                                  keep_last_one=False)
+
+        # Delete any queued reverifies for this host.
+        self.remove_special_tasks(models.SpecialTask.Task.VERIFY,
+                                  keep_last_one=False)
+
+        # Only one reset is needed.
+        self.remove_special_tasks(models.SpecialTask.Task.RESET,
+                                  keep_last_one=True)
+
+
+    def epilog(self):
+        super(ResetTask, self).epilog()
+
+        if self.success:
+            self.host.update_field('dirty', 0)
+            self.host.set_status(models.Host.Status.READY)
+
+            if self.queue_entry:
+                self.queue_entry.on_pending()
+
+
 class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
     """
     Common functionality for QueueTask and HostlessQueueTask
@@ -2112,14 +2174,14 @@
         else:
             final_success = False
             num_tests_failed = 0
-
         reboot_after = self._job.reboot_after
         do_reboot = (
                 # always reboot after aborted jobs
                 self._final_status() == models.HostQueueEntry.Status.ABORTED
                 or reboot_after == model_attributes.RebootAfter.ALWAYS
                 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
-                    and final_success and num_tests_failed == 0))
+                    and final_success and num_tests_failed == 0)
+                or num_tests_failed > 0)
 
         for queue_entry in self.queue_entries:
             if do_reboot:
diff --git a/scheduler/scheduler_models.py b/scheduler/scheduler_models.py
index bc8da6f..7053fb6 100644
--- a/scheduler/scheduler_models.py
+++ b/scheduler/scheduler_models.py
@@ -677,9 +677,9 @@
 
 
     def _do_schedule_pre_job_tasks(self):
-        # Every host goes thru the Verifying stage (which may or may not
+        # Every host goes thru the Resetting stage (which may or may not
         # actually do anything as determined by get_pre_job_tasks).
-        self.set_status(models.HostQueueEntry.Status.VERIFYING)
+        self.set_status(models.HostQueueEntry.Status.RESETTING)
         self.job.schedule_pre_job_tasks(queue_entry=self)
 
 
@@ -757,7 +757,8 @@
                            Status.WAITING):
             assert not dispatcher.get_agents_for_entry(self)
             self.host.set_status(models.Host.Status.READY)
-        elif self.status == Status.VERIFYING:
+        elif (self.status == Status.VERIFYING or
+              self.status == Status.RESETTING):
             models.SpecialTask.objects.create(
                     task=models.SpecialTask.Task.CLEANUP,
                     host=models.Host.objects.get(id=self.host.id),
@@ -828,7 +829,7 @@
                'run_verify', 'email_list', 'reboot_before', 'reboot_after',
                'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
                'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
-               'test_retry')
+               'test_retry', 'run_reset')
     _timer = stats.Timer("scheduler_models.Job")
 
     # This does not need to be a column in the DB.  The delays are likely to
@@ -1202,6 +1203,15 @@
         return self.run_verify
 
 
+    def _should_run_reset(self, queue_entry):
+        can_verify = (queue_entry.host.protection !=
+                         host_protections.Protection.DO_NOT_VERIFY)
+        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
+        return (can_reboot and can_verify and (self.run_reset or
+                (self._should_run_cleanup(queue_entry) and
+                 self._should_run_verify(queue_entry))))
+
+
     def _queue_special_task(self, queue_entry, task):
         """
         Create a special task and associate it with a host queue entry.
@@ -1231,12 +1241,18 @@
         task_queued = False
         hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
 
-        if self._should_run_cleanup(queue_entry):
-            self._queue_special_task(hqe_model, models.SpecialTask.Task.CLEANUP)
+        if self._should_run_reset(queue_entry):
+            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
             task_queued = True
-        if self._should_run_verify(queue_entry):
-            self._queue_special_task(hqe_model, models.SpecialTask.Task.VERIFY)
-            task_queued = True
+        else:
+            if self._should_run_cleanup(queue_entry):
+                self._queue_special_task(hqe_model,
+                                         models.SpecialTask.Task.CLEANUP)
+                task_queued = True
+            if self._should_run_verify(queue_entry):
+                self._queue_special_task(hqe_model,
+                                         models.SpecialTask.Task.VERIFY)
+                task_queued = True
 
         if not task_queued:
             queue_entry.on_pending()
diff --git a/scheduler/scheduler_models_unittest.py b/scheduler/scheduler_models_unittest.py
index 92b5d56..9b73694 100755
--- a/scheduler/scheduler_models_unittest.py
+++ b/scheduler/scheduler_models_unittest.py
@@ -256,13 +256,15 @@
         self.god.stub_with(models.SpecialTask.objects, 'create', _mock_create)
 
 
-    def _test_pre_job_tasks_helper(self):
+    def _test_pre_job_tasks_helper(self,
+                            reboot_before=model_attributes.RebootBefore.ALWAYS):
         """
         Calls HQE._do_schedule_pre_job_tasks() and returns the created special
         task
         """
         self._tasks = []
         queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
+        queue_entry.job.reboot_before = reboot_before
         queue_entry._do_schedule_pre_job_tasks()
         return self._tasks
 
@@ -320,7 +322,7 @@
 
         tasks = self._test_pre_job_tasks_helper()
 
-        self._check_special_tasks(tasks, [(models.SpecialTask.Task.VERIFY, 1)])
+        self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])
 
 
     def test_run_asynchronous_skip_verify(self):
@@ -330,7 +332,7 @@
 
         tasks = self._test_pre_job_tasks_helper()
 
-        self.assertEquals(tasks, [])
+        self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])
 
 
     def test_run_synchronous_verify(self):
@@ -338,7 +340,7 @@
 
         tasks = self._test_pre_job_tasks_helper()
 
-        self._check_special_tasks(tasks, [(models.SpecialTask.Task.VERIFY, 1)])
+        self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])
 
 
     def test_run_synchronous_skip_verify(self):
@@ -348,9 +350,42 @@
 
         tasks = self._test_pre_job_tasks_helper()
 
+        self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])
+
+
+    def test_run_asynchronous_do_not_reset(self):
+        job = self._create_job(hosts=[1, 2])
+        job.run_reset = False
+        job.run_verify = False
+        job.save()
+
+        tasks = self._test_pre_job_tasks_helper()
+
         self.assertEquals(tasks, [])
 
 
+    def test_run_synchronous_do_not_reset_no_RebootBefore(self):
+        job = self._create_job(hosts=[1, 2], synchronous=True)
+        job.reboot_before = model_attributes.RebootBefore.NEVER
+        job.save()
+
+        tasks = self._test_pre_job_tasks_helper(
+                            reboot_before=model_attributes.RebootBefore.NEVER)
+
+        self.assertEqual(tasks, [])
+
+
+    def test_run_asynchronous_do_not_reset_no_RebootBefore(self):
+        job = self._create_job(hosts=[1, 2], synchronous=False)
+        job.reboot_before = model_attributes.RebootBefore.NEVER
+        job.save()
+
+        tasks = self._test_pre_job_tasks_helper(
+                            reboot_before=model_attributes.RebootBefore.NEVER)
+
+        self.assertEqual(tasks, [])
+
+
     def test_run_atomic_group_already_started(self):
         self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
         self._update_hqe("status='Starting', execution_subdir=''")
@@ -371,34 +406,29 @@
         tasks = self._test_pre_job_tasks_helper()
 
         self._check_special_tasks(tasks, [
-                (models.SpecialTask.Task.CLEANUP, None),
-                (models.SpecialTask.Task.VERIFY, None),
+                (models.SpecialTask.Task.RESET, None)
             ])
 
 
-    def _test_reboot_before_if_dirty_helper(self, expect_reboot):
+    def _test_reboot_before_if_dirty_helper(self):
         job = self._create_job(hosts=[1])
         job.reboot_before = model_attributes.RebootBefore.IF_DIRTY
         job.save()
 
         tasks = self._test_pre_job_tasks_helper()
-
-        task_types = []
-        if expect_reboot:
-            task_types.append((models.SpecialTask.Task.CLEANUP, None))
-        task_types.append((models.SpecialTask.Task.VERIFY, None))
+        task_types = [(models.SpecialTask.Task.RESET, None)]
 
         self._check_special_tasks(tasks, task_types)
 
 
     def test_reboot_before_if_dirty(self):
         models.Host.smart_get(1).update_object(dirty=True)
-        self._test_reboot_before_if_dirty_helper(True)
+        self._test_reboot_before_if_dirty_helper()
 
 
     def test_reboot_before_not_dirty(self):
         models.Host.smart_get(1).update_object(dirty=False)
-        self._test_reboot_before_if_dirty_helper(False)
+        self._test_reboot_before_if_dirty_helper()
 
 
     def test_next_group_name(self):
diff --git a/scheduler/site_monitor_db.py b/scheduler/site_monitor_db.py
index 73f72fb..04c6857 100644
--- a/scheduler/site_monitor_db.py
+++ b/scheduler/site_monitor_db.py
@@ -2,6 +2,8 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
+#pylint: disable-msg=C0111
+
 import os
 import logging
 
@@ -13,6 +15,10 @@
 
 # Override default parser with our site parser.
 def parser_path(install_dir):
+    """Return site implementation of parser.
+
+    @param install_dir: installation directory.
+    """
     return os.path.join(install_dir, 'tko', 'site_parse')
 
 
@@ -135,7 +141,7 @@
         """
         This is an altered version of _reverify_hosts_where the class to
         models.SpecialTask.objects.create passes in an argument for
-        requested_by, in order to allow the Cleanup task to be created
+        requested_by, in order to allow the Reset task to be created
         properly.
         """
         full_where='locked = 0 AND invalid = 0 AND ' + where
@@ -154,22 +160,24 @@
                 user = models.User.objects.get(
                         id=SiteDispatcher.DEFAULT_REQUESTED_BY_USER_ID)
             models.SpecialTask.objects.create(
-                    task=models.SpecialTask.Task.CLEANUP,
+                    task=models.SpecialTask.Task.RESET,
                     host=models.Host.objects.get(id=host.id),
                     requested_by=user)
 
 
     def _check_for_unrecovered_verifying_entries(self):
+        # Verify is replaced by Reset.
         queue_entries = scheduler_models.HostQueueEntry.fetch(
-                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
+                where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
         for queue_entry in queue_entries:
             special_tasks = models.SpecialTask.objects.filter(
                     task__in=(models.SpecialTask.Task.CLEANUP,
-                              models.SpecialTask.Task.VERIFY),
+                              models.SpecialTask.Task.VERIFY,
+                              models.SpecialTask.Task.RESET),
                     queue_entry__id=queue_entry.id,
                     is_complete=False)
             if special_tasks.count() == 0:
-                logging.error('Unrecovered Verifying host queue entry: %s. '
+                logging.error('Unrecovered Resetting host queue entry: %s. '
                               'Setting status to Queued.', str(queue_entry))
                 # Essentially this host queue entry was set to be Verifying
                 # however no special task exists for entry. This occurs if the
diff --git a/server/autoserv b/server/autoserv
index 1cfde22..10c2875 100755
--- a/server/autoserv
+++ b/server/autoserv
@@ -83,6 +83,7 @@
     repair = parser.options.repair
     cleanup = parser.options.cleanup
     provision = parser.options.provision
+    reset = parser.options.reset
     no_tee = parser.options.no_tee
     parse_job = parser.options.parse_job
     execution_tag = parser.options.execution_tag
@@ -106,7 +107,7 @@
         parser.parser.error("Cannot specify provisioning and client!")
 
     is_special_task = (verify or repair or cleanup or collect_crashinfo or
-                       provision)
+                       provision or reset)
     if len(parser.args) < 1 and not is_special_task:
         parser.parser.error("Missing argument: control file")
 
@@ -160,6 +161,8 @@
                 job.verify()
             elif provision:
                 job.provision(provision)
+            elif reset:
+                job.reset()
             else:
                 job.run(cleanup, install_before, install_after,
                         verify_job_repo_url=verify_job_repo_url,
diff --git a/server/autoserv_parser.py b/server/autoserv_parser.py
index 12d405d..8469bab 100644
--- a/server/autoserv_parser.py
+++ b/server/autoserv_parser.py
@@ -1,3 +1,5 @@
+# pylint: disable-msg=C0111
+
 import os, sys, optparse
 
 from autotest_lib.client.common_lib import host_protections, utils
@@ -76,6 +78,10 @@
                                help="cleanup all machines after the job")
         self.parser.add_option("--provision", action="store",
                                help="Labels to provision the machine to.")
+        self.parser.add_option("-T", "--reset", action="store_true",
+                               default=False,
+                               help="Reset (cleanup and verify) all machines "
+                               "after the job")
         self.parser.add_option("-n", action="store_true",
                                dest="no_tee", default=False,
                                help="no teeing the status to stdout/err")
diff --git a/server/control_segments/cleanup b/server/control_segments/cleanup
index ad0c814..d63f7ac 100644
--- a/server/control_segments/cleanup
+++ b/server/control_segments/cleanup
@@ -6,6 +6,9 @@
         host = hosts.create_host(machine, initialize=False, auto_monitor=False)
         timer = stats.Timer('cleanup_time.%s' % host._get_board_from_afe())
         timer.start()
+        log_dir = os.path.join(job.resultdir, machine)
+        os.makedirs(log_dir)
+        host.get_file('/var/log/', log_dir, preserve_symlinks=True)
         host.cleanup()
     finally:
         if timer:
diff --git a/server/control_segments/reset b/server/control_segments/reset
new file mode 100644
index 0000000..90dd651
--- /dev/null
+++ b/server/control_segments/reset
@@ -0,0 +1,27 @@
+import sys
+
+from autotest_lib.site_utils.graphite import stats
+
+def reset(machine):
+    print 'Starting to reset host ' + machine
+    timer = None
+    try:
+        host = hosts.create_host(machine, initialize=False, auto_monitor=False)
+        timer = stats.Timer('reset_time.%s' %
+                            host._get_board_from_afe())
+        timer.start()
+        # Run cleanup before verify so verification sees a clean host.
+        host.cleanup()
+        host.verify()
+        job.record('GOOD', None, 'reset',
+                   '%s is reset successfully' % machine)
+    except Exception as e:
+        msg = 'reset failed: %s' % e
+        job.record('FAIL', None, 'reset', msg)
+        raise
+    finally:
+        if timer:
+            timer.stop()
+
+
+job.parallel_simple(reset, machines)
diff --git a/server/server_job.py b/server/server_job.py
index fcf7d77..2f50492 100644
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -1,3 +1,5 @@
+# pylint: disable-msg=C0111
+
 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
@@ -39,6 +41,7 @@
 REPAIR_CONTROL_FILE = _control_segment_path('repair')
 PROVISION_CONTROL_FILE = _control_segment_path('provision')
 VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
+RESET_CONTROL_FILE = _control_segment_path('reset')
 
 
 # by default provide a stub that generates no site data
@@ -334,6 +337,7 @@
 
 
     def verify(self):
+        """Verify machines are all ssh-able."""
         if not self.machines:
             raise error.AutoservError('No machines specified to verify')
         if self.resultdir:
@@ -350,6 +354,26 @@
             raise
 
 
+    def reset(self):
+        """Reset machines by first cleanup then verify each machine."""
+        if not self.machines:
+            raise error.AutoservError('No machines specified to reset.')
+        if self.resultdir:
+            os.chdir(self.resultdir)
+
+        try:
+            namespace = {'machines' : self.machines, 'job' : self,
+                         'ssh_user' : self._ssh_user,
+                         'ssh_port' : self._ssh_port,
+                         'ssh_pass' : self._ssh_pass}
+            self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)
+        except Exception as e:
+            msg = ('Reset failed\n' + str(e) + '\n' +
+                   traceback.format_exc())
+            self.record('ABORT', None, None, msg)
+            raise
+
+
     def repair(self, host_protection):
         if not self.machines:
             raise error.AutoservError('No machines specified to repair')
@@ -878,6 +902,10 @@
         @param update_func - a function that updates the list of uncollected
             logs. Should take one parameter, the list to be updated.
         """
+        # Skip log collection if file _uncollected_log_file does not exist.
+        if not (self._uncollected_log_file and
+                os.path.exists(self._uncollected_log_file)):
+            return
         if self._uncollected_log_file:
             log_file = open(self._uncollected_log_file, "r+")
             fcntl.flock(log_file, fcntl.LOCK_EX)