make drone_utility *much* more efficient with rsync subprocesses. It will now keep them running at max capacity, unlike the old method, where when it hit capacity it would halt everything until *all* of them finished. this required adding a poll() method to subcommand, which of course required copying over a bunch of magic from subprocess.Popen to deal with the fact that you can't call waitpid() more than once after a process exits.
Signed-off-by: Steve Howard <showard@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@2611 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/drone_utility.py b/scheduler/drone_utility.py
index 2d305bc..908d3d7 100644
--- a/scheduler/drone_utility.py
+++ b/scheduler/drone_utility.py
@@ -170,18 +170,34 @@
shutil.copy(source_path, destination_path)
+ def wait_for_all_async_commands(self):
+ for subproc in self._subcommands:
+ subproc.fork_waitfor()
+ self._subcommands = []
+
+
+ def _poll_async_commands(self):
+ still_running = []
+ for subproc in self._subcommands:
+ if subproc.poll() is None:
+ still_running.append(subproc)
+ self._subcommands = still_running
+
+
+ def _wait_for_some_async_commands(self):
+ self._poll_async_commands()
+ max_processes = scheduler_config.config.max_transfer_processes
+ while len(self._subcommands) >= max_processes:
+ time.sleep(1)
+ self._poll_async_commands()
+
+
def run_async_command(self, function, args):
subproc = subcommand.subcommand(function, args)
self._subcommands.append(subproc)
subproc.fork_start()
- def wait_for_async_commands(self):
- for subproc in self._subcommands:
- subproc.fork_waitfor()
- self._subcommands = []
-
-
def _sync_get_file_from(self, hostname, source_path, destination_path):
self._ensure_directory_exists(os.path.dirname(destination_path))
host = create_host(hostname)
@@ -239,12 +255,12 @@
def execute_calls(self, calls):
results = []
start_time = time.time()
+ max_processes = scheduler_config.config.max_transfer_processes
for method_call in calls:
results.append(method_call.execute_on(self))
- max_processes = scheduler_config.config.max_transfer_processes
if len(self._subcommands) >= max_processes:
- self.wait_for_async_commands()
- self.wait_for_async_commands()
+ self._wait_for_some_async_commands()
+ self.wait_for_all_async_commands()
duration = time.time() - start_time
if duration > self._WARNING_DURATION:
diff --git a/server/subcommand.py b/server/subcommand.py
index b0cb37c..63d7ca5 100644
--- a/server/subcommand.py
+++ b/server/subcommand.py
@@ -130,6 +130,7 @@
self.lambda_function = lambda: func(*args)
self.pid = None
self.stdprint = stdprint
+ self.returncode = None
@classmethod
@@ -190,34 +191,68 @@
os._exit(exit_code)
- def fork_waitfor(self, timeout=None):
- if not timeout:
- (pid, status) = os.waitpid(self.pid, 0)
+ def _handle_exitstatus(self, sts):
+ """
+ This is partially borrowed from subprocess.Popen.
+ """
+ if os.WIFSIGNALED(sts):
+ self.returncode = -os.WTERMSIG(sts)
+ elif os.WIFEXITED(sts):
+ self.returncode = os.WEXITSTATUS(sts)
else:
- pid = None
- start_time = time.time()
- while time.time() <= start_time + timeout:
- (pid, status) = os.waitpid(self.pid, os.WNOHANG)
- if pid:
- break
- time.sleep(1)
+ # Should never happen
+ raise RuntimeError("Unknown child exit status!")
- if not pid:
- utils.nuke_pid(self.pid)
- print "subcommand failed pid %d" % self.pid
- print "%s" % (self.func,)
- print "timeout after %ds" % timeout
- print
- return None
-
- if status != 0:
- print "subcommand failed pid %d" % pid
+ if self.returncode != 0:
+ print "subcommand failed pid %d" % self.pid
print "%s" % (self.func,)
- print "rc=%d" % status
+ print "rc=%d" % self.returncode
print
if os.path.exists(self.stderr):
for line in open(self.stderr).readlines():
print line,
print "\n--------------------------------------------\n"
- raise error.AutoservSubcommandError(self.func, status)
- return status
+ raise error.AutoservSubcommandError(self.func, self.returncode)
+
+
+ def poll(self):
+ """
+ This is borrowed from subprocess.Popen.
+ """
+ if self.returncode is None:
+ try:
+ pid, sts = os.waitpid(self.pid, os.WNOHANG)
+ if pid == self.pid:
+ self._handle_exitstatus(sts)
+ except os.error:
+ pass
+ return self.returncode
+
+
+ def wait(self):
+ """
+ This is borrowed from subprocess.Popen.
+ """
+ if self.returncode is None:
+ pid, sts = os.waitpid(self.pid, 0)
+ self._handle_exitstatus(sts)
+ return self.returncode
+
+
+ def fork_waitfor(self, timeout=None):
+ if not timeout:
+ return self.wait()
+ else:
+ start_time = time.time()
+ while time.time() <= start_time + timeout:
+ self.poll()
+ if self.returncode is not None:
+ return self.returncode
+ time.sleep(1)
+
+ utils.nuke_pid(self.pid)
+ print "subcommand failed pid %d" % self.pid
+ print "%s" % (self.func,)
+ print "timeout after %ds" % timeout
+ print
+ return None