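Explicitly release pidfiles after we're done with them.  A special task unregisters its monitor's pidfile during cleanup, and a queue entry unregisters all of its possible pidfiles when it is marked complete; any pidfile still tracked past the refresh limit is dropped with a warning as leaked.  This approach is somewhat lazy, but it should work fine.  Also extended the new scheduler functional test with more cases (aborting during verify and aborting a running job) and added a test that checks pidfile release in each of these cases.  In the process, I changed how some of the test code (notably the mock drone manager) works so that the tests can express their intentions more cleanly.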

Signed-off-by: Steve Howard <showard@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@3804 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/drone_manager.py b/scheduler/drone_manager.py
index 42979ec..7fcbac7 100644
--- a/scheduler/drone_manager.py
+++ b/scheduler/drone_manager.py
@@ -195,8 +195,8 @@
     def _drop_old_pidfiles(self):
         for pidfile_id, age in self._pidfile_age.items():
             if age > self._get_max_pidfile_refreshes():
-                logging.info('forgetting pidfile %s', pidfile_id)
-                del self._pidfile_age[pidfile_id]
+                logging.warning('dropping leaked pidfile %s', pidfile_id)
+                self.unregister_pidfile(pidfile_id)
             else:
                 self._pidfile_age[pidfile_id] += 1
 
@@ -483,6 +483,12 @@
         self._pidfile_age[pidfile_id] = 0
 
 
+    def unregister_pidfile(self, pidfile_id):  # stop tracking this pidfile
+        if pidfile_id in self._pidfile_age:  # unknown ids are ignored
+            logging.info('forgetting pidfile %s', pidfile_id)
+            del self._pidfile_age[pidfile_id]
+
+
     def get_pidfile_contents(self, pidfile_id, use_second_read=False):
         """
         Retrieve a PidfileContents object for the given pidfile_id.  If
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 3ea83f7..1c62491 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -1732,8 +1732,11 @@
     def cleanup(self):
         super(SpecialAgentTask, self).cleanup()
         self.task.finish()
-        if self.monitor and self.monitor.has_process():
-            self._copy_results([self.task])
+        if self.monitor:
+            if self.monitor.has_process():
+                self._copy_results([self.task])
+            if self.monitor.pidfile_id is not None:  # release the pidfile
+                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
 
 
 class RepairTask(SpecialAgentTask):
@@ -2727,6 +2730,7 @@
                       models.HostQueueEntry.Status.ABORTED):
             self.update_field('complete', True)
             self.update_field('active', False)
+            self._on_complete()  # unregister this entry's pidfiles
 
         should_email_status = (status.lower() in _notify_email_statuses or
                                'all' in _notify_email_statuses)
@@ -2736,6 +2740,16 @@
         self._email_on_job_complete()
 
 
+    def _on_complete(self):
+        if not self.execution_subdir:  # presumably nothing ran yet
+            return
+        # unregister any possible pidfiles associated with this queue entry
+        for pidfile_name in _ALL_PIDFILE_NAMES:
+            pidfile_id = _drone_manager.get_pidfile_id_from(
+                    self.execution_path(), pidfile_name=pidfile_name)
+            _drone_manager.unregister_pidfile(pidfile_id)
+
+
     def _email_on_status(self, status):
         hostname = self._get_hostname()
 
diff --git a/scheduler/monitor_db_functional_test.py b/scheduler/monitor_db_functional_test.py
index b1aa6df..4bf17fe 100644
--- a/scheduler/monitor_db_functional_test.py
+++ b/scheduler/monitor_db_functional_test.py
@@ -2,7 +2,7 @@
 
 import logging, unittest
 import common
-from autotest_lib.client.common_lib import global_config
+from autotest_lib.client.common_lib import enum, global_config
 from autotest_lib.database import database_connection
 from autotest_lib.frontend import setup_django_environment
 from autotest_lib.frontend.afe import frontend_test_utils
@@ -44,6 +44,12 @@
         return self._config_info[identifier]
 
 
+# Kinds of process the mock drone manager tracks.  Special task names must
+# match their results directory suffixes, e.g. hosts/host1/1-verify.
+_PidfileType = enum.Enum('verify', 'cleanup', 'repair', 'job', 'gather',
+                         'parse')
+
+
 class MockDroneManager(NullMethodObject):
     _NULL_METHODS = ('refresh', 'reinitialize_drones',
                      'copy_to_results_repository')
@@ -56,30 +62,35 @@
         self._pidfiles = {}
         # pidfile IDs that haven't been created yet
         self._future_pidfiles = []
-        # the most recently created pidfile ID
-        self._last_pidfile_id = None
+        # maps _PidfileType to the most recently created pidfile ID of that type
+        self._last_pidfile_id = {}
         # maps (working_directory, pidfile_name) to pidfile IDs
         self._pidfile_index = {}
+        # maps each (opaque) process object to its pidfile ID
+        self._process_index = {}
+        # pidfile IDs of processes killed via kill_process()
+        self._killed_pidfiles = set()
 
 
     # utility APIs for use by the test
 
-    def set_last_pidfile_exit_status(self, exit_status):
-        assert self._last_pidfile_id is not None
-        self._set_pidfile_exit_status(self._last_pidfile_id, exit_status)
+    def finish_process(self, pidfile_type, exit_status=0):
+        pidfile_id = self._last_pidfile_id[pidfile_type]  # latest of type
+        self._set_pidfile_exit_status(pidfile_id, exit_status)
 
 
-    def set_pidfile_exit_status(self, working_directory, pidfile_name,
-                                exit_status):
-        key = (working_directory, pidfile_name)
-        self._set_pidfile_exit_status(self._pidfile_index[key], exit_status)
-
     def _set_pidfile_exit_status(self, pidfile_id, exit_status):
+        assert pidfile_id is not None
         contents = self._pidfiles[pidfile_id]
         contents.exit_status = exit_status
         contents.num_tests_failed = 0
 
 
+    def was_last_process_killed(self, pidfile_type):
+        pidfile_id = self._last_pidfile_id[pidfile_type]  # latest of type
+        return pidfile_id in self._killed_pidfiles
+
+
     # DroneManager emulation APIs for use by monitor_db
 
     def get_orphaned_autoserv_processes(self):
@@ -98,7 +109,9 @@
         # executing an "execute_command" causes a pidfile to be created
         for pidfile_id in self._future_pidfiles:
             # Process objects are opaque to monitor_db
-            self._pidfiles[pidfile_id].process = object()
+            process = object()
+            self._pidfiles[pidfile_id].process = process
+            self._process_index[process] = pidfile_id  # for kill_process()
         self._future_pidfiles = []
 
 
@@ -109,14 +122,36 @@
         return 'attach_path'
 
 
+    def _initialize_pidfile(self, pidfile_id):  # no-op if already known
+        if pidfile_id not in self._pidfiles:
+            self._pidfiles[pidfile_id] = drone_manager.PidfileContents()
+
+
+    _pidfile_type_map = {  # pidfile name -> type, for non-hosts/ dirs
+            monitor_db._AUTOSERV_PID_FILE: _PidfileType.JOB,
+            monitor_db._CRASHINFO_PID_FILE: _PidfileType.GATHER,
+            monitor_db._PARSER_PID_FILE: _PidfileType.PARSE,
+    }
+
+
+    def _set_last_pidfile(self, pidfile_id, working_directory, pidfile_name):
+        if working_directory.startswith('hosts/'):
+            # e.g. hosts/host1/1-verify: use the suffix after the last '-'
+            type_string = working_directory.rsplit('-', 1)[1]
+            pidfile_type = _PidfileType.get_value(type_string)
+        else:
+            pidfile_type = self._pidfile_type_map[pidfile_name]
+        self._last_pidfile_id[pidfile_type] = pidfile_id
+
+
     def execute_command(self, command, working_directory, pidfile_name,
                         log_file=None, paired_with_pidfile=None):
         # TODO: record this
         pidfile_id = object() # PidfileIds are opaque to monitor_db
         self._future_pidfiles.append(pidfile_id)
-        self._pidfiles[pidfile_id] = drone_manager.PidfileContents()
+        self._initialize_pidfile(pidfile_id)
         self._pidfile_index[(working_directory, pidfile_name)] = pidfile_id
-        self._last_pidfile_id = pidfile_id
+        self._set_last_pidfile(pidfile_id, working_directory, pidfile_name)
         return pidfile_id
 
 
@@ -130,8 +165,12 @@
 
 
     def register_pidfile(self, pidfile_id):
-        # TODO
-        pass
+        self._initialize_pidfile(pidfile_id)
+
+
+    def unregister_pidfile(self, pidfile_id):
+        # intentionally handle non-registered pidfiles silently
+        self._pidfiles.pop(pidfile_id, None)  # other indexes untouched
 
 
     def absolute_path(self, path):
@@ -147,9 +186,19 @@
         return self._pidfile_index.get((execution_tag, pidfile_name), object())
 
 
+    def kill_process(self, process):
+        pidfile_id = self._process_index[process]
+        self._killed_pidfiles.add(pidfile_id)
+        self._set_pidfile_exit_status(pidfile_id, 271)
+
+
 class MockEmailManager(NullMethodObject):
     _NULL_METHODS = ('send_queued_emails', 'send_email')
 
+    def enqueue_notify_email(self, subject, message):
+        logging.warning('enqueue_notify_email: %s', subject)
+        logging.warning(message)
+
 
 class SchedulerFunctionalTest(unittest.TestCase,
                               frontend_test_utils.FrontendTestMixin):
@@ -165,6 +214,8 @@
 
         logging.basicConfig(level=logging.DEBUG)
 
+        self._initialize_test()
+
 
     def tearDown(self):
         self._frontend_common_teardown()
@@ -204,24 +255,61 @@
 
 
     def test_idle(self):
-        self._initialize_test()
         self._run_dispatcher()
 
 
     def test_simple_job(self):
-        self._initialize_test()
         self._create_job(hosts=[1])
         self._run_dispatcher() # launches verify
-        self.mock_drone_manager.set_last_pidfile_exit_status(0)
+        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
         self._run_dispatcher() # launches job
-        self.mock_drone_manager.set_last_pidfile_exit_status(0)
+        self.mock_drone_manager.finish_process(_PidfileType.JOB)
         self._run_dispatcher() # launches parsing + cleanup
-        self.mock_drone_manager.set_pidfile_exit_status(
-                'hosts/host1/2-cleanup', monitor_db._AUTOSERV_PID_FILE, 0)
-        self.mock_drone_manager.set_pidfile_exit_status(
-                '1-my_user/host1', monitor_db._PARSER_PID_FILE, 0)
+        self._finish_parsing_and_cleanup()
+
+
+    def _finish_parsing_and_cleanup(self):
+        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
+        self.mock_drone_manager.finish_process(_PidfileType.PARSE)
         self._run_dispatcher()
 
 
+    def test_job_abort_in_verify(self):
+        job = self._create_job(hosts=[1])
+        self._run_dispatcher() # launches verify
+        job.hostqueueentry_set.update(aborted=True)
+        self._run_dispatcher() # kills verify, launches cleanup
+        self.assertTrue(self.mock_drone_manager.was_last_process_killed(
+                _PidfileType.VERIFY))
+        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
+        self._run_dispatcher()
+
+
+    def test_job_abort(self):
+        job = self._create_job(hosts=[1])
+        job.run_verify = False
+        job.save()
+
+        self._run_dispatcher() # launches job
+        job.hostqueueentry_set.update(aborted=True)
+        self._run_dispatcher() # kills job, launches gathering
+        self.assertTrue(self.mock_drone_manager.was_last_process_killed(
+                _PidfileType.JOB))
+        self.mock_drone_manager.finish_process(_PidfileType.GATHER)
+        self._run_dispatcher() # launches parsing + cleanup
+        self._finish_parsing_and_cleanup()
+
+
+    def test_no_pidfile_leaking(self):
+        self.test_simple_job()
+        self.assertEqual(self.mock_drone_manager._pidfiles, {})
+
+        self.test_job_abort_in_verify()
+        self.assertEqual(self.mock_drone_manager._pidfiles, {})
+
+        self.test_job_abort()
+        self.assertEqual(self.mock_drone_manager._pidfiles, {})
+
+
 if __name__ == '__main__':
     unittest.main()