Added string stdin support to utils.BgJob and to all of its users that
pass it a stdin argument (such as utils.run()). Fixed a bug in
_wait_for_commands() when used with more than one BgJob: once any
BgJob finished, the loop would spin at 100% CPU because the finished
job's stdout/stderr descriptors were never removed from the select
sets. Added a unittest for utils.run(). Updated users of utils.run()
that were creating dummy files or pipes to work around the fact that
they could not pass utils.run() a string stdin.
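
For illustration, a minimal sketch of the new calling convention
(illustrative only; the grep invocation and input are made up, and
this is not the added unittest):

    # feed the string directly; no temporary file or os.pipe() needed
    result = utils.run('grep foo', stdin='foo\nbar\nfoo\n')
    print result.stdout        # the two matching 'foo' lines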

Signed-off-by: Mihai Rusu <dizzy@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@3839 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/client/common_lib/utils.py b/client/common_lib/utils.py
index 48a944c..647166c 100644
--- a/client/common_lib/utils.py
+++ b/client/common_lib/utils.py
@@ -49,6 +49,15 @@
         self.stdout_tee = get_stream_tee_file(stdout_tee, DEFAULT_STDOUT_LEVEL)
         self.stderr_tee = get_stream_tee_file(stderr_tee, stderr_level)
         self.result = CmdResult(command)
+
+        # allow for easy stdin input by string, we'll let subprocess create
+        # a pipe for stdin input and we'll write to it in the wait loop
+        if isinstance(stdin, basestring):
+            self.string_stdin = stdin
+            stdin = subprocess.PIPE
+        else:
+            self.string_stdin = None
+
         if verbose:
             logging.debug("Running '%s'" % command)
         self.sp = subprocess.Popen(command, stdout=subprocess.PIPE,
@@ -346,7 +355,8 @@
             in result.stdout).
     @param stderr_tee: likewise for stderr.
     @param verbose: if True, log the command being run.
-    @param stdin: stdin to pass to the executed process.
+    @param stdin: stdin to pass to the executed process (can be a file
+            descriptor, a file object backed by a real file, or a string).
     @param args: sequence of strings of arguments to be given to the command
             inside " quotes after they have been escaped for that; each
             element in the sequence will be given as a separate command
@@ -374,13 +384,14 @@
 
 def run_parallel(commands, timeout=None, ignore_status=False,
                  stdout_tee=None, stderr_tee=None):
-    """Beahves the same as run with the following exceptions:
+    """
+    Behaves the same as run() with the following exceptions:
 
     - commands is a list of commands to run in parallel.
     - ignore_status toggles whether or not an exception should be raised
       on any error.
 
-    returns a list of CmdResult objects
+    @return: a list of CmdResult objects
     """
     bg_jobs = []
     for command in commands:
@@ -448,42 +459,78 @@
     # a 1 second timeout is used in select.
     SELECT_TIMEOUT = 1
 
-    select_list = []
+    read_list = []
+    write_list = []
     reverse_dict = {}
+
     for bg_job in bg_jobs:
-        select_list.append(bg_job.sp.stdout)
-        select_list.append(bg_job.sp.stderr)
-        reverse_dict[bg_job.sp.stdout] = (bg_job,True)
-        reverse_dict[bg_job.sp.stderr] = (bg_job,False)
+        read_list.append(bg_job.sp.stdout)
+        read_list.append(bg_job.sp.stderr)
+        reverse_dict[bg_job.sp.stdout] = (bg_job, True)
+        reverse_dict[bg_job.sp.stderr] = (bg_job, False)
+        if bg_job.string_stdin:
+            write_list.append(bg_job.sp.stdin)
+            reverse_dict[bg_job.sp.stdin] = bg_job
 
     if timeout:
         stop_time = start_time + timeout
         time_left = stop_time - time.time()
     else:
         time_left = None # so that select never times out
+
     while not timeout or time_left > 0:
-        # select will return when stdout is ready (including when it is
+        # select will return when we may write to stdin or when there is
+        # stdout/stderr output we can read (including when it is
         # EOF, that is the process has terminated).
-        ready, _, _ = select.select(select_list, [], [], SELECT_TIMEOUT)
+        read_ready, write_ready, _ = select.select(read_list, write_list, [],
+                                                   SELECT_TIMEOUT)
 
         # os.read() has to be used instead of
         # subproc.stdout.read() which will otherwise block
-        for fileno in ready:
-            bg_job,stdout = reverse_dict[fileno]
-            bg_job.process_output(stdout)
+        for file_obj in read_ready:
+            bg_job, is_stdout = reverse_dict[file_obj]
+            bg_job.process_output(is_stdout)
 
-        remaining_jobs = [x for x in bg_jobs if x.result.exit_status is None]
-        if len(remaining_jobs) == 0:
-            return False
-        for bg_job in remaining_jobs:
+        for file_obj in write_ready:
+            # we can write PIPE_BUF bytes without blocking
+            # POSIX requires PIPE_BUF to be at least 512
+            bg_job = reverse_dict[file_obj]
+            file_obj.write(bg_job.string_stdin[:512])
+            bg_job.string_stdin = bg_job.string_stdin[512:]
+            # no more input data, close stdin, remove it from the select set
+            if not bg_job.string_stdin:
+                file_obj.close()
+                write_list.remove(file_obj)
+                del reverse_dict[file_obj]
+
+        all_jobs_finished = True
+        for bg_job in bg_jobs:
+            if bg_job.result.exit_status is not None:
+                continue
+
             bg_job.result.exit_status = bg_job.sp.poll()
+            if bg_job.result.exit_status is not None:
+                # process exited, remove its stdout/stderr from the select set
+                read_list.remove(bg_job.sp.stdout)
+                read_list.remove(bg_job.sp.stderr)
+                del reverse_dict[bg_job.sp.stdout]
+                del reverse_dict[bg_job.sp.stderr]
+            else:
+                all_jobs_finished = False
+
+        if all_jobs_finished:
+            return False
 
         if timeout:
             time_left = stop_time - time.time()
 
     # Kill all processes which did not complete prior to timeout
-    for bg_job in [x for x in bg_jobs if x.result.exit_status is None]:
-        print '* Warning: run process timeout (%s) fired' % timeout
+    for bg_job in bg_jobs:
+        if bg_job.result.exit_status is not None:
+            continue
+
+        logging.warn('run process timeout (%s) fired on: %s', timeout,
+                     bg_job.command)
         nuke_subprocess(bg_job.sp)
         bg_job.result.exit_status = bg_job.sp.poll()
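
For reference, a minimal sketch of what the utils.run() unittest
mentioned above could look like (the autotest_lib import path, class
name, and test body are assumptions, not the actual test added by
this commit):

    import unittest
    # assumed import path for the autotest tree
    from autotest_lib.client.common_lib import utils

    class RunStringStdinTest(unittest.TestCase):
        def test_string_stdin_reaches_the_process(self):
            # 'cat' copies stdin to stdout, so output must equal input
            result = utils.run('cat', stdin='hello world')
            self.assertEqual(result.stdout, 'hello world')

    if __name__ == '__main__':
        unittest.main()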