Add JUnit/XUnit-formatted output to the lldb test run system

Also introduces the test event system into our test runner framework.
See the following for details:
http://reviews.llvm.org/D12831

llvm-svn: 247722
diff --git a/lldb/test/dosep.py b/lldb/test/dosep.py
index 57f5464..d3999c5 100755
--- a/lldb/test/dosep.py
+++ b/lldb/test/dosep.py
@@ -32,6 +32,7 @@
 echo core.%p | sudo tee /proc/sys/kernel/core_pattern
 """
 
+import asyncore
 import fnmatch
 import multiprocessing
 import multiprocessing.pool
@@ -44,10 +45,9 @@
 import sys
 import threading
 
+import dotest_channels
 import dotest_args
 
-from optparse import OptionParser
-
 
 def get_timeout_command():
     """Search for a suitable timeout command."""
@@ -76,6 +76,9 @@
 dotest_options = None
 output_on_success = False
 
+RESULTS_FORMATTER = None
+RUNNER_PROCESS_ASYNC_MAP = None
+RESULTS_LISTENER_CHANNEL = None
 
 def setup_global_variables(lock, counter, total, name_len, options):
     global output_lock, test_counter, total_tests, test_name_len
@@ -147,12 +150,39 @@
     return passes, failures, unexpected_successes
 
 
+def inferior_session_interceptor(forwarding_func, event):
+    """Intercepts session begin/end events, passing through everything else.
+
+    @param forwarding_func a callable object to pass along the event if it
+    is not one that gets intercepted.
+
+    @param event the test result event received (non-dict events pass).
+    """
+
+    if event is not None and isinstance(event, dict):
+        if "event" in event:
+            if event["event"] == "session_begin":
+                # Swallow it.  Could report on inferior here if we
+                # cared.
+                return
+            elif event["event"] == "session_end":
+                # Swallow it.  Could report on inferior here if we
+                # cared.  More usefully, we can verify that the
+                # inferior went down hard if we don't receive this.
+                return
+
+    # Not a session begin/end event: pass it along.
+    forwarding_func(event)
+
+
 def call_with_timeout(command, timeout, name, inferior_pid_events):
-    """Run command with a timeout if possible."""
-    """-s QUIT will create a coredump if they are enabled on your system"""
+    """Run command with a timeout if possible.
+    -s QUIT will create a coredump if they are enabled on your system
+    """
     process = None
     if timeout_command and timeout != "0":
         command = [timeout_command, '-s', 'QUIT', timeout] + command
+
     # Specifying a value for close_fds is unsupported on Windows when using
     # subprocess.PIPE
     if os.name != "nt":
@@ -170,7 +200,14 @@
     if inferior_pid_events:
         inferior_pid_events.put_nowait(('created', inferior_pid))
     output = process.communicate()
+
+    # The inferior should now be entirely wrapped up.
     exit_status = process.returncode
+    if exit_status is None:
+        raise Exception(
+            "no exit status available after the inferior dotest.py "
+            "should have completed")
+
     if inferior_pid_events:
         inferior_pid_events.put_nowait(('destroyed', inferior_pid))
 
@@ -180,6 +217,10 @@
         # only stderr does.
         report_test_pass(name, output[1])
     else:
+        # TODO need to differentiate a failing test from a run that
+        # was broken out of by a SIGTERM/SIGKILL, reporting those as
+        # an error.  If a signal-based completion, need to call that
+        # an error.
         report_test_failure(name, command, output[1])
     return name, exit_status, passes, failures, unexpected_successes
 
@@ -250,9 +291,7 @@
     return process_dir(*args)
 
 
-def process_dir_worker_threading(
-        a_test_counter, a_total_tests, a_test_name_len,
-        a_dotest_options, job_queue, result_queue, inferior_pid_events):
+def process_dir_worker_threading(job_queue, result_queue, inferior_pid_events):
     """Worker thread main loop when in threading mode.
 
     This one supports the hand-rolled pooling support.
@@ -413,6 +452,150 @@
     initialize_global_vars_common(num_threads, test_work_items)
 
 
+def ctrl_c_loop(main_op_func, done_func, ctrl_c_handler):
+    """Provides a main loop that is Ctrl-C protected.
+
+    The main loop calls main_op_func() repeatedly until done_func()
+    returns True.  Each KeyboardInterrupt raised by either call is caught
+    and ctrl_c_handler() is called with the number of Ctrl-Cs seen so far
+    (starting with 1).  The ctrl_c_handler() should mutate whatever it
+    needs to have done_func() return True as soon as it is desired to
+    exit the loop.
+    """
+    done = False
+    ctrl_c_count = 0
+
+    while not done:
+        try:
+            # Check done first: it is the first thing re-run after a
+            # Ctrl-C handler returns, so a handler can end the loop
+            # without another main op being started.
+            done = done_func()
+            if not done:
+                # Run the main op once.
+                main_op_func()
+
+        except KeyboardInterrupt:
+            ctrl_c_count += 1
+            ctrl_c_handler(ctrl_c_count)
+
+
+def pump_workers_and_asyncore_map(workers, asyncore_map):
+    """Prunes out completed workers and maintains the asyncore loop.
+
+    The asyncore map holds the optional socket listener and its
+    handlers.  Once all workers are complete, this closes the results
+    listener channel.  It also pumps the asyncore loop for up to 10
+    iterations (0.1s timeout each) when the map is non-empty.
+
+    @param workers the worker Thread/Process list; dead ones are removed.
+
+    @param asyncore_map the asyncore channel map to pump; it holds the
+    channels that are still open.
+    """
+
+    # Check on all the workers, removing them from the workers
+    # list as they complete.
+    dead_workers = []
+    for worker in workers:
+        # A short timed join (rather than an unbounded one) is what
+        # allows us to still receive keyboard interrupts.
+        worker.join(0.01)
+        if not worker.is_alive():
+            dead_workers.append(worker)
+    # Clear out the completed workers.
+    for dead_worker in dead_workers:
+        workers.remove(dead_worker)
+
+    # If there are no more workers and there is a listener,
+    # close the listener.
+    global RESULTS_LISTENER_CHANNEL
+    if len(workers) == 0 and RESULTS_LISTENER_CHANNEL is not None:
+        RESULTS_LISTENER_CHANNEL.close()
+        RESULTS_LISTENER_CHANNEL = None
+
+    # Pump the asyncore map if it isn't empty.
+    if len(asyncore_map) > 0:
+        asyncore.loop(0.1, False, asyncore_map, 10)
+
+
+def handle_ctrl_c(ctrl_c_count, job_queue, workers, inferior_pid_events,
+                  stop_all_inferiors_func):
+    """Performs the Ctrl-C action for the non-pool parallel test runners.
+
+    @param ctrl_c_count the number of Ctrl-Cs intercepted so far (1 on the
+    first).  1 drains job_queue so no new work starts, 2 calls
+    stop_all_inferiors_func, and later ones just print a status message.
+
+    @param job_queue a Queue object that contains the work still outstanding
+    (i.e. hasn't been assigned to a worker yet).
+
+    @param workers list of Thread or Process workers.
+
+    @param inferior_pid_events specifies a Queue of inferior process
+    construction and destruction events.  Used to build the list of inferior
+    processes that should be killed if we get that far.
+
+    @param stop_all_inferiors_func a callable object that takes the
+    workers and inferior_pid_events parameters (in that order); it is
+    called on the second Ctrl-C to hard-stop the workers.
+    """
+
+    # Print out which Ctrl-C we're handling (the 4th and later: "many").
+    key_name = [
+        "first",
+        "second",
+        "third",
+        "many"]
+
+    if ctrl_c_count < len(key_name):
+        name_index = ctrl_c_count - 1
+    else:
+        name_index = len(key_name) - 1
+    message = "\nHandling {} KeyboardInterrupt".format(key_name[name_index])
+    with output_lock:
+        print message
+
+    if ctrl_c_count == 1:
+        # Remove all outstanding items from the work queue so we stop
+        # doing any more new work.
+        while not job_queue.empty():
+            try:
+                # Just drain it to stop more work from being started.
+                job_queue.get_nowait()
+            except Queue.Empty:
+                pass
+        with output_lock:
+            print "Stopped more work from being started."
+    elif ctrl_c_count == 2:
+        # Try to stop all inferiors, even the ones currently doing work.
+        stop_all_inferiors_func(workers, inferior_pid_events)
+    else:
+        with output_lock:
+            print "All teardown activities kicked off, should finish soon."
+
+
+def workers_and_async_done(workers, async_map):
+    """Returns True if the workers list and asyncore channels are all done.
+
+    @param workers list of workers (threads/processes), or None.  Only its
+    length is checked here.
+
+    @param async_map the asyncore channel map to check for live channels
+    (None counts as empty).
+
+    @return False if the workers list exists and has any entries in it, or
+    if the async_map exists and has any entries left in it; otherwise, True.
+    """
+    if workers is not None and len(workers) > 0:
+        # We're not done if we still have workers left.
+        return False
+    if async_map is not None and len(async_map) > 0:
+        return False
+    # No workers and no live channels: we're done.
+    return True
+
+
 def multiprocessing_test_runner(num_threads, test_work_items):
     """Provides hand-wrapped pooling test runner adapter with Ctrl-C support.
 
@@ -464,36 +647,72 @@
         worker.start()
         workers.append(worker)
 
-    # Wait for all workers to finish, handling ^C as needed.
-    try:
-        for worker in workers:
-            worker.join()
-    except KeyboardInterrupt:
-        # First try to drain the queue of work and let the
-        # running tests complete.
-        while not job_queue.empty():
-            try:
-                # Just drain it to stop more work from being started.
-                job_queue.get_nowait()
-            except Queue.Empty:
-                pass
+    # Main loop: wait for all workers to finish and wait for
+    # the socket handlers to wrap up.
+    ctrl_c_loop(
+        # Main operation of loop
+        lambda: pump_workers_and_asyncore_map(
+            workers, RUNNER_PROCESS_ASYNC_MAP),
 
-        print ('\nFirst KeyboardInterrupt received, stopping '
-               'future work.  Press again to hard-stop existing tests.')
-        try:
-            for worker in workers:
-                worker.join()
-        except KeyboardInterrupt:
-            print ('\nSecond KeyboardInterrupt received, killing '
-                   'all worker process trees.')
-            kill_all_worker_processes(workers, inferior_pid_events)
+        # Return True when we're done with the main loop.
+        lambda: workers_and_async_done(workers, RUNNER_PROCESS_ASYNC_MAP),
 
+        # Indicate what we do when we receive one or more Ctrl-Cs.
+        lambda ctrl_c_count: handle_ctrl_c(
+            ctrl_c_count, job_queue, workers, inferior_pid_events,
+            kill_all_worker_processes))
+
+    # Reap the test results.
     test_results = []
     while not result_queue.empty():
         test_results.append(result_queue.get(block=False))
     return test_results
 
 
+def map_async_run_loop(future, channel_map, listener_channel):
+    """Blocks until the Pool.map_async completes and all channels close.
+
+    @param future an AsyncResult instance from a Pool.map_async() call.
+
+    @param channel_map the asyncore dispatch channel map that should be
+    pumped.  Optional: may be None.
+
+    @param listener_channel the listener channel to close once the
+    map_async results are available.  Optional: may be None.
+
+    @return the results from future.get().
+    """
+    map_results = None
+
+    done = False
+    while not done:
+        # Check if we need to reap the map results.
+        if map_results is None:
+            if future.ready():
+                # Get the results.
+                map_results = future.get()
+
+                # Close the runner process listener channel if we have
+                # one: no more connections will be incoming.
+                if listener_channel is not None:
+                    listener_channel.close()
+
+        # Pump the asyncore loop while it has live channels.  With
+        # nothing to pump, block briefly on the future instead so this
+        # loop doesn't busy-spin the CPU polling future.ready(); the
+        # short timeout keeps the loop iterating.
+        if channel_map:
+            asyncore.loop(0.01, False, channel_map, 10)
+        elif map_results is None:
+            future.wait(0.01)
+
+        # Figure out if we're done running: the results must be in
+        # and no asyncore channel may still be running.
+        done = map_results is not None and not channel_map
+
+    return map_results
+
+
 def multiprocessing_test_runner_pool(num_threads, test_work_items):
     # Initialize our global state.
     initialize_global_vars_multiprocessing(num_threads, test_work_items)
@@ -503,7 +722,12 @@
         initializer=setup_global_variables,
         initargs=(output_lock, test_counter, total_tests, test_name_len,
                   dotest_options))
-    return pool.map(process_dir_worker_multiprocessing_pool, test_work_items)
+
+    # Start the map operation (async mode).
+    map_future = pool.map_async(
+        process_dir_worker_multiprocessing_pool, test_work_items)
+    return map_async_run_loop(
+        map_future, RUNNER_PROCESS_ASYNC_MAP, RESULTS_LISTENER_CHANNEL)
 
 
 def threading_test_runner(num_threads, test_work_items):
@@ -541,53 +765,28 @@
     for _ in range(num_threads):
         worker = threading.Thread(
             target=process_dir_worker_threading,
-            args=(test_counter,
-                  total_tests,
-                  test_name_len,
-                  dotest_options,
-                  job_queue,
+            args=(job_queue,
                   result_queue,
                   inferior_pid_events))
         worker.start()
         workers.append(worker)
 
-    # Wait for all workers to finish, handling ^C as needed.
-    try:
-        # We do some trickery here to ensure we can catch keyboard
-        # interrupts.
-        while len(workers) > 0:
-            # Make a pass throug the workers, checking for who is done.
-            dead_workers = []
-            for worker in workers:
-                # This non-blocking join call is what allows us
-                # to still receive keyboard interrupts.
-                worker.join(0.01)
-                if not worker.isAlive():
-                    dead_workers.append(worker)
-            # Clear out the completed workers
-            for dead_worker in dead_workers:
-                workers.remove(dead_worker)
+    # Main loop: wait for all workers to finish and wait for
+    # the socket handlers to wrap up.
+    ctrl_c_loop(
+        # Main operation of loop
+        lambda: pump_workers_and_asyncore_map(
+            workers, RUNNER_PROCESS_ASYNC_MAP),
 
-    except KeyboardInterrupt:
-        # First try to drain the queue of work and let the
-        # running tests complete.
-        while not job_queue.empty():
-            try:
-                # Just drain it to stop more work from being started.
-                job_queue.get_nowait()
-            except Queue.Empty:
-                pass
+        # Return True when we're done with the main loop.
+        lambda: workers_and_async_done(workers, RUNNER_PROCESS_ASYNC_MAP),
 
-        print ('\nFirst KeyboardInterrupt received, stopping '
-               'future work.  Press again to hard-stop existing tests.')
-        try:
-            for worker in workers:
-                worker.join()
-        except KeyboardInterrupt:
-            print ('\nSecond KeyboardInterrupt received, killing '
-                   'all worker process trees.')
-            kill_all_worker_threads(workers, inferior_pid_events)
+        # Indicate what we do when we receive one or more Ctrl-Cs.
+        lambda ctrl_c_count: handle_ctrl_c(
+            ctrl_c_count, job_queue, workers, inferior_pid_events,
+            kill_all_worker_threads))
 
+    # Reap the test results.
     test_results = []
     while not result_queue.empty():
         test_results.append(result_queue.get(block=False))
@@ -598,20 +797,49 @@
     # Initialize our global state.
     initialize_global_vars_threading(num_threads, test_work_items)
 
-    pool = multiprocessing.pool.ThreadPool(
-        num_threads
-        # initializer=setup_global_variables,
-        # initargs=(output_lock, test_counter, total_tests, test_name_len,
-        #           dotest_options)
-    )
-    return pool.map(process_dir_worker_threading_pool, test_work_items)
+    pool = multiprocessing.pool.ThreadPool(num_threads)
+    map_future = pool.map_async(
+        process_dir_worker_threading_pool, test_work_items)
+
+    return map_async_run_loop(
+        map_future, RUNNER_PROCESS_ASYNC_MAP, RESULTS_LISTENER_CHANNEL)
+
+
+def asyncore_run_loop(channel_map):
+    """Runs asyncore over channel_map until all its channels are closed."""
+    try:
+        asyncore.loop(None, False, channel_map)
+    except Exception:
+        # Swallow it, we're seeing error: (9, 'Bad file descriptor')
+        # when the listener channel is closed.  Shouldn't be the case.
+        pass
 
 
 def inprocess_exec_test_runner(test_work_items):
     # Initialize our global state.
     initialize_global_vars_multiprocessing(1, test_work_items)
-    return map(process_dir_mapper_inprocess, test_work_items)
 
+    # Run the listener and related channel maps in a separate thread.
+    global RESULTS_LISTENER_CHANNEL
+    socket_thread = None
+    if RESULTS_LISTENER_CHANNEL is not None:
+        socket_thread = threading.Thread(
+            target=lambda: asyncore_run_loop(RUNNER_PROCESS_ASYNC_MAP))
+        socket_thread.start()
+    try:
+        # Do the work.
+        test_results = map(process_dir_mapper_inprocess, test_work_items)
+    finally:
+        # Shut the listener down even if the work raised: the socket
+        # thread is non-daemon and would otherwise keep the process alive.
+        if RESULTS_LISTENER_CHANNEL is not None:
+            RESULTS_LISTENER_CHANNEL.close()
+            RESULTS_LISTENER_CHANNEL = None
+        if socket_thread is not None:
+            # Wait for the listener and handlers to complete.
+            socket_thread.join()
+
+    return test_results
 
 def walk_and_invoke(test_directory, test_subdir, dotest_argv,
                     test_runner_func):
@@ -624,6 +852,22 @@
     test_subdir - lldb/test/ or a subfolder with the tests we're interested in
                   running
     """
+    # The async_map is important to keep all thread-related asyncore
+    # channels distinct when we call asyncore.loop() later on.
+    global RESULTS_LISTENER_CHANNEL, RUNNER_PROCESS_ASYNC_MAP
+    RUNNER_PROCESS_ASYNC_MAP = {}
+
+    # If we're outputting side-channel test results, create the socket
+    # listener channel and tell the inferior to send results to the
+    # port on which we'll be listening.
+    if RESULTS_FORMATTER is not None:
+        forwarding_func = lambda event: inferior_session_interceptor(
+            RESULTS_FORMATTER.process_event, event)
+        RESULTS_LISTENER_CHANNEL = (
+            dotest_channels.UnpicklingForwardingListenerChannel(
+                RUNNER_PROCESS_ASYNC_MAP, "localhost", 0, forwarding_func))
+        dotest_argv.append("--results-port")
+        dotest_argv.append(str(RESULTS_LISTENER_CHANNEL.address[1]))
 
     # Collect the test files that we'll run.
     test_work_items = []
@@ -654,7 +898,7 @@
     if platform_name is None:
         target = sys.platform
     else:
-        m = re.search('remote-(\w+)', platform_name)
+        m = re.search(r'remote-(\w+)', platform_name)
         target = m.group(1)
 
     expected_timeout = set()
@@ -759,20 +1003,94 @@
 
         # threading-pool uses threading for the workers (in-process)
         # and uses the multiprocessing.pool thread-enabled pool.
+        # This does not properly support Ctrl-C.
         "threading-pool":
         (lambda work_items: threading_test_runner_pool(
             num_threads, work_items)),
 
         # serial uses the subprocess-based, single process
         # test runner.  This provides process isolation but
-        # no concurrent test running.
+        # no concurrent test execution.
         "serial":
         inprocess_exec_test_runner
     }
 
 
+def _remove_option(args, option_name, removal_count):
+    """Removes option and related option arguments from args array.
+    @param args the array of command line arguments (in/out)
+    @param option_name the full command line representation of the
+    option that will be removed (including '--' or '-').
+    @param removal_count the count of elements to remove for the
+    separate-argument form: 1 removes just the option, 2 also removes
+    its first argument.  Raises Exception if the option is not found.
+    """
+    try:
+        index = args.index(option_name)
+        # Handle the exact match case.
+        del args[index:index+removal_count]
+        return
+    except ValueError:
+        # Thanks to argparse not handling options with known arguments
+        # like other options parsing libraries (see
+        # https://bugs.python.org/issue9334), we need to support the
+        # --results-formatter-options={second-level-arguments} (note
+        # the equal sign to fool the first-level arguments parser into
+        # not treating the second-level arguments as first-level
+        # options). We're certainly at risk of getting this wrong
+        # since now we're forced into the business of trying to figure
+        # out what is an argument (although I think this
+        # implementation will suffice).
+        # The combined form is one "--option=value" element; match on
+        # the literal prefix so regex metacharacters can't misfire.
+        prefix = option_name + "="
+        for index, arg in enumerate(args):
+            if arg.startswith(prefix):
+                # The value is embedded, so only one element is removed.
+                del args[index]
+                return
+        # Neither form was found; fall through to the error below.
+
+    # We didn't find the option but we should have.
+    raise Exception("failed to find option '{}' in args '{}'".format(
+        option_name, args))
+
+
+def adjust_inferior_options(dotest_argv):
+    """Adjusts the commandline args array for inferiors.
+
+    Adds a shared session directory ('-s') if none was given and strips
+    the results formatter options, which the parallel test runner
+    handles itself.  dotest_argv is modified in place and
+    dotest_options.s is updated to match.
+    """
+    global dotest_options
+
+    # If we don't have a session directory, create one.
+    if not dotest_options.s:
+        # No session dir: add one so dotest invocations share it.  Avoid
+        # ':' (Windows paths) and '%F' (not portable to MSVC strftime).
+        import datetime
+        timestamp_started = datetime.datetime.now().strftime(
+            "%Y-%m-%d-%H_%M_%S")
+        dotest_argv.append('-s')
+        dotest_argv.append(timestamp_started)
+        dotest_options.s = timestamp_started
+
+    # Adjust inferior results formatter options - if the parallel
+    # test runner is collecting into the user-specified test results,
+    # we'll have inferiors spawn with the --results-port option and
+    # strip the original test runner options.
+    if dotest_options.results_file is not None:
+        _remove_option(dotest_argv, "--results-file", 2)
+    if dotest_options.results_port is not None:
+        _remove_option(dotest_argv, "--results-port", 2)
+    if dotest_options.results_formatter is not None:
+        _remove_option(dotest_argv, "--results-formatter", 2)
+    if dotest_options.results_formatter_options is not None:
+        _remove_option(dotest_argv, "--results-formatter-options", 2)
+
 def main(print_details_on_success, num_threads, test_subdir,
-         test_runner_name):
+         test_runner_name, results_formatter):
     """Run dotest.py in inferior mode in parallel.
 
     @param print_details_on_success the parsed value of the output-on-success
@@ -793,53 +1111,31 @@
     system to choose the most appropriate test runner given desired
     thread count and OS type.
 
+    @param results_formatter if specified, provides the TestResultsFormatter
+    instance that will format and output test result data from the
+    side-channel test results.  When specified, inferior dotest calls
+    will send test results side-channel data over a socket to the parallel
+    test runner, which will forward them on to results_formatter.
     """
 
     dotest_argv = sys.argv[1:]
 
-    global output_on_success
+    global output_on_success, RESULTS_FORMATTER
     output_on_success = print_details_on_success
+    RESULTS_FORMATTER = results_formatter
 
     # We can't use sys.path[0] to determine the script directory
     # because it doesn't work under a debugger
-    test_directory = os.path.dirname(os.path.realpath(__file__))
-    parser = OptionParser(usage="""\
-Run lldb test suite using a separate process for each test file.
-
-       Each test will run with a time limit of 10 minutes by default.
-
-       Override the default time limit of 10 minutes by setting
-       the environment variable LLDB_TEST_TIMEOUT.
-
-       E.g., export LLDB_TEST_TIMEOUT=10m
-
-       Override the time limit for individual tests by setting
-       the environment variable LLDB_[TEST NAME]_TIMEOUT.
-
-       E.g., export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=2m
-
-       Set to "0" to run without time limit.
-
-       E.g., export LLDB_TEST_TIMEOUT=0
-       or    export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=0
-""")
     parser = dotest_args.create_parser()
     global dotest_options
     dotest_options = dotest_args.parse_args(parser, dotest_argv)
 
-    if not dotest_options.s:
-        # no session log directory, we need to add this to prevent
-        # every dotest invocation from creating its own directory
-        import datetime
-        # The windows platforms don't like ':' in the pathname.
-        timestamp_started = datetime.datetime.now().strftime("%F-%H_%M_%S")
-        dotest_argv.append('-s')
-        dotest_argv.append(timestamp_started)
-        dotest_options.s = timestamp_started
+    adjust_inferior_options(dotest_argv)
 
     session_dir = os.path.join(os.getcwd(), dotest_options.s)
 
     # The root directory was specified on the command line
+    test_directory = os.path.dirname(os.path.realpath(__file__))
     if test_subdir and len(test_subdir) > 0:
         test_subdir = os.path.join(test_directory, test_subdir)
     else: