Adds parallel work queue index to test events, stdout/stderr results support.
See http://reviews.llvm.org/D12983 for details.
llvm-svn: 248036
diff --git a/lldb/test/dosep.py b/lldb/test/dosep.py
index 0148508..18d697c 100755
--- a/lldb/test/dosep.py
+++ b/lldb/test/dosep.py
@@ -80,7 +80,12 @@
RUNNER_PROCESS_ASYNC_MAP = None
RESULTS_LISTENER_CHANNEL = None
-def setup_global_variables(lock, counter, total, name_len, options):
+"""Contains an optional function pointer that can return the worker index
+ for the given thread/process calling it. Returns a 0-based index."""
+GET_WORKER_INDEX = None
+
+def setup_global_variables(
+ lock, counter, total, name_len, options, worker_index_map):
global output_lock, test_counter, total_tests, test_name_len
global dotest_options
output_lock = lock
@@ -89,6 +94,22 @@
test_name_len = name_len
dotest_options = options
+ if worker_index_map is not None:
+ # We'll use the output lock for this to avoid sharing another lock.
+ # This won't be used much.
+ index_lock = lock
+
+ def get_worker_index_use_pid():
+ """Returns a 0-based, process-unique index for the worker."""
+ pid = os.getpid()
+ with index_lock:
+ if pid not in worker_index_map:
+ worker_index_map[pid] = len(worker_index_map)
+ return worker_index_map[pid]
+
+ global GET_WORKER_INDEX
+ GET_WORKER_INDEX = get_worker_index_use_pid
+
def report_test_failure(name, command, output):
global output_lock
@@ -150,31 +171,6 @@
return passes, failures, unexpected_successes
-def inferior_session_interceptor(forwarding_func, event):
- """Intercepts session begin/end events, passing through everyting else.
-
- @param forwarding_func a callable object to pass along the event if it
- is not one that gets intercepted.
-
- @param event the test result event received.
- """
-
- if event is not None and isinstance(event, dict):
- if "event" in event:
- if event["event"] == "session_begin":
- # Swallow it. Could report on inferior here if we
- # cared.
- return
- elif event["event"] == "session_end":
- # Swallow it. Could report on inferior here if we
- # cared. More usefully, we can verify that the
- # inferior went down hard if we don't receive this.
- return
-
- # Pass it along.
- forwarding_func(event)
-
-
def call_with_timeout(command, timeout, name, inferior_pid_events):
"""Run command with a timeout if possible.
-s QUIT will create a coredump if they are enabled on your system
@@ -183,6 +179,10 @@
if timeout_command and timeout != "0":
command = [timeout_command, '-s', 'QUIT', timeout] + command
+ if GET_WORKER_INDEX is not None:
+ worker_index = GET_WORKER_INDEX()
+ command.extend([
+ "--event-add-entries", "worker_index={}".format(worker_index)])
# Specifying a value for close_fds is unsupported on Windows when using
# subprocess.PIPE
if os.name != "nt":
@@ -263,7 +263,8 @@
def process_dir_worker_multiprocessing(
a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
- a_dotest_options, job_queue, result_queue, inferior_pid_events):
+ a_dotest_options, job_queue, result_queue, inferior_pid_events,
+ worker_index_map):
"""Worker thread main loop when in multiprocessing mode.
Takes one directory specification at a time and works on it."""
@@ -273,7 +274,7 @@
# Setup the global state for the worker process.
setup_global_variables(
a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
- a_dotest_options)
+ a_dotest_options, worker_index_map)
# Keep grabbing entries from the queue until done.
while not job_queue.empty():
@@ -441,14 +442,36 @@
# rest of the flat module.
global output_lock
output_lock = multiprocessing.RLock()
+
initialize_global_vars_common(num_threads, test_work_items)
def initialize_global_vars_threading(num_threads, test_work_items):
+ """Initializes global variables used in threading mode.
+ @param num_threads specifies the number of workers used.
+ @param test_work_items specifies all the work items
+ that will be processed.
+ """
# Initialize the global state we'll use to communicate with the
# rest of the flat module.
global output_lock
output_lock = threading.RLock()
+
+ index_lock = threading.RLock()
+ index_map = {}
+
+ def get_worker_index_threading():
+ """Returns a 0-based, thread-unique index for the worker thread."""
+ thread_id = threading.current_thread().ident
+ with index_lock:
+ if thread_id not in index_map:
+ index_map[thread_id] = len(index_map)
+ return index_map[thread_id]
+
+
+ global GET_WORKER_INDEX
+ GET_WORKER_INDEX = get_worker_index_threading
+
initialize_global_vars_common(num_threads, test_work_items)
@@ -630,6 +653,10 @@
# hold 2 * (num inferior dotest.py processes started) entries.
inferior_pid_events = multiprocessing.Queue(4096)
+ # Worker dictionary allows each worker to figure out its worker index.
+ manager = multiprocessing.Manager()
+ worker_index_map = manager.dict()
+
# Create workers. We don't use multiprocessing.Pool due to
# challenges with handling ^C keyboard interrupts.
workers = []
@@ -643,7 +670,8 @@
dotest_options,
job_queue,
result_queue,
- inferior_pid_events))
+ inferior_pid_events,
+ worker_index_map))
worker.start()
workers.append(worker)
@@ -717,11 +745,14 @@
# Initialize our global state.
initialize_global_vars_multiprocessing(num_threads, test_work_items)
+ manager = multiprocessing.Manager()
+ worker_index_map = manager.dict()
+
pool = multiprocessing.Pool(
num_threads,
initializer=setup_global_variables,
initargs=(output_lock, test_counter, total_tests, test_name_len,
- dotest_options))
+ dotest_options, worker_index_map))
# Start the map operation (async mode).
map_future = pool.map_async(
@@ -819,6 +850,10 @@
# Initialize our global state.
initialize_global_vars_multiprocessing(1, test_work_items)
+ # We're always worker index 0
+ global GET_WORKER_INDEX
+ GET_WORKER_INDEX = lambda: 0
+
# Run the listener and related channel maps in a separate thread.
# global RUNNER_PROCESS_ASYNC_MAP
global RESULTS_LISTENER_CHANNEL
@@ -861,8 +896,7 @@
# listener channel and tell the inferior to send results to the
# port on which we'll be listening.
if RESULTS_FORMATTER is not None:
- forwarding_func = lambda event: inferior_session_interceptor(
- RESULTS_FORMATTER.process_event, event)
+ forwarding_func = RESULTS_FORMATTER.process_event
RESULTS_LISTENER_CHANNEL = (
dotest_channels.UnpicklingForwardingListenerChannel(
RUNNER_PROCESS_ASYNC_MAP, "localhost", 0, forwarding_func))
@@ -1088,6 +1122,11 @@
if dotest_options.results_formatter_options is not None:
_remove_option(dotest_argv, "--results-formatter-options", 2)
+ # Remove test runner name if present.
+ if dotest_options.test_runner_name is not None:
+ _remove_option(dotest_argv, "--test-runner-name", 2)
+
+
def main(print_details_on_success, num_threads, test_subdir,
test_runner_name, results_formatter):
"""Run dotest.py in inferior mode in parallel.