Add JUnit/XUnit-formatted output to the lldb test run system
Also introduces the test event system into our test runner framework.
See the following for details:
http://reviews.llvm.org/D12831
llvm-svn: 247722
diff --git a/lldb/test/dosep.py b/lldb/test/dosep.py
index 57f5464..d3999c5 100755
--- a/lldb/test/dosep.py
+++ b/lldb/test/dosep.py
@@ -32,6 +32,7 @@
echo core.%p | sudo tee /proc/sys/kernel/core_pattern
"""
+import asyncore
import fnmatch
import multiprocessing
import multiprocessing.pool
@@ -44,10 +45,9 @@
import sys
import threading
+import dotest_channels
import dotest_args
-from optparse import OptionParser
-
def get_timeout_command():
"""Search for a suitable timeout command."""
@@ -76,6 +76,9 @@
dotest_options = None
output_on_success = False
+RESULTS_FORMATTER = None
+RUNNER_PROCESS_ASYNC_MAP = None
+RESULTS_LISTENER_CHANNEL = None
def setup_global_variables(lock, counter, total, name_len, options):
global output_lock, test_counter, total_tests, test_name_len
@@ -147,12 +150,39 @@
return passes, failures, unexpected_successes
+def inferior_session_interceptor(forwarding_func, event):
+    """Intercepts session begin/end events, passing through everything else.
+
+ @param forwarding_func a callable object to pass along the event if it
+ is not one that gets intercepted.
+
+ @param event the test result event received.
+ """
+
+ if event is not None and isinstance(event, dict):
+ if "event" in event:
+ if event["event"] == "session_begin":
+ # Swallow it. Could report on inferior here if we
+ # cared.
+ return
+ elif event["event"] == "session_end":
+ # Swallow it. Could report on inferior here if we
+ # cared. More usefully, we can verify that the
+ # inferior went down hard if we don't receive this.
+ return
+
+ # Pass it along.
+ forwarding_func(event)
+
+
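A minimal sketch of the interceptor contract above: events are plain
dicts keyed by "event", session markers are swallowed, and everything
else reaches the forwarding function. The "test_result" event name
below is illustrative only, not something this patch defines:

    forwarded = []
    inferior_session_interceptor(forwarded.append, {"event": "session_begin"})
    inferior_session_interceptor(forwarded.append, {"event": "test_result"})
    assert forwarded == [{"event": "test_result"}]
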
def call_with_timeout(command, timeout, name, inferior_pid_events):
- """Run command with a timeout if possible."""
- """-s QUIT will create a coredump if they are enabled on your system"""
+ """Run command with a timeout if possible.
+    -s QUIT will create a core dump if core dumps are enabled on your system.
+ """
process = None
if timeout_command and timeout != "0":
command = [timeout_command, '-s', 'QUIT', timeout] + command
+
# Specifying a value for close_fds is unsupported on Windows when using
# subprocess.PIPE
if os.name != "nt":
@@ -170,7 +200,14 @@
if inferior_pid_events:
inferior_pid_events.put_nowait(('created', inferior_pid))
output = process.communicate()
+
+ # The inferior should now be entirely wrapped up.
exit_status = process.returncode
+ if exit_status is None:
+ raise Exception(
+            "no exit status available even though the inferior "
+            "dotest.py run should have completed")
+
if inferior_pid_events:
inferior_pid_events.put_nowait(('destroyed', inferior_pid))
@@ -180,6 +217,10 @@
# only stderr does.
report_test_pass(name, output[1])
else:
+        # TODO: differentiate a failing test from a run that was
+        # broken out of by a SIGTERM/SIGKILL; a signal-based
+        # completion should be reported as an error rather than a
+        # test failure.
report_test_failure(name, command, output[1])
return name, exit_status, passes, failures, unexpected_successes
@@ -250,9 +291,7 @@
return process_dir(*args)
-def process_dir_worker_threading(
- a_test_counter, a_total_tests, a_test_name_len,
- a_dotest_options, job_queue, result_queue, inferior_pid_events):
+def process_dir_worker_threading(job_queue, result_queue, inferior_pid_events):
"""Worker thread main loop when in threading mode.
This one supports the hand-rolled pooling support.
@@ -413,6 +452,150 @@
initialize_global_vars_common(num_threads, test_work_items)
+def ctrl_c_loop(main_op_func, done_func, ctrl_c_handler):
+ """Provides a main loop that is Ctrl-C protected.
+
+    The main loop calls main_op_func() repeatedly until done_func()
+    returns True. The ctrl_c_handler() is called with a single int
+    parameter containing the number of times Ctrl-C has been hit
+    (starting with 1). ctrl_c_handler() should mutate whatever state
+    is needed so that done_func() returns True once it is time to
+    exit the loop.
+ """
+ done = False
+ ctrl_c_count = 0
+
+ while not done:
+ try:
+ # See if we're done. Start with done check since it is
+ # the first thing executed after a Ctrl-C handler in the
+ # following loop.
+ done = done_func()
+ if not done:
+ # Run the main op once.
+ main_op_func()
+
+ except KeyboardInterrupt:
+ ctrl_c_count += 1
+ ctrl_c_handler(ctrl_c_count)
+
+
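A toy illustration of the ctrl_c_loop() contract, assuming the handler
mutates shared state so that done_func() eventually reports done:

    state = {"remaining": 3, "interrupted": False}

    def step():
        state["remaining"] -= 1

    def finished():
        return state["remaining"] <= 0 or state["interrupted"]

    def on_ctrl_c(count):
        # In this sketch, the first Ctrl-C is enough to request a stop.
        state["interrupted"] = True

    ctrl_c_loop(step, finished, on_ctrl_c)
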
+def pump_workers_and_asyncore_map(workers, asyncore_map):
+ """Prunes out completed workers and maintains the asyncore loop.
+
+ The asyncore loop contains the optional socket listener
+ and handlers. When all workers are complete, this method
+ takes care of stopping the listener. It also runs the
+ asyncore loop for the given async map for 10 iterations.
+
+ @param workers the list of worker Thread/Process instances.
+
+ @param asyncore_map the asyncore threading-aware map that
+ indicates which channels are in use and still alive.
+ """
+
+ # Check on all the workers, removing them from the workers
+ # list as they complete.
+ dead_workers = []
+ for worker in workers:
+ # This non-blocking join call is what allows us
+ # to still receive keyboard interrupts.
+ worker.join(0.01)
+ if not worker.is_alive():
+ dead_workers.append(worker)
+ # Clear out the completed workers
+ for dead_worker in dead_workers:
+ workers.remove(dead_worker)
+
+ # If there are no more workers and there is a listener,
+ # close the listener.
+ global RESULTS_LISTENER_CHANNEL
+ if len(workers) == 0 and RESULTS_LISTENER_CHANNEL is not None:
+ RESULTS_LISTENER_CHANNEL.close()
+ RESULTS_LISTENER_CHANNEL = None
+
+ # Pump the asyncore map if it isn't empty.
+ if len(asyncore_map) > 0:
+ asyncore.loop(0.1, False, asyncore_map, 10)
+
+
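The worker-pruning idiom above in isolation: join() with a short
timeout polls instead of blocking, which is what keeps the main thread
able to receive KeyboardInterrupt between passes. A standalone sketch:

    import threading
    import time

    workers = [threading.Thread(target=time.sleep, args=(0.2,))
               for _ in range(4)]
    for worker in workers:
        worker.start()
    while workers:
        for worker in list(workers):
            # Non-blocking poll; KeyboardInterrupt can land here.
            worker.join(0.01)
            if not worker.is_alive():
                workers.remove(worker)
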
+def handle_ctrl_c(ctrl_c_count, job_queue, workers, inferior_pid_events,
+ stop_all_inferiors_func):
+    """Performs the appropriate Ctrl-C action for non-pool parallel test runners.
+
+ @param ctrl_c_count starting with 1, indicates the number of times ctrl-c
+ has been intercepted. The value is 1 on the first intercept, 2 on the
+ second, etc.
+
+ @param job_queue a Queue object that contains the work still outstanding
+ (i.e. hasn't been assigned to a worker yet).
+
+ @param workers list of Thread or Process workers.
+
+ @param inferior_pid_events specifies a Queue of inferior process
+ construction and destruction events. Used to build the list of inferior
+ processes that should be killed if we get that far.
+
+ @param stop_all_inferiors_func a callable object that takes the
+ workers and inferior_pid_events parameters (in that order) if a hard
+ stop is to be used on the workers.
+ """
+
+ # Print out which Ctrl-C we're handling.
+ key_name = [
+ "first",
+ "second",
+ "third",
+ "many"]
+
+ if ctrl_c_count < len(key_name):
+ name_index = ctrl_c_count - 1
+ else:
+ name_index = len(key_name) - 1
+ message = "\nHandling {} KeyboardInterrupt".format(key_name[name_index])
+ with output_lock:
+ print message
+
+ if ctrl_c_count == 1:
+ # Remove all outstanding items from the work queue so we stop
+ # doing any more new work.
+ while not job_queue.empty():
+ try:
+ # Just drain it to stop more work from being started.
+ job_queue.get_nowait()
+ except Queue.Empty:
+ pass
+ with output_lock:
+ print "Stopped more work from being started."
+ elif ctrl_c_count == 2:
+ # Try to stop all inferiors, even the ones currently doing work.
+ stop_all_inferiors_func(workers, inferior_pid_events)
+ else:
+ with output_lock:
+ print "All teardown activities kicked off, should finish soon."
+
+
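The first-Ctrl-C drain above relies on a standard Queue idiom; the
Queue.Empty handler covers the race where another consumer grabs the
last item between empty() and get_nowait(). A standalone sketch:

    import Queue  # Python 2 module name

    job_queue = Queue.Queue()
    for work_item in ("a", "b", "c"):
        job_queue.put(work_item)

    # Drain without blocking so no new work gets started.
    while not job_queue.empty():
        try:
            job_queue.get_nowait()
        except Queue.Empty:
            pass
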
+def workers_and_async_done(workers, async_map):
+ """Returns True if the workers list and asyncore channels are all done.
+
+ @param workers list of workers (threads/processes). These must adhere
+ to the threading Thread or multiprocessing.Process interface.
+
+ @param async_map the threading-aware asyncore channel map to check
+ for live channels.
+
+ @return False if the workers list exists and has any entries in it, or
+ if the async_map exists and has any entries left in it; otherwise, True.
+ """
+ if workers is not None and len(workers) > 0:
+ # We're not done if we still have workers left.
+ return False
+ if async_map is not None and len(async_map) > 0:
+ return False
+ # We're done.
+ return True
+
+
def multiprocessing_test_runner(num_threads, test_work_items):
"""Provides hand-wrapped pooling test runner adapter with Ctrl-C support.
@@ -464,36 +647,72 @@
worker.start()
workers.append(worker)
- # Wait for all workers to finish, handling ^C as needed.
- try:
- for worker in workers:
- worker.join()
- except KeyboardInterrupt:
- # First try to drain the queue of work and let the
- # running tests complete.
- while not job_queue.empty():
- try:
- # Just drain it to stop more work from being started.
- job_queue.get_nowait()
- except Queue.Empty:
- pass
+ # Main loop: wait for all workers to finish and wait for
+ # the socket handlers to wrap up.
+ ctrl_c_loop(
+ # Main operation of loop
+ lambda: pump_workers_and_asyncore_map(
+ workers, RUNNER_PROCESS_ASYNC_MAP),
- print ('\nFirst KeyboardInterrupt received, stopping '
- 'future work. Press again to hard-stop existing tests.')
- try:
- for worker in workers:
- worker.join()
- except KeyboardInterrupt:
- print ('\nSecond KeyboardInterrupt received, killing '
- 'all worker process trees.')
- kill_all_worker_processes(workers, inferior_pid_events)
+ # Return True when we're done with the main loop.
+ lambda: workers_and_async_done(workers, RUNNER_PROCESS_ASYNC_MAP),
+ # Indicate what we do when we receive one or more Ctrl-Cs.
+ lambda ctrl_c_count: handle_ctrl_c(
+ ctrl_c_count, job_queue, workers, inferior_pid_events,
+ kill_all_worker_processes))
+
+ # Reap the test results.
test_results = []
while not result_queue.empty():
test_results.append(result_queue.get(block=False))
return test_results
+def map_async_run_loop(future, channel_map, listener_channel):
+ """Blocks until the Pool.map_async completes and the channel completes.
+
+ @param future an AsyncResult instance from a Pool.map_async() call.
+
+ @param channel_map the asyncore dispatch channel map that should be pumped.
+ Optional: may be None.
+
+ @param listener_channel the channel representing a listener that should be
+ closed once the map_async results are available.
+
+ @return the results from the async_result instance.
+ """
+ map_results = None
+
+ done = False
+ while not done:
+ # Check if we need to reap the map results.
+ if map_results is None:
+ if future.ready():
+ # Get the results.
+ map_results = future.get()
+
+ # Close the runner process listener channel if we have
+ # one: no more connections will be incoming.
+ if listener_channel is not None:
+ listener_channel.close()
+
+ # Pump the asyncore loop if we have a listener socket.
+ if channel_map is not None:
+ asyncore.loop(0.01, False, channel_map, 10)
+
+ # Figure out if we're done running.
+ done = map_results is not None
+ if channel_map is not None:
+ # We have a runner process async map. Check if it
+ # is complete.
+ if len(channel_map) > 0:
+ # We still have an asyncore channel running. Not done yet.
+ done = False
+
+ return map_results
+
+
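A stripped-down sketch of the map_async() polling pattern used here,
without the asyncore channel map; a short wait() stands in for the
spot where map_async_run_loop() pumps the socket handlers:

    import multiprocessing.pool

    pool = multiprocessing.pool.ThreadPool(2)
    future = pool.map_async(abs, [-3, 1, -2])
    while not future.ready():
        # Idle briefly between readiness checks.
        future.wait(0.01)
    print future.get()  # [3, 1, 2]
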
def multiprocessing_test_runner_pool(num_threads, test_work_items):
# Initialize our global state.
initialize_global_vars_multiprocessing(num_threads, test_work_items)
@@ -503,7 +722,12 @@
initializer=setup_global_variables,
initargs=(output_lock, test_counter, total_tests, test_name_len,
dotest_options))
- return pool.map(process_dir_worker_multiprocessing_pool, test_work_items)
+
+ # Start the map operation (async mode).
+ map_future = pool.map_async(
+ process_dir_worker_multiprocessing_pool, test_work_items)
+ return map_async_run_loop(
+ map_future, RUNNER_PROCESS_ASYNC_MAP, RESULTS_LISTENER_CHANNEL)
def threading_test_runner(num_threads, test_work_items):
@@ -541,53 +765,28 @@
for _ in range(num_threads):
worker = threading.Thread(
target=process_dir_worker_threading,
- args=(test_counter,
- total_tests,
- test_name_len,
- dotest_options,
- job_queue,
+ args=(job_queue,
result_queue,
inferior_pid_events))
worker.start()
workers.append(worker)
- # Wait for all workers to finish, handling ^C as needed.
- try:
- # We do some trickery here to ensure we can catch keyboard
- # interrupts.
- while len(workers) > 0:
- # Make a pass throug the workers, checking for who is done.
- dead_workers = []
- for worker in workers:
- # This non-blocking join call is what allows us
- # to still receive keyboard interrupts.
- worker.join(0.01)
- if not worker.isAlive():
- dead_workers.append(worker)
- # Clear out the completed workers
- for dead_worker in dead_workers:
- workers.remove(dead_worker)
+ # Main loop: wait for all workers to finish and wait for
+ # the socket handlers to wrap up.
+ ctrl_c_loop(
+ # Main operation of loop
+ lambda: pump_workers_and_asyncore_map(
+ workers, RUNNER_PROCESS_ASYNC_MAP),
- except KeyboardInterrupt:
- # First try to drain the queue of work and let the
- # running tests complete.
- while not job_queue.empty():
- try:
- # Just drain it to stop more work from being started.
- job_queue.get_nowait()
- except Queue.Empty:
- pass
+ # Return True when we're done with the main loop.
+ lambda: workers_and_async_done(workers, RUNNER_PROCESS_ASYNC_MAP),
- print ('\nFirst KeyboardInterrupt received, stopping '
- 'future work. Press again to hard-stop existing tests.')
- try:
- for worker in workers:
- worker.join()
- except KeyboardInterrupt:
- print ('\nSecond KeyboardInterrupt received, killing '
- 'all worker process trees.')
- kill_all_worker_threads(workers, inferior_pid_events)
+ # Indicate what we do when we receive one or more Ctrl-Cs.
+ lambda ctrl_c_count: handle_ctrl_c(
+ ctrl_c_count, job_queue, workers, inferior_pid_events,
+ kill_all_worker_threads))
+ # Reap the test results.
test_results = []
while not result_queue.empty():
test_results.append(result_queue.get(block=False))
@@ -598,20 +797,49 @@
# Initialize our global state.
initialize_global_vars_threading(num_threads, test_work_items)
- pool = multiprocessing.pool.ThreadPool(
- num_threads
- # initializer=setup_global_variables,
- # initargs=(output_lock, test_counter, total_tests, test_name_len,
- # dotest_options)
- )
- return pool.map(process_dir_worker_threading_pool, test_work_items)
+ pool = multiprocessing.pool.ThreadPool(num_threads)
+ map_future = pool.map_async(
+ process_dir_worker_threading_pool, test_work_items)
+
+ return map_async_run_loop(
+ map_future, RUNNER_PROCESS_ASYNC_MAP, RESULTS_LISTENER_CHANNEL)
+
+
+def asyncore_run_loop(channel_map):
+ try:
+ asyncore.loop(None, False, channel_map)
+ except:
+        # Swallow the exception: we see
+        #     error: (9, 'Bad file descriptor')
+        # when the listener channel is closed, which shouldn't happen.
+ pass
def inprocess_exec_test_runner(test_work_items):
# Initialize our global state.
initialize_global_vars_multiprocessing(1, test_work_items)
- return map(process_dir_mapper_inprocess, test_work_items)
+ # Run the listener and related channel maps in a separate thread.
+ global RESULTS_LISTENER_CHANNEL
+ if RESULTS_LISTENER_CHANNEL is not None:
+ socket_thread = threading.Thread(
+ target=lambda: asyncore_run_loop(RUNNER_PROCESS_ASYNC_MAP))
+ socket_thread.start()
+
+ # Do the work.
+ test_results = map(process_dir_mapper_inprocess, test_work_items)
+
+ # If we have a listener channel, shut it down here.
+ if RESULTS_LISTENER_CHANNEL is not None:
+ # Close down the channel.
+ RESULTS_LISTENER_CHANNEL.close()
+ RESULTS_LISTENER_CHANNEL = None
+
+ # Wait for the listener and handlers to complete.
+ socket_thread.join()
+
+ return test_results
def walk_and_invoke(test_directory, test_subdir, dotest_argv,
test_runner_func):
@@ -624,6 +852,22 @@
test_subdir - lldb/test/ or a subfolder with the tests we're interested in
running
"""
+ # The async_map is important to keep all thread-related asyncore
+ # channels distinct when we call asyncore.loop() later on.
+ global RESULTS_LISTENER_CHANNEL, RUNNER_PROCESS_ASYNC_MAP
+ RUNNER_PROCESS_ASYNC_MAP = {}
+
+ # If we're outputting side-channel test results, create the socket
+ # listener channel and tell the inferior to send results to the
+ # port on which we'll be listening.
+ if RESULTS_FORMATTER is not None:
+ forwarding_func = lambda event: inferior_session_interceptor(
+ RESULTS_FORMATTER.process_event, event)
+ RESULTS_LISTENER_CHANNEL = (
+ dotest_channels.UnpicklingForwardingListenerChannel(
+ RUNNER_PROCESS_ASYNC_MAP, "localhost", 0, forwarding_func))
+ dotest_argv.append("--results-port")
+ dotest_argv.append(str(RESULTS_LISTENER_CHANNEL.address[1]))
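Binding to port 0 is what makes the listener's port dynamic: the OS
picks a free ephemeral port, and address[1] reads back which one was
assigned. The same pattern with a bare socket:

    import socket

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("localhost", 0))  # port 0: let the OS choose
    ephemeral_port = listener.getsockname()[1]
    listener.close()
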
# Collect the test files that we'll run.
test_work_items = []
@@ -654,7 +898,7 @@
if platform_name is None:
target = sys.platform
else:
- m = re.search('remote-(\w+)', platform_name)
+ m = re.search(r'remote-(\w+)', platform_name)
target = m.group(1)
expected_timeout = set()
@@ -759,20 +1003,94 @@
# threading-pool uses threading for the workers (in-process)
# and uses the multiprocessing.pool thread-enabled pool.
+ # This does not properly support Ctrl-C.
"threading-pool":
(lambda work_items: threading_test_runner_pool(
num_threads, work_items)),
# serial uses the subprocess-based, single process
# test runner. This provides process isolation but
- # no concurrent test running.
+ # no concurrent test execution.
"serial":
inprocess_exec_test_runner
}
+def _remove_option(args, option_name, removal_count):
+    """Removes an option and its arguments from the args array.
+    @param args the array of command line arguments (in/out)
+    @param option_name the full command line representation of the
+    option that will be removed (including '--' or '-').
+    @param removal_count the count of elements to remove. A value of
+    1 will remove just the found option, while 2 will remove the
+    option and its first argument.
+    """
+ try:
+ index = args.index(option_name)
+ # Handle the exact match case.
+ del args[index:index+removal_count]
+ return
+ except ValueError:
+ # Thanks to argparse not handling options with known arguments
+        # like other option-parsing libraries (see
+ # https://bugs.python.org/issue9334), we need to support the
+ # --results-formatter-options={second-level-arguments} (note
+ # the equal sign to fool the first-level arguments parser into
+ # not treating the second-level arguments as first-level
+ # options). We're certainly at risk of getting this wrong
+ # since now we're forced into the business of trying to figure
+ # out what is an argument (although I think this
+ # implementation will suffice).
+ regex_string = "^" + option_name + "="
+ regex = re.compile(regex_string)
+ for index in range(len(args)):
+ match = regex.match(args[index])
+ if match:
+ del args[index]
+ return
+
+ # We didn't find the option but we should have.
+ raise Exception("failed to find option '{}' in args '{}'".format(
+ option_name, args))
+
+
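How _remove_option() behaves on the two forms it understands, using a
hypothetical args array. Note that the '=' form occupies a single
element, so only that one element is deleted regardless of
removal_count:

    args = ["--results-port", "12345",
            "--results-formatter-options=--dump-xml", "-v"]
    _remove_option(args, "--results-port", 2)
    _remove_option(args, "--results-formatter-options", 2)
    assert args == ["-v"]
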
+def adjust_inferior_options(dotest_argv):
+ """Adjusts the commandline args array for inferiors.
+
+ This method adjusts the inferior dotest commandline options based
+ on the parallel test runner's options. Some of the inferior options
+ will need to change to properly handle aggregation functionality.
+ """
+ global dotest_options
+
+ # If we don't have a session directory, create one.
+ if not dotest_options.s:
+ # no session log directory, we need to add this to prevent
+ # every dotest invocation from creating its own directory
+ import datetime
+        # Windows platforms don't like ':' in the pathname.
+ timestamp_started = datetime.datetime.now().strftime("%F-%H_%M_%S")
+ dotest_argv.append('-s')
+ dotest_argv.append(timestamp_started)
+ dotest_options.s = timestamp_started
+
+ # Adjust inferior results formatter options - if the parallel
+ # test runner is collecting into the user-specified test results,
+ # we'll have inferiors spawn with the --results-port option and
+ # strip the original test runner options.
+ if dotest_options.results_file is not None:
+ _remove_option(dotest_argv, "--results-file", 2)
+ if dotest_options.results_port is not None:
+ _remove_option(dotest_argv, "--results-port", 2)
+ if dotest_options.results_formatter is not None:
+ _remove_option(dotest_argv, "--results-formatter", 2)
+ if dotest_options.results_formatter_options is not None:
+ _remove_option(dotest_argv, "--results-formatter-options", 2)
+
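For reference, the session directory name generated above looks like
"2015-09-15-13_05_42". %F is a C-library shorthand for %Y-%m-%d and
may not be available on every platform, so the expanded form is used
in this illustration:

    import datetime

    started = datetime.datetime(2015, 9, 15, 13, 5, 42)
    print started.strftime("%Y-%m-%d-%H_%M_%S")  # 2015-09-15-13_05_42
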
def main(print_details_on_success, num_threads, test_subdir,
- test_runner_name):
+ test_runner_name, results_formatter):
"""Run dotest.py in inferior mode in parallel.
@param print_details_on_success the parsed value of the output-on-success
@@ -793,53 +1111,31 @@
system to choose the most appropriate test runner given desired
thread count and OS type.
+ @param results_formatter if specified, provides the TestResultsFormatter
+ instance that will format and output test result data from the
+ side-channel test results. When specified, inferior dotest calls
+ will send test results side-channel data over a socket to the parallel
+ test runner, which will forward them on to results_formatter.
"""
dotest_argv = sys.argv[1:]
- global output_on_success
+ global output_on_success, RESULTS_FORMATTER
output_on_success = print_details_on_success
+ RESULTS_FORMATTER = results_formatter
# We can't use sys.path[0] to determine the script directory
# because it doesn't work under a debugger
- test_directory = os.path.dirname(os.path.realpath(__file__))
- parser = OptionParser(usage="""\
-Run lldb test suite using a separate process for each test file.
-
- Each test will run with a time limit of 10 minutes by default.
-
- Override the default time limit of 10 minutes by setting
- the environment variable LLDB_TEST_TIMEOUT.
-
- E.g., export LLDB_TEST_TIMEOUT=10m
-
- Override the time limit for individual tests by setting
- the environment variable LLDB_[TEST NAME]_TIMEOUT.
-
- E.g., export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=2m
-
- Set to "0" to run without time limit.
-
- E.g., export LLDB_TEST_TIMEOUT=0
- or export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=0
-""")
parser = dotest_args.create_parser()
global dotest_options
dotest_options = dotest_args.parse_args(parser, dotest_argv)
- if not dotest_options.s:
- # no session log directory, we need to add this to prevent
- # every dotest invocation from creating its own directory
- import datetime
- # The windows platforms don't like ':' in the pathname.
- timestamp_started = datetime.datetime.now().strftime("%F-%H_%M_%S")
- dotest_argv.append('-s')
- dotest_argv.append(timestamp_started)
- dotest_options.s = timestamp_started
+ adjust_inferior_options(dotest_argv)
session_dir = os.path.join(os.getcwd(), dotest_options.s)
# The root directory was specified on the command line
+ test_directory = os.path.dirname(os.path.realpath(__file__))
if test_subdir and len(test_subdir) > 0:
test_subdir = os.path.join(test_directory, test_subdir)
else: