make drone_utility *much* more efficient with rsync subprocesses.  It will now keep them running at max capacity, unlike the old method, where when it hit capacity it would halt everything until *all* of them finished.  this required adding a poll() method to subcommand, which of course required copying over a bunch of magic from subprocess.Popen to deal with the fact that you can't call waitpid() more than once after a process exits.

Signed-off-by: Steve Howard <showard@google.com>



git-svn-id: http://test.kernel.org/svn/autotest/trunk@2611 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/drone_utility.py b/scheduler/drone_utility.py
index 2d305bc..908d3d7 100644
--- a/scheduler/drone_utility.py
+++ b/scheduler/drone_utility.py
@@ -170,18 +170,34 @@
         shutil.copy(source_path, destination_path)
 
 
+    def wait_for_all_async_commands(self):
+        for subproc in self._subcommands:
+            subproc.fork_waitfor()
+        self._subcommands = []
+
+
+    def _poll_async_commands(self):
+        still_running = []
+        for subproc in self._subcommands:
+            if subproc.poll() is None:
+                still_running.append(subproc)
+        self._subcommands = still_running
+
+
+    def _wait_for_some_async_commands(self):
+        self._poll_async_commands()
+        max_processes = scheduler_config.config.max_transfer_processes
+        while len(self._subcommands) >= max_processes:
+            time.sleep(1)
+            self._poll_async_commands()
+
+
     def run_async_command(self, function, args):
         subproc = subcommand.subcommand(function, args)
         self._subcommands.append(subproc)
         subproc.fork_start()
 
 
-    def wait_for_async_commands(self):
-        for subproc in self._subcommands:
-            subproc.fork_waitfor()
-        self._subcommands = []
-
-
     def _sync_get_file_from(self, hostname, source_path, destination_path):
         self._ensure_directory_exists(os.path.dirname(destination_path))
         host = create_host(hostname)
@@ -239,12 +255,12 @@
     def execute_calls(self, calls):
         results = []
         start_time = time.time()
+        max_processes = scheduler_config.config.max_transfer_processes
         for method_call in calls:
             results.append(method_call.execute_on(self))
-            max_processes = scheduler_config.config.max_transfer_processes
             if len(self._subcommands) >= max_processes:
-                self.wait_for_async_commands()
-        self.wait_for_async_commands()
+                self._wait_for_some_async_commands()
+        self.wait_for_all_async_commands()
 
         duration = time.time() - start_time
         if duration > self._WARNING_DURATION:
diff --git a/server/subcommand.py b/server/subcommand.py
index b0cb37c..63d7ca5 100644
--- a/server/subcommand.py
+++ b/server/subcommand.py
@@ -130,6 +130,7 @@
         self.lambda_function = lambda: func(*args)
         self.pid = None
         self.stdprint = stdprint
+        self.returncode = None
 
 
     @classmethod
@@ -190,34 +191,68 @@
             os._exit(exit_code)
 
 
-    def fork_waitfor(self, timeout=None):
-        if not timeout:
-            (pid, status) = os.waitpid(self.pid, 0)
+    def _handle_exitstatus(self, sts):
+        """
+        This is partially borrowed from subprocess.Popen.
+        """
+        if os.WIFSIGNALED(sts):
+            self.returncode = -os.WTERMSIG(sts)
+        elif os.WIFEXITED(sts):
+            self.returncode = os.WEXITSTATUS(sts)
         else:
-            pid = None
-            start_time = time.time()
-            while time.time() <= start_time + timeout:
-                (pid, status) = os.waitpid(self.pid, os.WNOHANG)
-                if pid:
-                    break
-                time.sleep(1)
+            # Should never happen
+            raise RuntimeError("Unknown child exit status!")
 
-            if not pid:
-                utils.nuke_pid(self.pid)
-                print "subcommand failed pid %d" % self.pid
-                print "%s" % (self.func,)
-                print "timeout after %ds" % timeout
-                print
-                return None
-
-        if status != 0:
-            print "subcommand failed pid %d" % pid
+        if self.returncode != 0:
+            print "subcommand failed pid %d" % self.pid
             print "%s" % (self.func,)
-            print "rc=%d" % status
+            print "rc=%d" % self.returncode
             print
             if os.path.exists(self.stderr):
                 for line in open(self.stderr).readlines():
                     print line,
             print "\n--------------------------------------------\n"
-            raise error.AutoservSubcommandError(self.func, status)
-        return status
+            raise error.AutoservSubcommandError(self.func, self.returncode)
+
+
+    def poll(self):
+        """
+        This is borrowed from subprocess.Popen.
+        """
+        if self.returncode is None:
+            try:
+                pid, sts = os.waitpid(self.pid, os.WNOHANG)
+                if pid == self.pid:
+                    self._handle_exitstatus(sts)
+            except os.error:
+                pass
+        return self.returncode
+
+
+    def wait(self):
+        """
+        This is borrowed from subprocess.Popen.
+        """
+        if self.returncode is None:
+            pid, sts = os.waitpid(self.pid, 0)
+            self._handle_exitstatus(sts)
+        return self.returncode
+
+
+    def fork_waitfor(self, timeout=None):
+        if not timeout:
+            return self.wait()
+        else:
+            start_time = time.time()
+            while time.time() <= start_time + timeout:
+                self.poll()
+                if self.returncode is not None:
+                    return self.returncode
+                time.sleep(1)
+
+            utils.nuke_pid(self.pid)
+            print "subcommand failed pid %d" % self.pid
+            print "%s" % (self.func,)
+            print "timeout after %ds" % timeout
+            print
+            return None