Adds a parallel work queue index to test events and support for stdout/stderr in test results.

See http://reviews.llvm.org/D12983 for details.
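
The mechanism, in outline: each worker claims a 0-based index from a
shared map on first use and forwards it to the inferior dotest.py via
--event-add-entries, so the index appears on every test event the
inferior emits.  A minimal standalone sketch of the process-mode scheme
(illustrative names only, not part of the patch):

    import multiprocessing
    import os

    def get_worker_index(index_map, lock):
        # First call from a given pid claims the next free slot;
        # later calls from the same pid return the same slot.
        pid = os.getpid()
        with lock:
            if pid not in index_map:
                index_map[pid] = len(index_map)
            return index_map[pid]

    def work(index_map, lock):
        print("worker index: {}".format(get_worker_index(index_map, lock)))

    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        index_map = manager.dict()  # proxied to all worker processes
        lock = multiprocessing.RLock()
        procs = [multiprocessing.Process(target=work, args=(index_map, lock))
                 for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Each worker prints a distinct index in 0..3.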

llvm-svn: 248036
diff --git a/lldb/test/dosep.py b/lldb/test/dosep.py
index 0148508..18d697c 100755
--- a/lldb/test/dosep.py
+++ b/lldb/test/dosep.py
@@ -80,7 +80,12 @@
 RUNNER_PROCESS_ASYNC_MAP = None
 RESULTS_LISTENER_CHANNEL = None
 
-def setup_global_variables(lock, counter, total, name_len, options):
+"""Contains an optional function pointer that can return the worker index
+   for the given thread/process calling it.  Returns a 0-based index."""
+GET_WORKER_INDEX = None
+
+def setup_global_variables(
+        lock, counter, total, name_len, options, worker_index_map):
     global output_lock, test_counter, total_tests, test_name_len
     global dotest_options
     output_lock = lock
@@ -89,6 +94,22 @@
     test_name_len = name_len
     dotest_options = options
 
+    if worker_index_map is not None:
+        # Reuse the output lock here to avoid sharing yet another lock
+        # across processes; this path sees little traffic.
+        index_lock = lock
+
+        def get_worker_index_use_pid():
+            """Returns a 0-based, process-unique index for the worker."""
+            pid = os.getpid()
+            with index_lock:
+                if pid not in worker_index_map:
+                    worker_index_map[pid] = len(worker_index_map)
+                return worker_index_map[pid]
+
+        global GET_WORKER_INDEX
+        GET_WORKER_INDEX = get_worker_index_use_pid
+
 
 def report_test_failure(name, command, output):
     global output_lock
@@ -150,31 +171,6 @@
     return passes, failures, unexpected_successes
 
 
-def inferior_session_interceptor(forwarding_func, event):
-    """Intercepts session begin/end events, passing through everyting else.
-
-    @param forwarding_func a callable object to pass along the event if it
-    is not one that gets intercepted.
-
-    @param event the test result event received.
-    """
-
-    if event is not None and isinstance(event, dict):
-        if "event" in event:
-            if event["event"] == "session_begin":
-                # Swallow it.  Could report on inferior here if we
-                # cared.
-                return
-            elif event["event"] == "session_end":
-                # Swallow it.  Could report on inferior here if we
-                # cared.  More usefully, we can verify that the
-                # inferior went down hard if we don't receive this.
-                return
-
-    # Pass it along.
-    forwarding_func(event)
-
-
 def call_with_timeout(command, timeout, name, inferior_pid_events):
     """Run command with a timeout if possible.
     -s QUIT will create a core dump if core dumps are enabled on your system
@@ -183,6 +179,10 @@
     if timeout_command and timeout != "0":
         command = [timeout_command, '-s', 'QUIT', timeout] + command
 
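+    # Tag the inferior's test events with this worker's 0-based index so
+    # the parallel runner can attribute events to a work queue slot.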
+    if GET_WORKER_INDEX is not None:
+        worker_index = GET_WORKER_INDEX()
+        command.extend([
+            "--event-add-entries", "worker_index={}".format(worker_index)])
     # Specifying a value for close_fds is unsupported on Windows when using
     # subprocess.PIPE
     if os.name != "nt":
@@ -263,7 +263,8 @@
 
 def process_dir_worker_multiprocessing(
         a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
-        a_dotest_options, job_queue, result_queue, inferior_pid_events):
+        a_dotest_options, job_queue, result_queue, inferior_pid_events,
+        worker_index_map):
     """Worker thread main loop when in multiprocessing mode.
     Takes one directory specification at a time and works on it."""
 
@@ -273,7 +274,7 @@
     # Setup the global state for the worker process.
     setup_global_variables(
         a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
-        a_dotest_options)
+        a_dotest_options, worker_index_map)
 
     # Keep grabbing entries from the queue until done.
     while not job_queue.empty():
@@ -441,14 +442,36 @@
     # rest of the flat module.
     global output_lock
     output_lock = multiprocessing.RLock()
+
     initialize_global_vars_common(num_threads, test_work_items)
 
 
 def initialize_global_vars_threading(num_threads, test_work_items):
+    """Initializes global variables used in threading mode.
+
+    @param num_threads specifies the number of worker threads used.
+
+    @param test_work_items specifies all the work items that will be
+    processed.
+    """
     # Initialize the global state we'll use to communicate with the
     # rest of the flat module.
     global output_lock
     output_lock = threading.RLock()
+
+    index_lock = threading.RLock()
+    index_map = {}
+
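+    # Threads share the module's globals, so a plain dict guarded by its
+    # own RLock suffices to hand out thread-unique indices.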
+    def get_worker_index_threading():
+        """Returns a 0-based, thread-unique index for the worker thread."""
+        thread_id = threading.current_thread().ident
+        with index_lock:
+            if thread_id not in index_map:
+                index_map[thread_id] = len(index_map)
+            return index_map[thread_id]
+
+    global GET_WORKER_INDEX
+    GET_WORKER_INDEX = get_worker_index_threading
+
     initialize_global_vars_common(num_threads, test_work_items)
 
 
@@ -630,6 +653,10 @@
     # hold 2 * (num inferior dotest.py processes started) entries.
     inferior_pid_events = multiprocessing.Queue(4096)
 
+    # Shared dictionary that lets each worker figure out its worker index.
+    manager = multiprocessing.Manager()
+    worker_index_map = manager.dict()
+
     # Create workers.  We don't use multiprocessing.Pool due to
     # challenges with handling ^C keyboard interrupts.
     workers = []
@@ -643,7 +670,8 @@
                   dotest_options,
                   job_queue,
                   result_queue,
-                  inferior_pid_events))
+                  inferior_pid_events,
+                  worker_index_map))
         worker.start()
         workers.append(worker)
 
@@ -717,11 +745,14 @@
     # Initialize our global state.
     initialize_global_vars_multiprocessing(num_threads, test_work_items)
 
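+    # Shared pid-to-index map; a Manager-backed dict is proxied to the
+    # pool's worker processes, unlike a plain dict.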
+    manager = multiprocessing.Manager()
+    worker_index_map = manager.dict()
+
     pool = multiprocessing.Pool(
         num_threads,
         initializer=setup_global_variables,
         initargs=(output_lock, test_counter, total_tests, test_name_len,
-                  dotest_options))
+                  dotest_options, worker_index_map))
 
     # Start the map operation (async mode).
     map_future = pool.map_async(
@@ -819,6 +850,10 @@
     # Initialize our global state.
     initialize_global_vars_multiprocessing(1, test_work_items)
 
+    # We're always worker index 0.
+    global GET_WORKER_INDEX
+    GET_WORKER_INDEX = lambda: 0
+
     # Run the listener and related channel maps in a separate thread.
     # global RUNNER_PROCESS_ASYNC_MAP
     global RESULTS_LISTENER_CHANNEL
@@ -861,8 +896,7 @@
     # listener channel and tell the inferior to send results to the
     # port on which we'll be listening.
     if RESULTS_FORMATTER is not None:
-        forwarding_func = lambda event: inferior_session_interceptor(
-            RESULTS_FORMATTER.process_event, event)
+        forwarding_func = RESULTS_FORMATTER.process_event
         RESULTS_LISTENER_CHANNEL = (
             dotest_channels.UnpicklingForwardingListenerChannel(
                 RUNNER_PROCESS_ASYNC_MAP, "localhost", 0, forwarding_func))
@@ -1088,6 +1122,11 @@
     if dotest_options.results_formatter_options is not None:
         _remove_option(dotest_argv, "--results-formatter-options", 2)
 
+    # Remove test runner name if present.
+    if dotest_options.test_runner_name is not None:
+        _remove_option(dotest_argv, "--test-runner-name", 2)
+
+
 def main(print_details_on_success, num_threads, test_subdir,
          test_runner_name, results_formatter):
     """Run dotest.py in inferior mode in parallel.