Make drone_utility *much* more efficient with rsync subprocesses.  It now keeps them running at max capacity, unlike the old method, which, upon hitting capacity, halted everything until *all* of them had finished.  This required adding a poll() method to subcommand, which in turn required copying over a bunch of magic from subprocess.Popen to deal with the fact that you can't call waitpid() on a process more than once after it exits.

Signed-off-by: Steve Howard <showard@google.com>



git-svn-id: http://test.kernel.org/svn/autotest/trunk@2611 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/drone_utility.py b/scheduler/drone_utility.py
index 2d305bc..908d3d7 100644
--- a/scheduler/drone_utility.py
+++ b/scheduler/drone_utility.py
@@ -170,18 +170,34 @@
         shutil.copy(source_path, destination_path)
 
 
+    def wait_for_all_async_commands(self):
+        for subproc in self._subcommands:
+            subproc.fork_waitfor()
+        self._subcommands = []
+
+
+    def _poll_async_commands(self):
+        still_running = []
+        for subproc in self._subcommands:
+            if subproc.poll() is None:
+                still_running.append(subproc)
+        self._subcommands = still_running
+
+
+    def _wait_for_some_async_commands(self):
+        self._poll_async_commands()
+        max_processes = scheduler_config.config.max_transfer_processes
+        while len(self._subcommands) >= max_processes:
+            time.sleep(1)
+            self._poll_async_commands()
+
+
     def run_async_command(self, function, args):
         subproc = subcommand.subcommand(function, args)
         self._subcommands.append(subproc)
         subproc.fork_start()
 
 
-    def wait_for_async_commands(self):
-        for subproc in self._subcommands:
-            subproc.fork_waitfor()
-        self._subcommands = []
-
-
     def _sync_get_file_from(self, hostname, source_path, destination_path):
         self._ensure_directory_exists(os.path.dirname(destination_path))
         host = create_host(hostname)
@@ -239,12 +255,12 @@
     def execute_calls(self, calls):
         results = []
         start_time = time.time()
+        max_processes = scheduler_config.config.max_transfer_processes
         for method_call in calls:
             results.append(method_call.execute_on(self))
-            max_processes = scheduler_config.config.max_transfer_processes
             if len(self._subcommands) >= max_processes:
-                self.wait_for_async_commands()
-        self.wait_for_async_commands()
+                self._wait_for_some_async_commands()
+        self.wait_for_all_async_commands()
 
         duration = time.time() - start_time
         if duration > self._WARNING_DURATION: