pw_console: Incremental stdout repl output
- Incrementally check stdout as user code is in progress.
- Clean up jinja2 template loading
- Update repl_output.jinja to look like standard python prompt output
- Remove the indentation from output, exceptions, and return values.
This makes copying python output easier.
- Add a few more code themes for readability
- Create a highly readable code theme: 'pigweed-code'
- Update repl_pane_test.py to use IsolatedAsyncioTestCase
Bug: 407
Test: Incremental Stdout steps
Change-Id: I151047bff50ed046b5eaa8114d37a1d9d8574e65
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/55100
Commit-Queue: Anthony DiGirolamo <tonymd@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_console/py/BUILD.gn b/pw_console/py/BUILD.gn
index fa335f5..89ac66b 100644
--- a/pw_console/py/BUILD.gn
+++ b/pw_console/py/BUILD.gn
@@ -33,6 +33,7 @@
"pw_console/log_store.py",
"pw_console/log_view.py",
"pw_console/mouse.py",
+ "pw_console/pigweed_code_style.py",
"pw_console/pw_ptpython_repl.py",
"pw_console/repl_pane.py",
"pw_console/search_toolbar.py",
diff --git a/pw_console/py/help_window_test.py b/pw_console/py/help_window_test.py
index 334b299..0df8695 100644
--- a/pw_console/py/help_window_test.py
+++ b/pw_console/py/help_window_test.py
@@ -14,12 +14,28 @@
"""Tests for pw_console.console_app"""
import inspect
+import logging
import unittest
-from unittest.mock import Mock
+from unittest.mock import MagicMock
+from jinja2 import Environment, PackageLoader, make_logging_undefined
from prompt_toolkit.key_binding import KeyBindings
-from pw_console.help_window import HelpWindow, KEYBIND_TEMPLATE
+from pw_console.help_window import HelpWindow
+
+_jinja_env = Environment(
+ loader=PackageLoader('pw_console'),
+ undefined=make_logging_undefined(logger=logging.getLogger('pw_console')),
+ trim_blocks=True,
+ lstrip_blocks=True,
+)
+
+
+def _create_app_mock():
+ template = _jinja_env.get_template('keybind_list.jinja')
+ mock_app = MagicMock()
+ mock_app.get_template = MagicMock(return_value=template)
+ return mock_app
class TestHelpWindow(unittest.TestCase):
@@ -28,13 +44,10 @@
self.maxDiff = None # pylint: disable=invalid-name
def test_instantiate(self) -> None:
- app = Mock()
+ app = _create_app_mock()
help_window = HelpWindow(app)
self.assertIsNotNone(help_window)
- def test_template_loads(self) -> None:
- self.assertIn('{%', KEYBIND_TEMPLATE)
-
# pylint: disable=unused-variable,unused-argument
def test_add_keybind_help_text(self) -> None:
bindings = KeyBindings()
@@ -48,7 +61,7 @@
def exit_(event):
"""Quit the application."""
- app = Mock()
+ app = _create_app_mock()
help_window = HelpWindow(app)
help_window.add_keybind_help_text('Global', bindings)
@@ -89,14 +102,16 @@
def app_focus_previous(event):
"""Move focus to the previous widget."""
- app = Mock()
+ app = _create_app_mock()
- help_window = HelpWindow(app,
- preamble='Pigweed CLI v0.1',
- additional_help_text=inspect.cleandoc("""
- Welcome to the Pigweed Console!
- Please enjoy this extra help text.
- """))
+ help_window = HelpWindow(
+ app,
+ preamble='Pigweed CLI v0.1',
+ additional_help_text=inspect.cleandoc("""
+ Welcome to the Pigweed Console!
+ Please enjoy this extra help text.
+ """),
+ )
help_window.add_keybind_help_text('Global', global_bindings)
help_window.add_keybind_help_text('Focus', focus_bindings)
help_window.generate_help_text()
diff --git a/pw_console/py/log_view_test.py b/pw_console/py/log_view_test.py
index 204075e..d110809 100644
--- a/pw_console/py/log_view_test.py
+++ b/pw_console/py/log_view_test.py
@@ -360,8 +360,7 @@
if _PYTHON_3_8:
- # pylint: disable=no-name-in-module
- from unittest import IsolatedAsyncioTestCase # type: ignore
+ from unittest import IsolatedAsyncioTestCase # type: ignore # pylint: disable=no-name-in-module
class TestLogViewFiltering(IsolatedAsyncioTestCase): # pylint: disable=undefined-variable
"""Test LogView log filtering capabilities."""
diff --git a/pw_console/py/pw_console/console_app.py b/pw_console/py/pw_console/console_app.py
index a29222b..4b1c743 100644
--- a/pw_console/py/pw_console/console_app.py
+++ b/pw_console/py/pw_console/console_app.py
@@ -21,6 +21,7 @@
from threading import Thread
from typing import Iterable, Union
+from jinja2 import Environment, PackageLoader, make_logging_undefined
from prompt_toolkit.clipboard.pyperclip import PyperclipClipboard
from prompt_toolkit.layout.menus import CompletionsMenu
from prompt_toolkit.application import Application
@@ -122,6 +123,18 @@
local_vars = local_vars or global_vars
+ # Setup the Jinja environment
+ self.jinja_env = Environment(
+ # Load templates automatically from pw_console/templates
+ loader=PackageLoader(__package__),
+ # Raise errors if variables are undefined in templates
+ undefined=make_logging_undefined(
+ logger=logging.getLogger(__package__), ),
+ # Trim whitespace in templates
+ trim_blocks=True,
+ lstrip_blocks=True,
+ )
+
# TODO(tonymd): Make these configurable per project.
self.repl_history_filename = Path.home() / '.pw_console_history'
self.search_history_filename = Path.home() / '.pw_console_search'
@@ -290,6 +303,9 @@
clipboard=PyperclipClipboard(),
)
+ def get_template(self, file_name: str):
+ return self.jinja_env.get_template(file_name)
+
def run_pane_menu_option(self, function_to_run):
function_to_run()
self.update_menu_items()
@@ -334,6 +350,22 @@
'high-contrast-dark')),
MenuItem('-'),
MenuItem(
+ 'Code: pigweed-code',
+ functools.partial(self.pw_ptpython_repl.use_code_colorscheme,
+ 'pigweed-code')),
+ MenuItem(
+ 'Code: material',
+ functools.partial(self.pw_ptpython_repl.use_code_colorscheme,
+ 'material')),
+ MenuItem(
+ 'Code: gruvbox-light',
+ functools.partial(self.pw_ptpython_repl.use_code_colorscheme,
+ 'gruvbox-light')),
+ MenuItem(
+ 'Code: gruvbox-dark',
+ functools.partial(self.pw_ptpython_repl.use_code_colorscheme,
+ 'gruvbox-dark')),
+ MenuItem(
'Code: tomorrow-night',
functools.partial(self.pw_ptpython_repl.use_code_colorscheme,
'tomorrow-night')),
diff --git a/pw_console/py/pw_console/help_window.py b/pw_console/py/pw_console/help_window.py
index 311ff3e..9d3867a 100644
--- a/pw_console/py/pw_console/help_window.py
+++ b/pw_console/py/pw_console/help_window.py
@@ -18,7 +18,6 @@
from pathlib import Path
from typing import Dict
-from jinja2 import Template
from prompt_toolkit.document import Document
from prompt_toolkit.filters import Condition
from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
@@ -35,10 +34,6 @@
_LOG = logging.getLogger(__package__)
-HELP_TEMPLATE_PATH = Path(__file__).parent / "templates" / "keybind_list.jinja"
-with HELP_TEMPLATE_PATH.open() as tmpl:
- KEYBIND_TEMPLATE = tmpl.read()
-
def _longest_line_length(text):
"""Return the longest line in the given text."""
@@ -155,12 +150,7 @@
def generate_help_text(self):
"""Generate help text based on added key bindings."""
- # pylint: disable=line-too-long
- template = Template(
- KEYBIND_TEMPLATE,
- trim_blocks=True,
- lstrip_blocks=True,
- )
+ template = self.application.get_template('keybind_list.jinja')
self.help_text = template.render(
sections=self.help_text_sections,
diff --git a/pw_console/py/pw_console/pigweed_code_style.py b/pw_console/py/pw_console/pigweed_code_style.py
new file mode 100644
index 0000000..dd115e2
--- /dev/null
+++ b/pw_console/py/pw_console/pigweed_code_style.py
@@ -0,0 +1,42 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Brighter PigweedCode pygments style."""
+
+import copy
+from pygments.style import Style # type: ignore
+from pygments.token import Keyword, Name, Generic # type: ignore
+from pygments_style_dracula.dracula import DraculaStyle # type: ignore
+
+_style_list = copy.copy(DraculaStyle.styles)
+
+# Darker Prompt
+_style_list[Generic.Prompt] = '#bfbfbf'
+# Lighter output
+_style_list[Generic.Output] = '#f8f8f2'
+# Remove 'italic' from these
+_style_list[Keyword.Declaration] = '#8be9fd'
+_style_list[Name.Builtin] = '#8be9fd'
+_style_list[Name.Label] = '#8be9fd'
+_style_list[Name.Variable] = '#8be9fd'
+_style_list[Name.Variable.Class] = '#8be9fd'
+_style_list[Name.Variable.Global] = '#8be9fd'
+_style_list[Name.Variable.Instance] = '#8be9fd'
+
+
+class PigweedCodeStyle(Style):
+
+ background_color = "#2e2e2e"
+ default_style = ""
+
+ styles = _style_list
diff --git a/pw_console/py/pw_console/pw_ptpython_repl.py b/pw_console/py/pw_console/pw_ptpython_repl.py
index ef13590..bc68e53 100644
--- a/pw_console/py/pw_console/pw_ptpython_repl.py
+++ b/pw_console/py/pw_console/pw_ptpython_repl.py
@@ -18,7 +18,7 @@
import io
import logging
import sys
-from typing import Iterable, Optional
+from typing import Iterable, Optional, TYPE_CHECKING
from prompt_toolkit.buffer import Buffer
from prompt_toolkit.completion import merge_completers
@@ -36,6 +36,9 @@
import pw_console.text_formatting
+if TYPE_CHECKING:
+ from pw_console.repl_pane import ReplPane
+
_LOG = logging.getLogger(__package__)
@@ -79,7 +82,7 @@
self.enable_dictionary_completion = True
# Change some ptpython.repl defaults.
- self.use_code_colorscheme('tomorrow-night-bright')
+ self.use_code_colorscheme('pigweed-code')
self.show_status_bar = False
self.show_exit_confirmation = False
self.complete_private_attributes = (
@@ -98,7 +101,7 @@
CompletionVisualisation.NONE)
# Additional state variables.
- self.repl_pane = None
+ self.repl_pane: 'Optional[ReplPane]' = None
self._last_result = None
def __pt_container__(self):
@@ -119,9 +122,6 @@
"""Erase the last repl execution result."""
self._last_result = None
- def _update_output_buffer(self):
- self.repl_pane.update_output_buffer()
-
def show_result(self, result):
"""Format and save output results.
@@ -168,12 +168,13 @@
stderr_contents)
# Rebuild output buffer.
- self._update_output_buffer()
+ self.repl_pane.update_output_buffer(
+ 'pw_ptpython_repl.user_code_complete_callback')
# Trigger a prompt_toolkit application redraw.
self.repl_pane.application.application.invalidate()
- async def _run_user_code(self, text):
+ async def _run_user_code(self, text, stdout_proxy, stdin_proxy):
"""Run user code and capture stdout+err.
This fuction should be run in a separate thread from the main
@@ -187,11 +188,8 @@
original_stdout = sys.stdout
original_stderr = sys.stderr
- temp_out = io.StringIO()
- temp_err = io.StringIO()
-
- sys.stdout = temp_out
- sys.stderr = temp_err
+ sys.stdout = stdout_proxy
+ sys.stderr = stdin_proxy
# Run user repl code
try:
@@ -202,8 +200,8 @@
sys.stderr = original_stderr
# Save the captured output
- stdout_contents = temp_out.getvalue()
- stderr_contents = temp_err.getvalue()
+ stdout_contents = stdout_proxy.getvalue()
+ stderr_contents = stdin_proxy.getvalue()
return {
'stdout': stdout_contents,
@@ -217,27 +215,35 @@
# Do nothing if no text is entered.
if len(buff.text) == 0:
return False
+ if self.repl_pane is None:
+ return False
# Exit if quit or exit
if buff.text.strip() in ['quit', 'quit()', 'exit', 'exit()']:
self.repl_pane.application.application.exit() # type: ignore
+ # Create stdout and stderr proxies
+ temp_stdout = io.StringIO()
+ temp_stderr = io.StringIO()
+
# Execute the repl code in the the separate user_code thread loop.
future = asyncio.run_coroutine_threadsafe(
# This function will be executed in a separate thread.
- self._run_user_code(buff.text),
+ self._run_user_code(buff.text, temp_stdout, temp_stderr),
# Using this asyncio event loop.
self.repl_pane.application.user_code_loop) # type: ignore
+
+ # Save the input text and future object.
+ self.repl_pane.append_executed_code(buff.text, future, temp_stdout,
+ temp_stderr) # type: ignore
+
# Run user_code_complete_callback() when done.
done_callback = functools.partial(self.user_code_complete_callback,
buff.text)
future.add_done_callback(done_callback)
- # Save the input text and future object.
- self.repl_pane.append_executed_code(buff.text, future) # type: ignore
-
# Rebuild the parent ReplPane output buffer.
- self._update_output_buffer()
+ self.repl_pane.update_output_buffer('pw_ptpython_repl._accept_handler')
# TODO(tonymd): Return True if exception is found?
# Don't keep input for now. Return True to keep input text.
diff --git a/pw_console/py/pw_console/repl_pane.py b/pw_console/py/pw_console/repl_pane.py
index 78ef43d..5e1049e 100644
--- a/pw_console/py/pw_console/repl_pane.py
+++ b/pw_console/py/pw_console/repl_pane.py
@@ -13,11 +13,11 @@
# the License.
"""ReplPane class."""
+import asyncio
import concurrent
import functools
import logging
from dataclasses import dataclass
-from pathlib import Path
from typing import (
Any,
Callable,
@@ -26,7 +26,6 @@
Optional,
)
-from jinja2 import Template
from prompt_toolkit.filters import (
Condition,
has_focus,
@@ -46,7 +45,9 @@
WindowAlign,
)
from prompt_toolkit.lexers import PygmentsLexer # type: ignore
-from pygments.lexers.python import PythonLexer # type: ignore
+from pygments.lexers.python import PythonConsoleLexer # type: ignore
+# Alternative Formatting
+# from IPython.lib.lexers import IPythonConsoleLexer # type: ignore
import pw_console.mouse
import pw_console.style
@@ -58,11 +59,6 @@
_Namespace = Dict[str, Any]
_GetNamespace = Callable[[], _Namespace]
-_OUTPUT_TEMPLATE_PATH = (Path(__file__).parent / 'templates' /
- 'repl_output.jinja')
-with _OUTPUT_TEMPLATE_PATH.open() as tmpl:
- OUTPUT_TEMPLATE = tmpl.read()
-
class ReplPaneBottomToolbarBar(ConditionalContainer):
"""Repl pane bottom toolbar."""
@@ -204,11 +200,20 @@
output: str
stdout: str
stderr: str
+ output_check_task: Optional[concurrent.futures.Future] = None
@property
def is_running(self):
return not self.future.done()
+ def update_stdout(self, text: Optional[str]):
+ if text:
+ self.stdout = text
+
+ def update_stderr(self, text: Optional[str]):
+ if text:
+ self.stderr = text
+
class ReplPane:
"""Pane for reading Python input."""
@@ -249,7 +254,7 @@
focus_on_click=True,
scrollbar=True,
wrap_lines=False,
- lexer=PygmentsLexer(PythonLexer),
+ lexer=PygmentsLexer(PythonConsoleLexer),
)
# Additional keybindings for the text area.
@@ -442,7 +447,7 @@
code.future.cancel()
code.output = 'Canceled'
self.pw_ptpython_repl.clear_last_result()
- self.update_output_buffer()
+ self.update_output_buffer('repl_pane.interrupt_last_code_execution')
def _get_currently_running_code(self):
for code in self.executed_code:
@@ -461,12 +466,31 @@
text = self.get_output_buffer_text([code], show_index=False)
_LOG.debug('[PYTHON] %s\n%s', prefix, text)
- def append_executed_code(self, text, future):
+ async def periodically_check_stdout(self, user_code: UserCodeExecution,
+ stdout_proxy, stderr_proxy):
+ while not user_code.future.done():
+ await asyncio.sleep(0.3)
+ stdout_text_so_far = stdout_proxy.getvalue()
+ stderr_text_so_far = stderr_proxy.getvalue()
+ if stdout_text_so_far:
+ user_code.update_stdout(stdout_text_so_far)
+ if stderr_text_so_far:
+ user_code.update_stderr(stderr_text_so_far)
+
+ # if stdout_text_so_far or stderr_text_so_far:
+ self.update_output_buffer('repl_pane.periodic_check')
+
+ def append_executed_code(self, text, future, temp_stdout, temp_stderr):
user_code = UserCodeExecution(input=text,
future=future,
output=None,
stdout=None,
stderr=None)
+
+ background_stdout_check = asyncio.create_task(
+ self.periodically_check_stdout(user_code, temp_stdout,
+ temp_stderr))
+ user_code.output_check_task = background_stdout_check
self.executed_code.append(user_code)
self._log_executed_code(user_code, prefix='START')
@@ -476,23 +500,21 @@
result_text,
stdout_text='',
stderr_text=''):
+
code = self._get_executed_code(future)
if code:
code.output = result_text
code.stdout = stdout_text
code.stderr = stderr_text
self._log_executed_code(code, prefix='FINISH')
- self.update_output_buffer()
+ self.update_output_buffer('repl_pane.append_result_to_executed_code')
def get_output_buffer_text(self, code_items=None, show_index=True):
executed_code = code_items or self.executed_code
- template = Template(OUTPUT_TEMPLATE,
- trim_blocks=True,
- lstrip_blocks=True)
- return template.render(code_items=executed_code,
- show_index=show_index).strip()
+ template = self.application.get_template('repl_output.jinja')
+ return template.render(code_items=executed_code, show_index=show_index)
- def update_output_buffer(self):
+ def update_output_buffer(self, *unused_args):
text = self.get_output_buffer_text()
# Add an extra line break so the last cursor position is in column 0
# instead of the end of the last line.
diff --git a/pw_console/py/pw_console/style.py b/pw_console/py/pw_console/style.py
index 5996d6e..c10d579 100644
--- a/pw_console/py/pw_console/style.py
+++ b/pw_console/py/pw_console/style.py
@@ -22,9 +22,9 @@
_LOG = logging.getLogger(__package__)
-# pylint: disable=too-many-instance-attributes
@dataclass
class HighContrastDarkColors:
+ # pylint: disable=too-many-instance-attributes
default_bg = '#100f10'
default_fg = '#ffffff'
@@ -50,9 +50,9 @@
yellow_accent = '#d2e580'
-# pylint: disable=too-many-instance-attributes
@dataclass
class DarkColors:
+ # pylint: disable=too-many-instance-attributes
default_bg = '#2e2e2e'
default_fg = '#bbc2cf'
diff --git a/pw_console/py/pw_console/templates/repl_output.jinja b/pw_console/py/pw_console/templates/repl_output.jinja
index 1ccf272..0b67b3f 100644
--- a/pw_console/py/pw_console/templates/repl_output.jinja
+++ b/pw_console/py/pw_console/templates/repl_output.jinja
@@ -13,22 +13,23 @@
License for the specific language governing permissions and limitations under
the License.
#}
+
{% for code in code_items %}
{% set index = loop.index if show_index else '' %}
-{% set prompt_width = 7 + index|string|length %}
-In [{{index}}]: {{ code.input|indent(width=prompt_width) }}
-{% if code.is_running %}
-Running...
-{% else %}
+{% set prompt_indent_text = '... ' %}
+{% set input_text = code.input if code.input else '' %}
+>>> {{ input_text|indent(width=prompt_indent_text) }}
{% if code.stdout -%}
{{ code.stdout }}
{%- endif %}
{% if code.stderr -%}
{{ code.stderr }}
{%- endif %}
-{% if code.output %}
-Out[{{index}}]: {{ code.output|indent(width=prompt_width) }}
+{% if code.is_running %}
+Running...
{% endif %}
+{% if code.output %}
+{{ code.output }}
{% endif %}
{% endfor -%}
diff --git a/pw_console/py/repl_pane_test.py b/pw_console/py/repl_pane_test.py
index d76135e..73a93cf 100644
--- a/pw_console/py/repl_pane_test.py
+++ b/pw_console/py/repl_pane_test.py
@@ -16,9 +16,11 @@
import asyncio
import builtins
import inspect
+import io
+import sys
import threading
import unittest
-from unittest.mock import Mock, MagicMock
+from unittest.mock import MagicMock, call
from prompt_toolkit.application import create_app_session
from prompt_toolkit.output import (
@@ -31,115 +33,210 @@
from pw_console.repl_pane import ReplPane
from pw_console.pw_ptpython_repl import PwPtPythonRepl
+_PYTHON_3_8 = sys.version_info >= (
+ 3,
+ 8,
+)
-class TestReplPane(unittest.TestCase):
- """Tests for ReplPane."""
- def test_repl_code_return_values(self) -> None:
- """Test stdout, return values, and exceptions can be returned from
- running user repl code."""
- app = Mock()
+if _PYTHON_3_8:
+ from unittest import IsolatedAsyncioTestCase # type: ignore # pylint: disable=no-name-in-module
- global_vars = {
- '__name__': '__main__',
- '__package__': None,
- '__doc__': None,
- '__builtins__': builtins,
- }
+ class TestReplPane(IsolatedAsyncioTestCase):
+ """Tests for ReplPane."""
+ def setUp(self): # pylint: disable=invalid-name
+ self.maxDiff = None # pylint: disable=invalid-name
- pw_ptpython_repl = PwPtPythonRepl(get_globals=lambda: global_vars,
- get_locals=lambda: global_vars,
- color_depth=ColorDepth.DEPTH_8_BIT)
- repl_pane = ReplPane(
- application=app,
- python_repl=pw_ptpython_repl,
- )
- # Check pw_ptpython_repl has a reference to the parent repl_pane.
- self.assertEqual(repl_pane, pw_ptpython_repl.repl_pane)
+ def test_repl_code_return_values(self) -> None:
+ """Test stdout, return values, and exceptions can be returned from
+ running user repl code."""
+ app = MagicMock()
- # Define a function, should return nothing.
- code = inspect.cleandoc("""
- def run():
- print('The answer is ', end='')
- return 1+1+4+16+20
- """)
- # pylint: disable=protected-access
- result = asyncio.run(pw_ptpython_repl._run_user_code(code))
- self.assertEqual(result, {'stdout': '', 'stderr': '', 'result': None})
+ global_vars = {
+ '__name__': '__main__',
+ '__package__': None,
+ '__doc__': None,
+ '__builtins__': builtins,
+ }
- # Check stdout and return value
- result = asyncio.run(pw_ptpython_repl._run_user_code('run()'))
- self.assertEqual(result, {
- 'stdout': 'The answer is ',
- 'stderr': '',
- 'result': 42
- })
+ pw_ptpython_repl = PwPtPythonRepl(
+ get_globals=lambda: global_vars,
+ get_locals=lambda: global_vars,
+ color_depth=ColorDepth.DEPTH_8_BIT)
+ repl_pane = ReplPane(
+ application=app,
+ python_repl=pw_ptpython_repl,
+ )
+ # Check pw_ptpython_repl has a reference to the parent repl_pane.
+ self.assertEqual(repl_pane, pw_ptpython_repl.repl_pane)
- # Check for repl exception
- result = asyncio.run(pw_ptpython_repl._run_user_code('return "blah"'))
- self.assertIn("SyntaxError: 'return' outside function",
- pw_ptpython_repl._last_result) # type: ignore
-
- def test_user_thread(self) -> None:
- """Test user code thread."""
- with create_app_session(output=FakeOutput()):
- app = ConsoleApp(color_depth=ColorDepth.DEPTH_8_BIT)
- app.start_user_code_thread()
-
- pw_ptpython_repl = app.pw_ptpython_repl
- repl_pane = app.repl_pane
-
- pw_ptpython_repl.user_code_complete_callback = MagicMock(
- wraps=pw_ptpython_repl.user_code_complete_callback)
- user_code_done = threading.Event()
-
+ # Define a function, should return nothing.
code = inspect.cleandoc("""
- import time
def run():
- time.sleep(0.3)
print('The answer is ', end='')
return 1+1+4+16+20
""")
-
- input_buffer = MagicMock(text=code)
+ temp_stdout = io.StringIO()
+ temp_stderr = io.StringIO()
# pylint: disable=protected-access
- pw_ptpython_repl._accept_handler(input_buffer)
+ result = asyncio.run(
+ pw_ptpython_repl._run_user_code(code, temp_stdout,
+ temp_stderr))
+ self.assertEqual(result, {
+ 'stdout': '',
+ 'stderr': '',
+ 'result': None
+ })
- # Get last executed code object.
- user_code1 = repl_pane.executed_code[-1]
- # Wait for repl code to finish.
- user_code1.future.add_done_callback(
- lambda future: user_code_done.set())
- user_code_done.wait(timeout=3)
+ temp_stdout = io.StringIO()
+ temp_stderr = io.StringIO()
+ # Check stdout and return value
+ result = asyncio.run(
+ pw_ptpython_repl._run_user_code('run()', temp_stdout,
+ temp_stderr))
+ self.assertEqual(result, {
+ 'stdout': 'The answer is ',
+ 'stderr': '',
+ 'result': 42
+ })
- pw_ptpython_repl.user_code_complete_callback.assert_called_once()
- self.assertIsNotNone(user_code1)
- self.assertTrue(user_code1.future.done())
- self.assertEqual(user_code1.input, code)
- self.assertEqual(user_code1.output, None)
- # stdout / stderr may be '' or None
- self.assertFalse(user_code1.stdout)
- self.assertFalse(user_code1.stderr)
+ temp_stdout = io.StringIO()
+ temp_stderr = io.StringIO()
+ # Check for repl exception
+ result = asyncio.run(
+ pw_ptpython_repl._run_user_code('return "blah"', temp_stdout,
+ temp_stderr))
+ self.assertIn("SyntaxError: 'return' outside function",
+ pw_ptpython_repl._last_result) # type: ignore
- user_code_done.clear()
- pw_ptpython_repl.user_code_complete_callback.reset_mock()
+ async def test_user_thread(self) -> None:
+ """Test user code thread."""
- input_buffer = MagicMock(text='run()')
- pw_ptpython_repl._accept_handler(input_buffer)
+ with create_app_session(output=FakeOutput()):
+ # Setup Mocks
+ app = ConsoleApp(color_depth=ColorDepth.DEPTH_8_BIT)
+ app.start_user_code_thread()
- # Get last executed code object.
- user_code2 = repl_pane.executed_code[-1]
- # Wait for repl code to finish.
- user_code2.future.add_done_callback(
- lambda future: user_code_done.set())
- user_code_done.wait(timeout=3)
+ pw_ptpython_repl = app.pw_ptpython_repl
+ repl_pane = app.repl_pane
- pw_ptpython_repl.user_code_complete_callback.assert_called_once()
- self.assertIsNotNone(user_code2)
- self.assertTrue(user_code2.future.done())
- self.assertEqual(user_code2.input, 'run()')
- self.assertEqual(user_code2.output, '42')
- self.assertEqual(user_code2.stdout, 'The answer is ')
- self.assertFalse(user_code2.stderr)
+ # Mock update_output_buffer to track number of update calls
+ repl_pane.update_output_buffer = MagicMock(
+ wraps=repl_pane.update_output_buffer)
+
+ # Mock complete callback
+ pw_ptpython_repl.user_code_complete_callback = MagicMock(
+ wraps=pw_ptpython_repl.user_code_complete_callback)
+
+ # Repl done flag for tests
+ user_code_done = threading.Event()
+
+ # Run some code
+ code = inspect.cleandoc("""
+ import time
+ def run():
+ for i in range(2):
+ time.sleep(0.5)
+ print(i)
+ print('The answer is ', end='')
+ return 1+1+4+16+20
+ """)
+ input_buffer = MagicMock(text=code)
+ pw_ptpython_repl._accept_handler(input_buffer) # pylint: disable=protected-access
+
+ # Get last executed code object.
+ user_code1 = repl_pane.executed_code[-1]
+ # Wait for repl code to finish.
+ user_code1.future.add_done_callback(
+ lambda future: user_code_done.set())
+ # Wait for stdout monitoring to complete.
+ if user_code1.output_check_task:
+ await user_code1.output_check_task
+ # Wait for test done callback.
+ user_code_done.wait(timeout=3)
+
+ # Check user_code1 results
+ # NOTE: Avoid using assert_has_calls. Thread timing can make the
+ # test flaky.
+ expected_calls = [
+ # Initial exec start
+ call('pw_ptpython_repl._accept_handler'),
+ # Code finishes
+ call('repl_pane.append_result_to_executed_code'),
+ # Complete callback
+ call('pw_ptpython_repl.user_code_complete_callback'),
+ ]
+ for expected_call in expected_calls:
+ self.assertIn(expected_call,
+ repl_pane.update_output_buffer.mock_calls)
+
+ pw_ptpython_repl.user_code_complete_callback.assert_called_once(
+ )
+
+ self.assertIsNotNone(user_code1)
+ self.assertTrue(user_code1.future.done())
+ self.assertEqual(user_code1.input, code)
+ self.assertEqual(user_code1.output, None)
+ # stdout / stderr may be '' or None
+ self.assertFalse(user_code1.stdout)
+ self.assertFalse(user_code1.stderr)
+
+ # Reset mocks
+ user_code_done.clear()
+ pw_ptpython_repl.user_code_complete_callback.reset_mock()
+ repl_pane.update_output_buffer.reset_mock()
+
+ # Run some code
+ input_buffer = MagicMock(text='run()')
+ pw_ptpython_repl._accept_handler(input_buffer) # pylint: disable=protected-access
+
+ # Get last executed code object.
+ user_code2 = repl_pane.executed_code[-1]
+ # Wait for repl code to finish.
+ user_code2.future.add_done_callback(
+ lambda future: user_code_done.set())
+ # Wait for stdout monitoring to complete.
+ if user_code2.output_check_task:
+ await user_code2.output_check_task
+ # Wait for test done callback.
+ user_code_done.wait(timeout=3)
+
+ # Check user_code2 results
+ # NOTE: Avoid using assert_has_calls. Thread timing can make the
+ # test flaky.
+ expected_calls = [
+ # Initial exec start
+ call('pw_ptpython_repl._accept_handler'),
+ # Periodic checks, should be a total of 4:
+ # Code should take 1.0 second to run.
+ # Periodic checks every 0.3 seconds
+ # 1.0 / 0.3 = 3.33 (4) checks
+ call('repl_pane.periodic_check'),
+ call('repl_pane.periodic_check'),
+ call('repl_pane.periodic_check'),
+ # Code finishes
+ call('repl_pane.append_result_to_executed_code'),
+ # Complete callback
+ call('pw_ptpython_repl.user_code_complete_callback'),
+ # Final periodic check
+ call('repl_pane.periodic_check'),
+ ]
+ for expected_call in expected_calls:
+ self.assertIn(expected_call,
+ repl_pane.update_output_buffer.mock_calls)
+
+ pw_ptpython_repl.user_code_complete_callback.assert_called_once(
+ )
+ self.assertIsNotNone(user_code2)
+ self.assertTrue(user_code2.future.done())
+ self.assertEqual(user_code2.input, 'run()')
+ self.assertEqual(user_code2.output, '42')
+ self.assertEqual(user_code2.stdout, '0\n1\nThe answer is ')
+ self.assertFalse(user_code2.stderr)
+
+ # Reset mocks
+ user_code_done.clear()
+ pw_ptpython_repl.user_code_complete_callback.reset_mock()
+ repl_pane.update_output_buffer.reset_mock()
if __name__ == '__main__':
diff --git a/pw_console/py/setup.py b/pw_console/py/setup.py
index 7f68333..3b06852 100644
--- a/pw_console/py/setup.py
+++ b/pw_console/py/setup.py
@@ -34,7 +34,11 @@
entry_points={
'console_scripts': [
'pw-console = pw_console.__main__:main',
- ]
+ ],
+ 'pygments.styles': [
+ 'pigweed-code = '
+ 'pw_console.pigweed_code_style:PigweedCodeStyle',
+ ],
},
install_requires=[
'ipython',
diff --git a/pw_console/testing.rst b/pw_console/testing.rst
index 8b90115..9d83f87 100644
--- a/pw_console/testing.rst
+++ b/pw_console/testing.rst
@@ -426,6 +426,33 @@
- Python Input is focused
- |checkbox|
+Incremental Stdout
+^^^^^^^^^^^^^^^^^^
+
+.. list-table::
+ :widths: 5 45 45 5
+ :header-rows: 1
+
+ * - #
+ - Test Action
+ - Expected Result
+ - ✅
+
+ * - 1
+ - | Click the :guilabel:`Python Input` window title
+ - Python Input pane is focused
+ - |checkbox|
+
+ * - 2
+ - | Enter the following text and hit enter twice
+ | ``import time``
+ | ``for i in range(10):``
+ | ``print(i); time.sleep(1)``
+ - | ``Running...`` should appear in the python with
+ | increasing integers incrementally appearing above
+ | (not all at once after a delay).
+ - |checkbox|
+
Add note to the commit message
------------------------------