Make drone_manager track running process counts using only the information passed in from the scheduler.  Currently it also uses process counts derived from "ps", but that is an unreliable source of information.  This improves accuracy and consistency and gives us full control over the accounting.

This involves a few primary changes:
* made the drone_manager track process counts with each PidfileId
* added a declare_process_count() method for the scheduler to indicate the process count of a pidfile ID during recovery (in all other cases, the DroneManager gets that information in execute_process()); see the sketch after this list
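
For illustration, the bookkeeping boils down to a per-PidfileId map fed
only by scheduler-supplied counts.  This is a simplified sketch, not the
actual drone_manager code; apart from declare_process_count() and
execute_process(), the names here are illustrative:

    class DroneManager(object):
        def __init__(self):
            # PidfileId -> num_processes, populated only from data the
            # scheduler passes in; "ps" output is never consulted.
            self._registered_pidfile_info = {}

        def execute_process(self, pidfile_id, num_processes):
            # Normal path: the scheduler launches the process and
            # supplies its process count up front.
            self._registered_pidfile_info[pidfile_id] = num_processes

        def declare_process_count(self, pidfile_id, num_processes):
            # Recovery path: the process already exists, so the
            # scheduler declares the count it recorded for this
            # pidfile ID.
            self._registered_pidfile_info[pidfile_id] = num_processes

        def total_running_processes(self):
            # Throttling can rely on the declared counts alone.
            return sum(self._registered_pidfile_info.values())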

Doing this involved some extensive refactorings.  Because the scheduler now needs to declare process counts during recovery, and because the AgentTasks are the entities that know about process counts, it made sense to move the bulk of the recovery process to the AgentTasks.  Changes for this include:
* converted a bunch of AgentTask instance variables to abstract methods, and added overriding implementations in subclasses as necessary
* added register_necessary_pidfiles() and recover() methods to AgentTasks, allowing them to perform recovery for themselves.  Got rid of the recover_run_monitor() argument to AgentTasks as a result.
* changed the recovery code to delegate most of the work to the AgentTasks.  The flow now looks like this: create all AgentTasks, call them to register pidfiles, call the DroneManager to refresh pidfile contents, call the AgentTasks to recover themselves, then perform extra cleanup and error checking (see the sketch after this list).  This simplified the Dispatcher somewhat, in my opinion, though there's room for further simplification.
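
For illustration, the new recovery sequence in the Dispatcher looks
roughly like this.  It is a simplified sketch; apart from
register_necessary_pidfiles() and recover(), the helper and attribute
names are illustrative, not the actual monitor_db code:

    class Dispatcher(object):
        def _recover_processes(self):
            # Create AgentTasks for everything the DB says was running.
            agent_tasks = self._create_recovery_agent_tasks()

            # Each AgentTask knows which pidfiles it will need.
            for task in agent_tasks:
                task.register_necessary_pidfiles()

            # One pass over the drones fetches the contents of all
            # registered pidfiles at once.
            _drone_manager.refresh()

            # Each AgentTask recovers its own state from the pidfile
            # contents, declaring process counts to the DroneManager
            # along the way.
            for task in agent_tasks:
                task.recover()
                self.add_agent(Agent(task))

            # Finally, clean up anything no AgentTask claimed and
            # check for errors.
            self._check_for_remaining_orphan_processes()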

Other changes include:
* removed DroneManager.get_process_for(), which was unused, as well as related code (including the DroneManager._processes structure)
* moved logic from HostQueueEntry.handle_host_failure to SpecialAgentTask._fail_queue_entry.  That was the only call site.
And some other fixes and cleanups:
* eliminated some extra state from QueueTask
* fixed models.HostQueueEntry.execution_path(), which was returning the wrong value (though nothing was actually using it)
* deleted some large chunks of monitor_db_unittest.  These broke due to the refactorings described above, and I deemed it not worthwhile to fix them up for the new code.  I verified that total coverage was unaffected by deleting these chunks.

Signed-off-by: Steve Howard <showard@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@4007 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db_functional_test.py b/scheduler/monitor_db_functional_test.py
index 858bb52..f31cdf2 100644
--- a/scheduler/monitor_db_functional_test.py
+++ b/scheduler/monitor_db_functional_test.py
@@ -12,6 +12,7 @@
 _re_translator = database_connection.TranslatingDatabase.make_regexp_translator
 _DB_TRANSLATORS = (
         _re_translator(r'NOW\(\)', 'time("now")'),
+        _re_translator(r'LAST_INSERT_ID\(\)', 'LAST_INSERT_ROWID()'),
         # older SQLite doesn't support group_concat, so just don't bother until
         # it arises in an important query
         _re_translator(r'GROUP_CONCAT\((.*?)\)', r'\1'),
@@ -53,6 +54,17 @@
                          'parse')
 
 
+_PIDFILE_TO_PIDFILE_TYPE = {
+        monitor_db._AUTOSERV_PID_FILE: _PidfileType.JOB,
+        monitor_db._CRASHINFO_PID_FILE: _PidfileType.GATHER,
+        monitor_db._PARSER_PID_FILE: _PidfileType.PARSE,
+        }
+
+
+_PIDFILE_TYPE_TO_PIDFILE = dict((value, key) for key, value
+                                in _PIDFILE_TO_PIDFILE_TYPE.iteritems())
+
+
 class MockDroneManager(NullMethodObject):
     """
     Public attributes:
@@ -67,13 +79,20 @@
         Object to represent pidfile IDs that is opaque to the scheduler code but
         still debugging-friendly for us.
         """
-        def __init__(self, debug_string, num_processes):
-            self._debug_string = debug_string
+        def __init__(self, working_directory, pidfile_name, num_processes=None):
+            self._working_directory = working_directory
+            self._pidfile_name = pidfile_name
             self._num_processes = num_processes
+            self._paired_with_pidfile = None
+
+
+        def key(self):
+            """Key for MockDroneManager._pidfile_index"""
+            return (self._working_directory, self._pidfile_name)
 
 
         def __str__(self):
-            return self._debug_string
+            return os.path.join(self._working_directory, self._pidfile_name)
 
 
         def __repr__(self):
@@ -111,7 +130,7 @@
 
 
     def finish_specific_process(self, working_directory, pidfile_name):
-        pidfile_id = self._pidfile_index[(working_directory, pidfile_name)]
+        pidfile_id = self.pidfile_from_path(working_directory, pidfile_name)
         self._set_pidfile_exit_status(pidfile_id, 0)
 
 
@@ -138,6 +157,10 @@
                 if self._pidfiles[pidfile_id].process is not None]
 
 
+    def pidfile_from_path(self, working_directory, pidfile_name):
+        return self._pidfile_index[(working_directory, pidfile_name)]
+
+
     # DroneManager emulation APIs for use by monitor_db
 
     def get_orphaned_autoserv_processes(self):
@@ -179,14 +202,9 @@
 
     def _initialize_pidfile(self, pidfile_id):
         if pidfile_id not in self._pidfiles:
+            assert pidfile_id.key() not in self._pidfile_index
             self._pidfiles[pidfile_id] = drone_manager.PidfileContents()
-
-
-    _pidfile_type_map = {
-            monitor_db._AUTOSERV_PID_FILE: _PidfileType.JOB,
-            monitor_db._CRASHINFO_PID_FILE: _PidfileType.GATHER,
-            monitor_db._PARSER_PID_FILE: _PidfileType.PARSE,
-    }
+            self._pidfile_index[pidfile_id.key()] = pidfile_id
 
 
     def _set_last_pidfile(self, pidfile_id, working_directory, pidfile_name):
@@ -195,16 +213,19 @@
             type_string = working_directory.rsplit('-', 1)[1]
             pidfile_type = _PidfileType.get_value(type_string)
         else:
-            pidfile_type = self._pidfile_type_map[pidfile_name]
+            pidfile_type = _PIDFILE_TO_PIDFILE_TYPE[pidfile_name]
         self._last_pidfile_id[pidfile_type] = pidfile_id
 
 
     def execute_command(self, command, working_directory, pidfile_name,
                         num_processes, log_file=None, paired_with_pidfile=None,
                         username=None):
-        pidfile_id = self._DummyPidfileId(
-                self._get_pidfile_debug_string(working_directory, pidfile_name),
-                num_processes)
+        pidfile_id = self._DummyPidfileId(working_directory, pidfile_name)
+        if pidfile_id.key() in self._pidfile_index:
+            pidfile_id = self._pidfile_index[pidfile_id.key()]
+        pidfile_id._num_processes = num_processes
+        pidfile_id._paired_with_pidfile = paired_with_pidfile
+
         self._future_pidfiles.append(pidfile_id)
         self._initialize_pidfile(pidfile_id)
         self._pidfile_index[(working_directory, pidfile_name)] = pidfile_id
@@ -212,10 +233,6 @@
         return pidfile_id
 
 
-    def _get_pidfile_debug_string(self, working_directory, pidfile_name):
-        return os.path.join(working_directory, pidfile_name)
-
-
     def get_pidfile_contents(self, pidfile_id, use_second_read=False):
         if pidfile_id not in self._pidfiles:
             print 'Request for nonexistent pidfile %s' % pidfile_id
@@ -234,6 +251,10 @@
         self._unregistered_pidfiles.add(pidfile_id)
 
 
+    def declare_process_count(self, pidfile_id, num_processes):
+        pidfile_id._num_processes = num_processes
+
+
     def absolute_path(self, path):
         return 'absolute/' + path
 
@@ -244,12 +265,10 @@
 
 
     def get_pidfile_id_from(self, execution_tag, pidfile_name):
-        debug_string = ('Nonexistent pidfile: '
-                        + self._get_pidfile_debug_string(execution_tag,
-                                                         pidfile_name))
+        default_pidfile = self._DummyPidfileId(execution_tag, pidfile_name,
+                                               num_processes=0)
         return self._pidfile_index.get((execution_tag, pidfile_name),
-                                       self._DummyPidfileId(debug_string,
-                                                            num_processes=0))
+                                       default_pidfile)
 
 
     def kill_process(self, process):
@@ -276,11 +295,15 @@
         self._frontend_common_setup()
         self._set_stubs()
         self._set_global_config_values()
-        self.dispatcher = monitor_db.Dispatcher()
+        self._create_dispatcher()
 
         logging.basicConfig(level=logging.DEBUG)
 
 
+    def _create_dispatcher(self):
+        self.dispatcher = monitor_db.Dispatcher()
+
+
     def tearDown(self):
         self._frontend_common_teardown()
 
@@ -330,10 +353,14 @@
                      '%s/%s not executed' % (working_directory, pidfile_name))
 
 
+    def _update_instance(self, model_instance):
+        return type(model_instance).objects.get(pk=model_instance.pk)
+
+
     def _check_statuses(self, queue_entry, queue_entry_status,
                         host_status=None):
         # update from DB
-        queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
+        queue_entry = self._update_instance(queue_entry)
         self.assertEquals(queue_entry.status, queue_entry_status)
         if host_status:
             self.assertEquals(queue_entry.host.status, host_status)
@@ -470,11 +497,20 @@
                              HostStatus.REPAIR_FAILED)
 
 
+    def _ensure_post_job_process_is_paired(self, queue_entry, pidfile_type):
+        pidfile_name = _PIDFILE_TYPE_TO_PIDFILE[pidfile_type]
+        queue_entry = self._update_instance(queue_entry)
+        pidfile_id = self.mock_drone_manager.pidfile_from_path(
+                queue_entry.execution_path(), pidfile_name)
+        self.assert_(pidfile_id._paired_with_pidfile)
+
+
     def _finish_job(self, queue_entry):
         self.mock_drone_manager.finish_process(_PidfileType.JOB)
         self._run_dispatcher() # launches parsing + cleanup
         self._check_statuses(queue_entry, HqeStatus.PARSING,
                              HostStatus.CLEANING)
+        self._ensure_post_job_process_is_paired(queue_entry, _PidfileType.PARSE)
         self._finish_parsing_and_cleanup()
 
 
@@ -693,6 +729,36 @@
             self.assertEquals(queue_entry.status, HqeStatus.STARTING)
 
 
+    def test_recover_parsing(self):
+        self._initialize_test()
+        job, queue_entry = self._make_job_and_queue_entry()
+        job.run_verify = False
+        job.reboot_after = models.RebootAfter.NEVER
+        job.save()
+
+        self._run_dispatcher() # launches job
+        self.mock_drone_manager.finish_process(_PidfileType.JOB)
+        self._run_dispatcher() # launches parsing
+
+        # now "restart" the scheduler
+        self._create_dispatcher()
+        self._initialize_test()
+        self._run_dispatcher()
+        self.mock_drone_manager.finish_process(_PidfileType.PARSE)
+        self._run_dispatcher()
+
+
+    def test_recover_parsing__no_process_already_aborted(self):
+        _, queue_entry = self._make_job_and_queue_entry()
+        queue_entry.execution_subdir = 'host1'
+        queue_entry.status = HqeStatus.PARSING
+        queue_entry.aborted = True
+        queue_entry.save()
+
+        self._initialize_test()
+        self._run_dispatcher()
+
+
     def test_job_scheduled_just_after_abort(self):
         # test a pretty obscure corner case where a job is aborted while queued,
         # another job is ready to run, and throttling is active. the post-abort
@@ -849,5 +915,25 @@
                             HqeStatus.PARSING)
 
 
+    def test_simple_atomic_group_job(self):
+        job = self._create_job(atomic_group=1)
+        self._run_dispatcher() # expand + verify
+        queue_entries = job.hostqueueentry_set.all()
+        self.assertEquals(len(queue_entries), 2)
+        self.assertEquals(queue_entries[0].host.hostname, 'host5')
+        self.assertEquals(queue_entries[1].host.hostname, 'host6')
+
+        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
+        self._run_dispatcher() # delay task started waiting
+
+        self.mock_drone_manager.finish_specific_process(
+                'hosts/host5/1-verify', monitor_db._AUTOSERV_PID_FILE)
+        self._run_dispatcher() # job starts now
+        for entry in queue_entries:
+            self._check_statuses(entry, HqeStatus.RUNNING, HostStatus.RUNNING)
+
+        # rest of job proceeds normally
+
+
 if __name__ == '__main__':
     unittest.main()