Module to run subprocesses with captured output

This change adds a module within pw_cli that provides a function for
running a subprocess and capturing its stdout and stderr to the main
pw log. A new log level is defined for the captured output.

The pw_unit_test runner script is updated to use this function instead
of running its subprocess directly.

The pw command is updated to run its subcommand in an async context,
preventing subcommands from needing to create their own. Fully
synchronous subcommands continue to function transparently to this.

Change-Id: Ia9efb1fbda8a38ef91a0e14113929711038710a8
diff --git a/pw_cli/py/pw_cli/__init__.py b/pw_cli/py/pw_cli/__init__.py
index 4b812ae..e0c7f08 100644
--- a/pw_cli/py/pw_cli/__init__.py
+++ b/pw_cli/py/pw_cli/__init__.py
@@ -15,3 +15,5 @@
 # Note that these imports will trigger plugin registrations.
 import pw_cli.log
 import pw_cli.watch
+
+import pw_cli.process
diff --git a/pw_cli/py/pw_cli/__main__.py b/pw_cli/py/pw_cli/__main__.py
index 8eefaba..565497e 100644
--- a/pw_cli/py/pw_cli/__main__.py
+++ b/pw_cli/py/pw_cli/__main__.py
@@ -21,6 +21,7 @@
 """
 
 import argparse
+import asyncio
 import sys
 import logging
 import importlib
@@ -39,6 +40,7 @@
   ▒█      ░█░ ░▓███▀   ▒█▓▀▓█░ ░▓████▒ ░▓████▒ ▒▓████▀
 '''
 
+
 def main(raw_args=None):
     if raw_args is None:
         raw_args = sys.argv[1:]
@@ -86,18 +88,26 @@
         command.define_args_function(subparser)
         subparser.set_defaults(_command=command.command_function)
 
+        # Check whether the sub-command's entry point is asynchronous.
+        subparser.set_defaults(
+            _run_async=asyncio.iscoroutinefunction(command.command_function))
+
     args = parser.parse_args(raw_args)
 
     args_as_dict = dict(vars(args))
     del args_as_dict['_command']
+    del args_as_dict['_run_async']
 
     # Set root log level; but then remove the arg to avoid breaking the command.
     if 'loglevel' in args_as_dict:
         logging.getLogger().setLevel(
-                getattr(logging, args_as_dict['loglevel'].upper()))
+            getattr(logging, args_as_dict['loglevel'].upper()))
         del args_as_dict['loglevel']
 
-    args._command(**args_as_dict)
+    if args._run_async:
+        asyncio.run(args._command(**args_as_dict))
+    else:
+        args._command(**args_as_dict)
 
 
 if __name__ == "__main__":
diff --git a/pw_cli/py/pw_cli/log.py b/pw_cli/py/pw_cli/log.py
index 7e650b4..ed0e629 100644
--- a/pw_cli/py/pw_cli/log.py
+++ b/pw_cli/py/pw_cli/log.py
@@ -17,6 +17,9 @@
 import pw_cli.plugins
 from pw_cli.color import Color as _Color
 
+# Log level used for captured output of a subprocess run through pw.
+LOGLEVEL_STDOUT = 21
+
 _LOG = logging.getLogger(__name__)
 
 def main():
@@ -28,6 +31,7 @@
     _LOG.error('There was an error on our last operation')
     _LOG.warning('Looks like something is amiss; consider investigating')
     _LOG.info('The operation went as expected')
+    _LOG.log(LOGLEVEL_STDOUT, 'Standard output of subprocess')
     _LOG.debug('Adding 1 to i')
 
 
@@ -37,7 +41,7 @@
     # from other input, in a way that's easier to see than plain colored text.
     logging.basicConfig(format=_Color.black_on_white('%(asctime)s') +
                         ' %(levelname)s %(message)s',
-                        datefmt='%Y%m%d %I:%M:%S %p',
+                        datefmt='%Y%m%d %H:%M:%S',
                         level=logging.INFO)
 
     # Shorten all the log levels to 3 characters for column-aligned logs.
@@ -47,6 +51,7 @@
     logging.addLevelName(logging.ERROR,    _Color.red     ('ERR'))
     logging.addLevelName(logging.WARNING,  _Color.yellow  ('WRN'))
     logging.addLevelName(logging.INFO,     _Color.magenta ('INF'))
+    logging.addLevelName(LOGLEVEL_STDOUT,  _Color.cyan    ('OUT'))
     logging.addLevelName(logging.DEBUG,    _Color.blue    ('DBG'))
     # yapf: enable
 
diff --git a/pw_cli/py/pw_cli/process.py b/pw_cli/py/pw_cli/process.py
new file mode 100644
index 0000000..767d6c0
--- /dev/null
+++ b/pw_cli/py/pw_cli/process.py
@@ -0,0 +1,60 @@
+# Copyright 2019 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+"""Module for running subprocesses from pw and capturing their output."""
+
+import asyncio
+import logging
+import shlex
+
+from pw_cli.color import Color
+import pw_cli.log
+
+_LOG = logging.getLogger(__name__)
+
+
+async def run_async(*args: str, silent: bool = False) -> int:
+    """Runs a command, capturing and logging its output.
+
+    Returns the exit status of the command.
+    """
+
+    command = args[0]
+    _LOG.info('Running `%s`', shlex.join(command))
+
+    stdout = asyncio.subprocess.DEVNULL if silent else asyncio.subprocess.PIPE
+    process = await asyncio.create_subprocess_exec(
+        *command,
+        stdout=stdout,
+        stderr=asyncio.subprocess.STDOUT)
+
+    if process.stdout is not None:
+        while line := await process.stdout.readline():
+            _LOG.log(pw_cli.log.LOGLEVEL_STDOUT,
+                     '[%s] %s',
+                     Color.bold_white(process.pid),
+                     line.decode().rstrip())
+
+    status = await process.wait()
+    if status == 0:
+        _LOG.info('%s exited successfully', command[0])
+    else:
+        _LOG.error('%s exited with status %d', command[0], status)
+
+    return status
+
+
+def run(*args: str, silent: bool = False) -> int:
+    """Synchronous wrapper for run_async."""
+    return asyncio.run(run_async(args, silent))
diff --git a/pw_unit_test/py/pw_unit_test/test_runner.py b/pw_unit_test/py/pw_unit_test/test_runner.py
index c69b235..02c60d6 100644
--- a/pw_unit_test/py/pw_unit_test/test_runner.py
+++ b/pw_unit_test/py/pw_unit_test/test_runner.py
@@ -15,6 +15,7 @@
 """Runs Pigweed unit tests built using GN."""
 
 import argparse
+import asyncio
 import enum
 import json
 import logging
@@ -25,6 +26,8 @@
 
 from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple
 
+import pw_cli.process
+
 # Global logger for the script.
 _LOG: logging.Logger = logging.getLogger(__name__)
 
@@ -118,7 +121,7 @@
         self._args: Sequence[str] = args
         self._tests: List[Test] = list(tests)
 
-    def run_tests(self) -> None:
+    async def run_tests(self) -> None:
         """Runs all registered unit tests through the runner script."""
 
         for idx, test in enumerate(self._tests, 1):
@@ -128,7 +131,7 @@
             _LOG.info('%s: [RUN] %s', test_counter, test.name)
             command = [self._executable, test.file_path, *self._args]
             try:
-                status = subprocess.call(command)
+                status = await pw_cli.process.run_async(command)
                 if status == 0:
                     test.status = TestResult.SUCCESS
                     test_result = 'PASS'
@@ -295,12 +298,12 @@
 
 # TODO(frolv): Try to figure out a better solution for passing through the
 # corrected sys.argv across all pw commands.
-def find_and_run_tests(argv_copy: List[str],
-                       root: str = '',
-                       runner: str = '',
-                       runner_args: Sequence[str] = (),
-                       group: Optional[Sequence[str]] = None,
-                       test: Optional[Sequence[str]] = None) -> int:
+async def find_and_run_tests(argv_copy: List[str],
+                             root: str,
+                             runner: str,
+                             runner_args: Sequence[str] = (),
+                             group: Optional[Sequence[str]] = None,
+                             test: Optional[Sequence[str]] = None) -> int:
     """Runs some unit tests."""
 
     if runner_args:
@@ -327,28 +330,28 @@
         tests = tests_from_groups(group, root)
 
     test_runner = TestRunner(runner, runner_args, tests)
-    test_runner.run_tests()
+    await test_runner.run_tests()
 
     return 0 if test_runner.all_passed() else 1
 
 
-def run_as_plugin(**kwargs) -> None:
+async def run_as_plugin(**kwargs) -> None:
     """Entry point when running as a pw plugin."""
 
     # Replace the virtualenv file path to the script in sys.argv[0] with the
     # pw script so that users have a valid command to copy.
     argv_copy = ['pw', *sys.argv[1:]]
-    find_and_run_tests(argv_copy, **kwargs)
+    await find_and_run_tests(argv_copy, **kwargs)
 
 try:
     import pw_cli.plugins
-
     pw_cli.plugins.register(
         name='test',
         help='Runs groups of unit tests on a target',
         command_function=run_as_plugin,
         define_args_function=register_arguments,
     )
+
 except ImportError:
     pass
 
@@ -372,7 +375,7 @@
 
     args_as_dict = dict(vars(args))
     del args_as_dict['verbose']
-    return find_and_run_tests(sys.argv, **args_as_dict)
+    return asyncio.run(find_and_run_tests(sys.argv, **args_as_dict))
 
 
 if __name__ == '__main__':