pw_console: Refactor log pane classes
- Refactor log_container.py into log_view and log_store
- Move LogPane toolbar classes into log_pane_toolbars.py
- Add new tests for LogView and LogStore.
No-Docs-Update-Reason: Refactoring
Change-Id: I5187f9638558ea4ef7f0dc609cb2444201d4d028
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/52440
Commit-Queue: Anthony DiGirolamo <tonymd@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_console/py/BUILD.gn b/pw_console/py/BUILD.gn
index b194385..4efd071 100644
--- a/pw_console/py/BUILD.gn
+++ b/pw_console/py/BUILD.gn
@@ -24,9 +24,11 @@
"pw_console/console_app.py",
"pw_console/help_window.py",
"pw_console/key_bindings.py",
- "pw_console/log_container.py",
"pw_console/log_line.py",
"pw_console/log_pane.py",
+ "pw_console/log_pane_toolbars.py",
+ "pw_console/log_store.py",
+ "pw_console/log_view.py",
"pw_console/mouse.py",
"pw_console/pw_ptpython_repl.py",
"pw_console/repl_pane.py",
@@ -40,6 +42,8 @@
tests = [
"console_app_test.py",
"help_window_test.py",
+ "log_store_test.py",
+ "log_view_test.py",
"repl_pane_test.py",
"table_test.py",
"text_formatting_test.py",
diff --git a/pw_console/py/log_store_test.py b/pw_console/py/log_store_test.py
new file mode 100644
index 0000000..914c56b
--- /dev/null
+++ b/pw_console/py/log_store_test.py
@@ -0,0 +1,121 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tests for pw_console.log_store"""
+
+import logging
+import unittest
+from unittest.mock import MagicMock
+
+from pw_console.log_store import LogStore
+
+
+def _create_log_store():
+ log_store = LogStore()
+ viewer = MagicMock()
+ viewer.new_logs_arrived = MagicMock()
+ log_store.register_viewer(viewer)
+ return log_store, viewer
+
+
+class TestLogStore(unittest.TestCase):
+ """Tests for LogStore."""
+ def setUp(self):
+ self.maxDiff = None # pylint: disable=invalid-name
+
+ def test_get_total_count(self) -> None:
+ # self.assertEqual(self.log_store.get_total_count(), 0)
+
+ log_store, viewer = _create_log_store()
+ test_log = logging.getLogger('log_store.test')
+ # Must use the assertLogs context manager and the addHandler call.
+ with self.assertLogs(test_log, level='DEBUG') as log_context:
+ test_log.addHandler(log_store)
+ for i in range(5):
+ test_log.debug('Test log %s', i)
+
+ # Expected log message content
+ self.assertEqual(
+ log_context.output,
+ ['DEBUG:log_store.test:Test log {}'.format(i) for i in range(5)])
+ # LogStore state checks
+ viewer.new_logs_arrived.assert_called()
+ self.assertEqual(log_store.get_total_count(), 5)
+ self.assertEqual(log_store.get_last_log_line_index(), 4)
+
+ log_store.clear_logs()
+ self.assertEqual(log_store.get_total_count(), 0)
+
+ def test_channel_counts_and_prefix_width(self) -> None:
+ log_store, _viewer = _create_log_store()
+
+ # Log some messagse on 3 separate logger instances
+ for i, logger_name in enumerate([
+ 'log_store.test',
+ 'log_store.dev',
+ 'log_store.production',
+ ]):
+ test_log = logging.getLogger(logger_name)
+ with self.assertLogs(test_log, level='DEBUG') as _log_context:
+ test_log.addHandler(log_store)
+ test_log.debug('Test log message')
+ for j in range(i):
+ test_log.debug('%s', j)
+
+ self.assertEqual(log_store.channel_counts, {
+ 'log_store.test': 1,
+ 'log_store.dev': 2,
+ 'log_store.production': 3,
+ })
+ self.assertEqual(
+ log_store.get_channel_counts(),
+ 'log_store.test: 1, log_store.dev: 2, log_store.production: 3')
+
+ self.assertRegex(
+ log_store.logs[0].ansi_stripped_log,
+ r'[0-9]{8} [0-9]{2}:[0-9]{2}:[0-9]{2} DEBUG Test log message')
+ self.assertEqual(
+ log_store.channel_formatted_prefix_widths,
+ {
+ 'log_store.test': 24,
+ 'log_store.dev': 24,
+ 'log_store.production': 24
+ },
+ )
+
+ def test_render_table_header_with_metadata(self) -> None:
+ log_store, _viewer = _create_log_store()
+ test_log = logging.getLogger('log_store.test')
+
+ # Log table with extra columns
+ with self.assertLogs(test_log, level='DEBUG') as _log_context:
+ test_log.addHandler(log_store)
+ test_log.debug('Test log %s',
+ extra=dict(extra_metadata_fields={
+ 'planet': 'Jupiter',
+ 'galaxy': 'Milky Way'
+ }))
+
+ self.assertEqual(log_store.render_table_header(), [
+ ('bold', 'Time '),
+ ('', ' '),
+ ('bold', 'Lvl '),
+ ('', ' '),
+ ('bold', 'Planet '),
+ ('bold', 'Galaxy '),
+ ('bold', 'Message'),
+ ])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pw_console/py/log_view_test.py b/pw_console/py/log_view_test.py
new file mode 100644
index 0000000..0afae4b
--- /dev/null
+++ b/pw_console/py/log_view_test.py
@@ -0,0 +1,265 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tests for pw_console.log_view"""
+
+import logging
+import time
+import unittest
+from datetime import datetime
+from unittest.mock import MagicMock, patch
+from parameterized import parameterized # type: ignore
+
+from prompt_toolkit.data_structures import Point
+from prompt_toolkit.formatted_text import FormattedText
+
+from pw_console.log_view import LogView
+
+
+def _create_log_view():
+ log_pane = MagicMock()
+ log_view = LogView(log_pane)
+ return log_view, log_pane
+
+
+class TestLogView(unittest.TestCase):
+ """Tests for LogView."""
+ def setUp(self):
+ self.maxDiff = None # pylint: disable=invalid-name
+
+ def _create_log_view_with_logs(self, log_count=100):
+ log_view, log_pane = _create_log_view()
+
+ if log_count > 0:
+ test_log = logging.getLogger('log_view.test')
+ with self.assertLogs(test_log, level='DEBUG') as _log_context:
+ test_log.addHandler(log_view.log_store)
+ for i in range(log_count):
+ test_log.debug('Test log %s', i)
+
+ return log_view, log_pane
+
+ def test_follow_toggle(self) -> None:
+ log_view, _pane = _create_log_view()
+ self.assertTrue(log_view.follow)
+ log_view.toggle_follow()
+ self.assertFalse(log_view.follow)
+
+ def test_follow_scrolls_to_bottom(self) -> None:
+ log_view, _pane = _create_log_view()
+ log_view.toggle_follow()
+ self.assertFalse(log_view.follow)
+ self.assertEqual(log_view.get_current_line(), 0)
+
+ test_log = logging.getLogger('log_view.test')
+
+ # Log 5 messagse, current_line should stay at 0
+ with self.assertLogs(test_log, level='DEBUG') as _log_context:
+ test_log.addHandler(log_view.log_store)
+ for i in range(5):
+ test_log.debug('Test log %s', i)
+
+ self.assertEqual(log_view.get_total_count(), 5)
+ self.assertEqual(log_view.get_current_line(), 0)
+ # Turn follow on
+ log_view.toggle_follow()
+ self.assertTrue(log_view.follow)
+
+ # Log another messagse, current_line should move to the last.
+ with self.assertLogs(test_log, level='DEBUG') as _log_context:
+ test_log.addHandler(log_view.log_store)
+ test_log.debug('Test log')
+
+ self.assertEqual(log_view.get_total_count(), 6)
+ self.assertEqual(log_view.get_current_line(), 5)
+
+ @parameterized.expand([
+ ('no logs',
+ 80, 10, # window_width, window_height
+ 0, True, # log_count, follow_enabled
+ 0, [0, -1]), # expected_total logs, expected_indices
+ ('logs shorter than window height',
+ 80, 10, # window_width, window_height
+ 7, True, # log_count, follow_enabled
+ 7, [0, 6]), # expected_total logs, expected_indices
+ ('logs larger than window height with follow on',
+ 80, 10, # window_width, window_height
+ 20, True, # log_count, follow_enabled
+ 10, [10, 19]), # expected_total logs, expected_indices
+ ('logs larger than window height with follow off',
+ 80, 10, # window_width, window_height
+ 20, False, # log_count, follow_enabled
+ 10, [0, 9]), # expected_total logs, expected_indices
+ ]) # yapf: disable
+ def test_get_log_window_indices(
+ self,
+ _name,
+ window_width,
+ window_height,
+ log_count,
+ follow_enabled,
+ expected_total,
+ expected_indices,
+ ) -> None:
+ """Test get_log_window_indices() with various window sizes and log
+ counts."""
+ log_view, _pane = _create_log_view()
+
+ if not follow_enabled:
+ log_view.toggle_follow()
+
+ # Make required number of log messages
+ if log_count > 0:
+ test_log = logging.getLogger('log_view.test')
+ with self.assertLogs(test_log, level='DEBUG') as _log_context:
+ test_log.addHandler(log_view.log_store)
+ for i in range(log_count):
+ test_log.debug('Test log %s', i)
+
+ # Get indices
+ start_index, end_index = log_view.get_log_window_indices(
+ available_width=window_width, available_height=window_height)
+
+ self.assertEqual([start_index, end_index], expected_indices)
+
+ # Number of logs should equal the height of the window
+ self.assertEqual((end_index - start_index) + 1, expected_total)
+
+ def test_scrolling(self) -> None:
+ """Test all scrolling methods."""
+ log_view, _pane = self._create_log_view_with_logs(log_count=100)
+ # Page scrolling needs to know the current window height.
+ log_view._window_height = 10 # pylint: disable=protected-access
+
+ # Follow is on by default, current line should be at the end.
+ self.assertEqual(log_view.get_current_line(), 99)
+
+ # Move to the beginning.
+ log_view.scroll_to_top()
+ self.assertEqual(log_view.get_current_line(), 0)
+
+ # Should not be able to scroll before the beginning.
+ log_view.scroll_up()
+ self.assertEqual(log_view.get_current_line(), 0)
+ log_view.scroll_up_one_page()
+ self.assertEqual(log_view.get_current_line(), 0)
+
+ # Single and multi line movement.
+ log_view.scroll_down()
+ self.assertEqual(log_view.get_current_line(), 1)
+ log_view.scroll(5)
+ self.assertEqual(log_view.get_current_line(), 6)
+ log_view.scroll_up()
+ self.assertEqual(log_view.get_current_line(), 5)
+
+ # Page down and up.
+ log_view.scroll_down_one_page()
+ self.assertEqual(log_view.get_current_line(), 15)
+ log_view.scroll_up_one_page()
+ self.assertEqual(log_view.get_current_line(), 5)
+
+ # Move to the end.
+ log_view.scroll_to_bottom()
+ self.assertEqual(log_view.get_current_line(), 99)
+
+ # Should not be able to scroll beyond the end.
+ log_view.scroll_down()
+ self.assertEqual(log_view.get_current_line(), 99)
+ log_view.scroll_down_one_page()
+ self.assertEqual(log_view.get_current_line(), 99)
+
+ # Move up a bit.
+ log_view.scroll(-5)
+ self.assertEqual(log_view.get_current_line(), 94)
+
+ # Simulate a mouse click to scroll.
+ # Click 5 lines above current position.
+ log_view.scroll_to_position(Point(0, -5))
+ self.assertEqual(log_view.get_current_line(), 89)
+ # Click 3 lines below current position.
+ log_view.scroll_to_position(Point(0, 3))
+ self.assertEqual(log_view.get_current_line(), 92)
+
+ # Clicking if follow is enabled should not scroll.
+ log_view.toggle_follow()
+ self.assertTrue(log_view.follow)
+ self.assertEqual(log_view.get_current_line(), 99)
+ log_view.scroll_to_position(Point(0, -5))
+ self.assertEqual(log_view.get_current_line(), 99)
+
+ def test_render_content_and_cursor_position(self) -> None:
+ """Test render_content results and get_cursor_position
+
+ get_cursor_position() should return the correct position depending on
+ what line is selected."""
+
+ # Mock time to always return the same value.
+ mock_time = MagicMock( # type: ignore
+ return_value=time.mktime(
+ datetime(2021, 7, 13, 0, 0, 0).timetuple()))
+ with patch('time.time', new=mock_time):
+ log_view, log_pane = self._create_log_view_with_logs(log_count=4)
+ log_pane.current_log_pane_width = 30
+ log_pane.current_log_pane_height = 10
+
+ log_view.scroll_to_top()
+ log_view.render_content()
+ self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=0))
+ log_view.scroll_to_bottom()
+ log_view.render_content()
+ self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=3))
+
+ expected_line_cache = [
+ [
+ ('class:log-time', '20210713 00:00:00'),
+ ('', ' '),
+ ('class:log-level-10', 'DEBUG'),
+ ('', ' '),
+ ('', 'Test log 0'),
+ ('', '\n')
+ ],
+ [
+ ('class:log-time', '20210713 00:00:00'),
+ ('', ' '),
+ ('class:log-level-10', 'DEBUG'),
+ ('', ' '),
+ ('', 'Test log 1'),
+ ('', '\n')
+ ],
+ [
+ ('class:log-time', '20210713 00:00:00'),
+ ('', ' '),
+ ('class:log-level-10', 'DEBUG'),
+ ('', ' '),
+ ('', 'Test log 2'),
+ ('', '\n')
+ ],
+ FormattedText([
+ ('class:selected-log-line [SetCursorPosition]', ''),
+ ('class:selected-log-line class:log-time', '20210713 00:00:00'),
+ ('class:selected-log-line ', ' '),
+ ('class:selected-log-line class:log-level-10', 'DEBUG'),
+ ('class:selected-log-line ', ' '),
+ ('class:selected-log-line ', 'Test log 3'),
+ ('class:selected-log-line ', ' \n')
+ ]),
+ ] # yapf: disable
+
+ self.assertEqual(
+ list(log_view._line_fragment_cache # pylint: disable=protected-access
+ ),
+ expected_line_cache)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pw_console/py/pw_console/console_app.py b/pw_console/py/pw_console/console_app.py
index c0aaa87..dc8cbf0 100644
--- a/pw_console/py/pw_console/console_app.py
+++ b/pw_console/py/pw_console/console_app.py
@@ -75,14 +75,18 @@
lambda: application.message and application.message != ''))
-def _add_log_handler_to_pane(logger: Union[str, logging.Logger], pane):
+def _add_log_handler_to_pane(logger: Union[str, logging.Logger],
+ pane: 'LogPane'):
"""A log pane handler for a given logger instance."""
+ if not pane:
+ return
+
if isinstance(logger, logging.Logger):
logger_instance = logger
elif isinstance(logger, str):
logger_instance = logging.getLogger(logger)
- logger_instance.addHandler(pane.log_container # type: ignore
+ logger_instance.addHandler(pane.log_view.log_store # type: ignore
)
pane.append_pane_subtitle( # type: ignore
logger_instance.name)
@@ -488,7 +492,7 @@
"""Regenerate styles for the current theme_name."""
self._current_theme = pw_console.style.generate_styles(theme_name)
- def _create_log_pane(self):
+ def _create_log_pane(self) -> 'LogPane':
# Create one log pane.
self.active_panes.appendleft(LogPane(application=self))
return self.active_panes[0]
@@ -503,7 +507,7 @@
if isinstance(pane, LogPane):
existing_log_pane = pane
break
- if not existing_log_pane or separate_log_panes:
+ if separate_log_panes or not existing_log_pane:
existing_log_pane = self._create_log_pane()
if isinstance(logger, collections.abc.Iterable):
@@ -587,7 +591,7 @@
1] = self._create_root_split()
def toggle_log_line_wrapping(self):
- """Menu item handler to toggle line wrapping of the first log pane."""
+ """Menu item handler to toggle line wrapping of all log pane."""
for pane in self.active_panes:
if isinstance(pane, LogPane):
pane.toggle_wrap_lines()
diff --git a/pw_console/py/pw_console/log_pane.py b/pw_console/py/pw_console/log_pane.py
index 893d7db..487f7cc 100644
--- a/pw_console/py/pw_console/log_pane.py
+++ b/pw_console/py/pw_console/log_pane.py
@@ -32,223 +32,20 @@
HSplit,
ScrollOffsets,
UIContent,
- VSplit,
VerticalAlign,
Window,
- WindowAlign,
)
from prompt_toolkit.layout.dimension import AnyDimension
from prompt_toolkit.mouse_events import MouseEvent, MouseEventType
import pw_console.widgets.checkbox
import pw_console.style
-from pw_console.log_container import LogContainer
-
-
-class LogPaneLineInfoBar(ConditionalContainer):
- """One line bar for showing current and total log lines."""
- @staticmethod
- def get_tokens(log_pane):
- """Return formatted text tokens for display."""
- tokens = ' Line {} / {} '.format(
- log_pane.log_container.get_current_line() + 1,
- log_pane.log_container.get_total_count(),
- )
- return [('', tokens)]
-
- def __init__(self, log_pane):
- info_bar_control = FormattedTextControl(
- functools.partial(LogPaneLineInfoBar.get_tokens, log_pane))
- info_bar_window = Window(content=info_bar_control,
- align=WindowAlign.RIGHT,
- dont_extend_width=True)
-
- super().__init__(
- VSplit([info_bar_window],
- height=1,
- style='class:toolbar_active',
- align=WindowAlign.RIGHT),
- # Only show current/total line info if not auto-following
- # logs. Similar to tmux behavior.
- filter=Condition(lambda: not log_pane.log_container.follow))
-
-
-class LogPaneTableToolbar(ConditionalContainer):
- """One line toolbar for showing table headers."""
- def __init__(self, log_pane):
- # FormattedText of the table column headers.
- table_header_bar_control = FormattedTextControl(
- log_pane.log_container.render_table_header)
- # Left justify the header content.
- table_header_bar_window = Window(content=table_header_bar_control,
- align=WindowAlign.LEFT,
- dont_extend_width=True)
- super().__init__(VSplit([table_header_bar_window],
- height=1,
- style=functools.partial(
- pw_console.style.get_toolbar_style,
- log_pane),
- align=WindowAlign.LEFT),
- filter=Condition(lambda: log_pane.table_view))
-
-
-class LogPaneBottomToolbarBar(ConditionalContainer):
- """One line toolbar for display at the bottom of the LogPane."""
- TOOLBAR_HEIGHT = 1
-
- @staticmethod
- def mouse_handler_focus(log_pane, mouse_event: MouseEvent):
- """Focus this pane on click."""
- if mouse_event.event_type == MouseEventType.MOUSE_UP:
- log_pane.application.application.layout.focus(log_pane)
- return None
- return NotImplemented
-
- @staticmethod
- def mouse_handler_toggle_table_view(log_pane, mouse_event: MouseEvent):
- """Toggle table view on click."""
- if mouse_event.event_type == MouseEventType.MOUSE_UP:
- log_pane.toggle_table_view()
- return None
- return NotImplemented
-
- @staticmethod
- def mouse_handler_toggle_wrap_lines(log_pane, mouse_event: MouseEvent):
- """Toggle wrap lines on click."""
- if mouse_event.event_type == MouseEventType.MOUSE_UP:
- log_pane.toggle_wrap_lines()
- return None
- return NotImplemented
-
- @staticmethod
- def mouse_handler_clear_history(log_pane, mouse_event: MouseEvent):
- """Clear history on click."""
- if mouse_event.event_type == MouseEventType.MOUSE_UP:
- log_pane.clear_history()
- return None
- return NotImplemented
-
- @staticmethod
- def mouse_handler_toggle_follow(log_pane, mouse_event: MouseEvent):
- """Toggle follow on click."""
- if mouse_event.event_type == MouseEventType.MOUSE_UP:
- log_pane.toggle_follow()
- return None
- return NotImplemented
-
- @staticmethod
- def get_left_text_tokens(log_pane):
- """Return toolbar indicator and title."""
-
- title = ' {} '.format(log_pane.pane_title())
- mouse_handler = functools.partial(
- LogPaneBottomToolbarBar.mouse_handler_focus, log_pane)
- return pw_console.style.get_pane_indicator(log_pane, title,
- mouse_handler)
-
- @staticmethod
- def get_center_text_tokens(log_pane):
- """Return formatted text tokens for display in the center part of the
- toolbar."""
-
- # Create mouse handler functions.
- focus = functools.partial(LogPaneBottomToolbarBar.mouse_handler_focus,
- log_pane)
- toggle_wrap_lines = functools.partial(
- LogPaneBottomToolbarBar.mouse_handler_toggle_wrap_lines, log_pane)
- clear_history = functools.partial(
- LogPaneBottomToolbarBar.mouse_handler_clear_history, log_pane)
- toggle_follow = functools.partial(
- LogPaneBottomToolbarBar.mouse_handler_toggle_follow, log_pane)
- toggle_table_view = functools.partial(
- LogPaneBottomToolbarBar.mouse_handler_toggle_table_view, log_pane)
-
- # FormattedTextTuple contents: (Style, Text, Mouse handler)
- separator_text = ('', ' ') # 2 spaces of separaton between keybinds.
-
- return [
- separator_text,
- pw_console.widgets.checkbox.to_checkbox(log_pane.table_view,
- toggle_table_view,
- end=''),
- ('class:keyhelp', 'Table:', toggle_table_view),
- ('class:keybind', 't', toggle_table_view),
- separator_text,
- pw_console.widgets.checkbox.to_checkbox(log_pane.wrap_lines,
- toggle_wrap_lines,
- end=''),
- ('class:keyhelp', 'Wrap:', toggle_wrap_lines),
- ('class:keybind', 'w', toggle_wrap_lines),
- separator_text,
- pw_console.widgets.checkbox.to_checkbox(
- log_pane.log_container.follow, toggle_follow, end=''),
- ('class:keyhelp', 'Follow:', toggle_follow),
- ('class:keybind', 'f', toggle_follow),
- separator_text,
- ('class:keyhelp', 'Clear:', clear_history),
- ('class:keybind', 'C', clear_history),
- # Remaining whitespace should focus on click.
- ('class:keybind', ' ', focus),
- ]
-
- @staticmethod
- def get_right_text_tokens(log_pane):
- """Return formatted text tokens for display."""
- focus = functools.partial(LogPaneBottomToolbarBar.mouse_handler_focus,
- log_pane)
- fragments = []
- if not has_focus(log_pane.__pt_container__())():
- fragments.append(('class:keyhelp', '[click to focus] ', focus))
- fragments.append(('', ' {} '.format(log_pane.pane_subtitle())))
- return fragments
-
- def __init__(self, log_pane):
- title_section_window = Window(
- content=FormattedTextControl(
- # Callable to get formatted text tuples.
- functools.partial(LogPaneBottomToolbarBar.get_left_text_tokens,
- log_pane)),
- align=WindowAlign.LEFT,
- dont_extend_width=True,
- )
-
- keybind_section_window = Window(
- content=FormattedTextControl(
- # Callable to get formatted text tuples.
- functools.partial(
- LogPaneBottomToolbarBar.get_center_text_tokens, log_pane)),
- align=WindowAlign.LEFT,
- dont_extend_width=False,
- )
-
- log_source_name = Window(
- content=FormattedTextControl(
- # Callable to get formatted text tuples.
- functools.partial(
- LogPaneBottomToolbarBar.get_right_text_tokens, log_pane)),
- # Right side text should appear at the far right of the toolbar
- align=WindowAlign.RIGHT,
- dont_extend_width=True,
- )
-
- toolbar_vsplit = VSplit(
- [
- title_section_window,
- keybind_section_window,
- log_source_name,
- ],
- height=LogPaneBottomToolbarBar.TOOLBAR_HEIGHT,
- style=functools.partial(pw_console.style.get_toolbar_style,
- log_pane),
- align=WindowAlign.LEFT,
- )
-
- # ConditionalContainer init()
- super().__init__(
- # Contents
- toolbar_vsplit,
- filter=Condition(lambda: log_pane.show_bottom_toolbar),
- )
+from pw_console.log_view import LogView
+from pw_console.log_pane_toolbars import (
+ BottomToolbarBar,
+ LineInfoBar,
+ TableToolbar,
+)
class LogContentControl(FormattedTextControl):
@@ -256,7 +53,7 @@
@staticmethod
def indent_wrapped_pw_log_format_line(log_pane, line_number, wrap_count):
"""Indent wrapped lines to match pw_cli timestamp & level formatter."""
- prefix_width = log_pane.log_container.get_line_wrap_prefix_width()
+ prefix_width = log_pane.log_view.get_line_wrap_prefix_width()
# Return no prefix string if no wrapping is required. If the current log
# window is smaller than the prefix width then don't indent when
@@ -267,7 +64,7 @@
prefix_string = ' ' * prefix_width
# If this line matches the selected log line, highlight it.
- if line_number == log_pane.log_container.get_cursor_position().y:
+ if line_number == log_pane.log_view.get_cursor_position().y:
return to_formatted_text(prefix_string,
style='class:selected-log-line')
@@ -303,12 +100,12 @@
@key_bindings.add('g')
def _scroll_to_top(_event: KeyPressEvent) -> None:
"""Scroll to top."""
- self.log_pane.log_container.scroll_to_top()
+ self.log_pane.log_view.scroll_to_top()
@key_bindings.add('G')
def _scroll_to_bottom(_event: KeyPressEvent) -> None:
"""Scroll to bottom."""
- self.log_pane.log_container.scroll_to_bottom()
+ self.log_pane.log_view.scroll_to_bottom()
@key_bindings.add('f')
def _toggle_follow(_event: KeyPressEvent) -> None:
@@ -319,23 +116,23 @@
@key_bindings.add('k')
def _up(_event: KeyPressEvent) -> None:
"""Select previous log line."""
- self.log_pane.log_container.scroll_up()
+ self.log_pane.log_view.scroll_up()
@key_bindings.add('down')
@key_bindings.add('j')
def _down(_event: KeyPressEvent) -> None:
"""Select next log line."""
- self.log_pane.log_container.scroll_down()
+ self.log_pane.log_view.scroll_down()
@key_bindings.add('pageup')
def _pageup(_event: KeyPressEvent) -> None:
"""Scroll the logs up by one page."""
- self.log_pane.log_container.scroll_up_one_page()
+ self.log_pane.log_view.scroll_up_one_page()
@key_bindings.add('pagedown')
def _pagedown(_event: KeyPressEvent) -> None:
"""Scroll the logs down by one page."""
- self.log_pane.log_container.scroll_down_one_page()
+ self.log_pane.log_view.scroll_down_one_page()
super().__init__(*args, key_bindings=key_bindings, **kwargs)
@@ -354,17 +151,17 @@
if mouse_event.event_type == MouseEventType.MOUSE_UP:
# Scroll to the line clicked.
- self.log_pane.log_container.scroll_to_position(mouse_position)
+ self.log_pane.log_view.scroll_to_position(mouse_position)
# Mouse event handled, return None.
return None
if mouse_event.event_type == MouseEventType.SCROLL_DOWN:
- self.log_pane.log_container.scroll_down()
+ self.log_pane.log_view.scroll_down()
# Mouse event handled, return None.
return None
if mouse_event.event_type == MouseEventType.SCROLL_UP:
- self.log_pane.log_container.scroll_up()
+ self.log_pane.log_view.scroll_up()
# Mouse event handled, return None.
return None
@@ -423,9 +220,7 @@
self._pane_subtitle = None
# Create the log container which stores and handles incoming logs.
- self.log_container = LogContainer()
- self.log_container.set_log_pane(self)
- self.log_container.set_formatting()
+ self.log_view: LogView = LogView(self)
# Log pane size variables. These are updated just befor rendering the
# pane by the LogLineHSplit class.
@@ -436,14 +231,14 @@
self.last_log_content_height = 0
# Create the bottom toolbar for the whole log pane.
- self.bottom_toolbar = LogPaneBottomToolbarBar(self)
+ self.bottom_toolbar = BottomToolbarBar(self)
- self.table_header_toolbar = LogPaneTableToolbar(self)
+ self.table_header_toolbar = TableToolbar(self)
self.log_content_control = LogContentControl(
self, # parent LogPane
# FormattedTextControl args:
- self.log_container.render_content,
+ self.log_view.render_content,
# Hide the cursor, use cursorline=True in self.log_display_window to
# indicate currently selected line.
show_cursor=False,
@@ -493,11 +288,8 @@
self),
),
floats=[
- # Floating LogPaneLineInfoBar
- Float(top=0,
- right=0,
- height=1,
- content=LogPaneLineInfoBar(self)),
+ # Floating LineInfoBar
+ Float(top=0, right=0, height=1, content=LineInfoBar(self)),
]),
filter=Condition(lambda: self.show_pane))
@@ -515,7 +307,7 @@
def pane_subtitle(self):
if not self._pane_subtitle:
- return ', '.join(self.log_container.channel_counts.keys())
+ return ', '.join(self.log_view.log_store.channel_counts.keys())
logger_names = self._pane_subtitle.split(', ')
additional_text = ''
if len(logger_names) > 1:
@@ -531,7 +323,9 @@
self.current_log_pane_width = width
if height:
# Subtract the height of the LogPaneBottomToolbarBar
- height -= LogPaneBottomToolbarBar.TOOLBAR_HEIGHT
+ height -= BottomToolbarBar.TOOLBAR_HEIGHT
+ if self.table_view:
+ height -= TableToolbar.TOOLBAR_HEIGHT
self.last_log_pane_height = self.current_log_pane_height
self.current_log_pane_height = height
@@ -551,12 +345,12 @@
def toggle_follow(self):
"""Enable or disable following log lines."""
- self.log_container.toggle_follow()
+ self.log_view.toggle_follow()
self.redraw_ui()
def clear_history(self):
"""Erase stored log lines."""
- self.log_container.clear_logs()
+ self.log_view.clear_logs()
self.redraw_ui()
def __pt_container__(self):
@@ -578,4 +372,4 @@
self.last_log_content_height = 0
def log_content_control_get_cursor_position(self):
- return self.log_container.get_cursor_position()
+ return self.log_view.get_cursor_position()
diff --git a/pw_console/py/pw_console/log_pane_toolbars.py b/pw_console/py/pw_console/log_pane_toolbars.py
new file mode 100644
index 0000000..e6959a9
--- /dev/null
+++ b/pw_console/py/pw_console/log_pane_toolbars.py
@@ -0,0 +1,248 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""LogPane class."""
+
+from __future__ import annotations
+import functools
+from typing import TYPE_CHECKING
+
+from prompt_toolkit.filters import (
+ Condition,
+ has_focus,
+)
+from prompt_toolkit.layout import (
+ ConditionalContainer,
+ FormattedTextControl,
+ VSplit,
+ Window,
+ WindowAlign,
+ HorizontalAlign,
+)
+from prompt_toolkit.mouse_events import MouseEvent, MouseEventType
+
+import pw_console.widgets.checkbox
+import pw_console.style
+
+if TYPE_CHECKING:
+ from pw_console.log_pane import LogPane
+
+
+class LineInfoBar(ConditionalContainer):
+ """One line bar for showing current and total log lines."""
+ @staticmethod
+ def get_tokens(log_pane: 'LogPane'):
+ """Return formatted text tokens for display."""
+ tokens = ' Line {} / {} '.format(
+ log_pane.log_view.get_current_line() + 1,
+ log_pane.log_view.get_total_count(),
+ )
+ return [('', tokens)]
+
+ def __init__(self, log_pane: 'LogPane'):
+ info_bar_control = FormattedTextControl(
+ functools.partial(LineInfoBar.get_tokens, log_pane))
+ info_bar_window = Window(content=info_bar_control,
+ align=WindowAlign.RIGHT,
+ dont_extend_width=True)
+
+ super().__init__(
+ VSplit([info_bar_window],
+ height=1,
+ style='class:toolbar_active',
+ align=HorizontalAlign.RIGHT),
+ # Only show current/total line info if not auto-following
+ # logs. Similar to tmux behavior.
+ filter=Condition(lambda: not log_pane.log_view.follow))
+
+
+class TableToolbar(ConditionalContainer):
+ """One line toolbar for showing table headers."""
+ TOOLBAR_HEIGHT = 1
+
+ def __init__(self, log_pane: 'LogPane'):
+ # FormattedText of the table column headers.
+ table_header_bar_control = FormattedTextControl(
+ log_pane.log_view.render_table_header)
+ # Left justify the header content.
+ table_header_bar_window = Window(content=table_header_bar_control,
+ align=WindowAlign.LEFT,
+ dont_extend_width=False)
+ super().__init__(VSplit([table_header_bar_window],
+ height=1,
+ style=functools.partial(
+ pw_console.style.get_toolbar_style,
+ log_pane),
+ align=HorizontalAlign.LEFT),
+ filter=Condition(lambda: log_pane.table_view))
+
+
+class BottomToolbarBar(ConditionalContainer):
+ """One line toolbar for display at the bottom of the LogPane."""
+ TOOLBAR_HEIGHT = 1
+
+ @staticmethod
+ def mouse_handler_focus(log_pane: 'LogPane', mouse_event: MouseEvent):
+ """Focus this pane on click."""
+ if mouse_event.event_type == MouseEventType.MOUSE_UP:
+ log_pane.application.application.layout.focus(log_pane)
+ return None
+ return NotImplemented
+
+ @staticmethod
+ def mouse_handler_toggle_table_view(log_pane: 'LogPane',
+ mouse_event: MouseEvent):
+ """Toggle table view on click."""
+ if mouse_event.event_type == MouseEventType.MOUSE_UP:
+ log_pane.toggle_table_view()
+ return None
+ return NotImplemented
+
+ @staticmethod
+ def mouse_handler_toggle_wrap_lines(log_pane: 'LogPane',
+ mouse_event: MouseEvent):
+ """Toggle wrap lines on click."""
+ if mouse_event.event_type == MouseEventType.MOUSE_UP:
+ log_pane.toggle_wrap_lines()
+ return None
+ return NotImplemented
+
+ @staticmethod
+ def mouse_handler_clear_history(log_pane: 'LogPane',
+ mouse_event: MouseEvent):
+ """Clear history on click."""
+ if mouse_event.event_type == MouseEventType.MOUSE_UP:
+ log_pane.clear_history()
+ return None
+ return NotImplemented
+
+ @staticmethod
+ def mouse_handler_toggle_follow(log_pane: 'LogPane',
+ mouse_event: MouseEvent):
+ """Toggle follow on click."""
+ if mouse_event.event_type == MouseEventType.MOUSE_UP:
+ log_pane.toggle_follow()
+ return None
+ return NotImplemented
+
+ @staticmethod
+ def get_left_text_tokens(log_pane: 'LogPane'):
+ """Return toolbar indicator and title."""
+
+ title = ' {} '.format(log_pane.pane_title())
+ mouse_handler = functools.partial(BottomToolbarBar.mouse_handler_focus,
+ log_pane)
+ return pw_console.style.get_pane_indicator(log_pane, title,
+ mouse_handler)
+
+ @staticmethod
+ def get_center_text_tokens(log_pane: 'LogPane'):
+ """Return formatted text tokens for display in the center part of the
+ toolbar."""
+
+ # Create mouse handler functions.
+ focus = functools.partial(BottomToolbarBar.mouse_handler_focus,
+ log_pane)
+ toggle_wrap_lines = functools.partial(
+ BottomToolbarBar.mouse_handler_toggle_wrap_lines, log_pane)
+ clear_history = functools.partial(
+ BottomToolbarBar.mouse_handler_clear_history, log_pane)
+ toggle_follow = functools.partial(
+ BottomToolbarBar.mouse_handler_toggle_follow, log_pane)
+ toggle_table_view = functools.partial(
+ BottomToolbarBar.mouse_handler_toggle_table_view, log_pane)
+
+ # FormattedTextTuple contents: (Style, Text, Mouse handler)
+ separator_text = ('', ' ') # 2 spaces of separaton between keybinds.
+
+ return [
+ separator_text,
+ pw_console.widgets.checkbox.to_checkbox(log_pane.table_view,
+ toggle_table_view),
+ ('class:keybind', 't', toggle_table_view),
+ ('class:keyhelp', ': Table', toggle_table_view),
+ separator_text,
+ pw_console.widgets.checkbox.to_checkbox(log_pane.wrap_lines,
+ toggle_wrap_lines),
+ ('class:keybind', 'w', toggle_wrap_lines),
+ ('class:keyhelp', ': Wrap', toggle_wrap_lines),
+ separator_text,
+ pw_console.widgets.checkbox.to_checkbox(log_pane.log_view.follow,
+ toggle_follow),
+ ('class:keybind', 'f', toggle_follow),
+ ('class:keyhelp', ': Follow', toggle_follow),
+ separator_text,
+ ('class:keybind', 'C', clear_history),
+ ('class:keyhelp', ': Clear', clear_history),
+
+ # Remaining whitespace should focus on click.
+ ('class:keybind', ' ', focus),
+ ]
+
+ @staticmethod
+ def get_right_text_tokens(log_pane: 'LogPane'):
+ """Return formatted text tokens for display."""
+ focus = functools.partial(BottomToolbarBar.mouse_handler_focus,
+ log_pane)
+ fragments = []
+ if not has_focus(log_pane.__pt_container__())():
+ fragments.append(('class:keyhelp', '[click to focus] ', focus))
+ fragments.append(('', ' {} '.format(log_pane.pane_subtitle()), focus))
+ return fragments
+
+ def __init__(self, log_pane: 'LogPane'):
+ title_section_window = Window(
+ content=FormattedTextControl(
+ # Callable to get formatted text tuples.
+ functools.partial(BottomToolbarBar.get_left_text_tokens,
+ log_pane)),
+ align=WindowAlign.LEFT,
+ dont_extend_width=True,
+ )
+
+ keybind_section_window = Window(
+ content=FormattedTextControl(
+ # Callable to get formatted text tuples.
+ functools.partial(BottomToolbarBar.get_center_text_tokens,
+ log_pane)),
+ align=WindowAlign.LEFT,
+ dont_extend_width=False,
+ )
+
+ log_source_name = Window(
+ content=FormattedTextControl(
+ # Callable to get formatted text tuples.
+ functools.partial(BottomToolbarBar.get_right_text_tokens,
+ log_pane)),
+ # Right side text should appear at the far right of the toolbar
+ align=WindowAlign.RIGHT,
+ dont_extend_width=True,
+ )
+
+ toolbar_vsplit = VSplit(
+ [
+ title_section_window,
+ keybind_section_window,
+ log_source_name,
+ ],
+ height=BottomToolbarBar.TOOLBAR_HEIGHT,
+ style=functools.partial(pw_console.style.get_toolbar_style,
+ log_pane),
+ )
+
+ # ConditionalContainer init()
+ super().__init__(
+ # Contents
+ toolbar_vsplit,
+ filter=Condition(lambda: log_pane.show_bottom_toolbar),
+ )
diff --git a/pw_console/py/pw_console/log_store.py b/pw_console/py/pw_console/log_store.py
new file mode 100644
index 0000000..3fe794c
--- /dev/null
+++ b/pw_console/py/pw_console/log_store.py
@@ -0,0 +1,184 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""LogStore saves logs and acts as a Python logging handler."""
+
+from __future__ import annotations
+import collections
+import logging
+import sys
+from datetime import datetime
+from typing import List, Dict, TYPE_CHECKING
+
+import pw_cli.color
+
+import pw_console.text_formatting
+from pw_console.log_line import LogLine
+from pw_console.widgets.table import TableView
+
+if TYPE_CHECKING:
+ from pw_console.log_view import LogView
+
+
+class LogStore(logging.Handler):
+ """Class to hold many log events."""
+ def __init__(self):
+ # Log storage deque for fast addition and deletion from the beginning
+ # and end of the iterable.
+ self.logs: collections.deque = collections.deque()
+
+ # Estimate of the logs in memory.
+ self.byte_size: int = 0
+
+ # Only allow this many log lines in memory.
+ self.max_history_size: int = 1000000
+
+ # Counts of logs per python logger name
+ self.channel_counts: Dict[str, int] = {}
+ # Widths of each logger prefix string. For example: the character length
+ # of the timestamp string.
+ self.channel_formatted_prefix_widths: Dict[str, int] = {}
+ # Longest of the above prefix widths.
+ self.longest_channel_prefix_width = 0
+
+ self.table: TableView = TableView()
+
+ # Erase existing logs.
+ self.clear_logs()
+
+ # List of viewers that should be notified on new log line arrival.
+ self.registered_viewers: List['LogView'] = []
+
+ super().__init__()
+
+ # Set formatting after logging.Handler init.
+ self.set_formatting()
+
+ def register_viewer(self, viewer: 'LogView'):
+ self.registered_viewers.append(viewer)
+
+ def set_formatting(self):
+ """Setup log formatting."""
+ # Copy of pw_cli log formatter
+ colors = pw_cli.color.colors(True)
+ timestamp_prefix = colors.black_on_white('%(asctime)s') + ' '
+ timestamp_format = '%Y%m%d %H:%M:%S'
+ format_string = timestamp_prefix + '%(levelname)s %(message)s'
+ formatter = logging.Formatter(format_string, timestamp_format)
+
+ self.setLevel(logging.DEBUG)
+ self.setFormatter(formatter)
+
+ # Update log time character width.
+ example_time_string = datetime.now().strftime(timestamp_format)
+ self.table.column_width_time = len(example_time_string)
+
+ def clear_logs(self):
+ """Erase all stored pane lines."""
+ self.logs = collections.deque()
+ self.byte_size = 0
+ self.channel_counts = {}
+ self.channel_formatted_prefix_widths = {}
+ self.line_index = 0
+
+ def get_channel_counts(self):
+ """Return the seen channel log counts for this conatiner."""
+ return ', '.join([
+ f'{name}: {count}' for name, count in self.channel_counts.items()
+ ])
+
+ def get_total_count(self):
+ """Total size of the logs store."""
+ return len(self.logs)
+
+ def get_last_log_line_index(self):
+ """Last valid index of the logs."""
+ # Subtract 1 since self.logs is zero indexed.
+ total = self.get_total_count()
+ return 0 if total < 0 else total - 1
+
+ def _update_log_prefix_width(self, record: logging.LogRecord):
+ """Save the formatted prefix width if this is a new logger channel
+ name."""
+ if self.formatter and (
+ record.name
+ not in self.channel_formatted_prefix_widths.keys()):
+ # Find the width of the formatted timestamp and level
+ format_string = self.formatter._fmt # pylint: disable=protected-access
+
+ # There may not be a _fmt defined.
+ if not format_string:
+ return
+
+ format_without_message = format_string.replace('%(message)s', '')
+ formatted_time_and_level = format_without_message % dict(
+ asctime=record.asctime, levelname=record.levelname)
+
+ # Delete ANSI escape sequences.
+ ansi_stripped_time_and_level = (
+ pw_console.text_formatting.strip_ansi(formatted_time_and_level)
+ )
+
+ self.channel_formatted_prefix_widths[record.name] = len(
+ ansi_stripped_time_and_level)
+
+ # Set the max width of all known formats so far.
+ self.longest_channel_prefix_width = max(
+ self.channel_formatted_prefix_widths.values())
+
+ def _append_log(self, record: logging.LogRecord):
+ """Add a new log event."""
+ # Format incoming log line.
+ formatted_log = self.format(record)
+ ansi_stripped_log = pw_console.text_formatting.strip_ansi(
+ formatted_log)
+ # Save this log.
+ self.logs.append(
+ LogLine(record=record,
+ formatted_log=formatted_log,
+ ansi_stripped_log=ansi_stripped_log))
+ # Increment this logger count
+ self.channel_counts[record.name] = self.channel_counts.get(
+ record.name, 0) + 1
+
+ # Save prefix width of this log line.
+ self._update_log_prefix_width(record)
+
+ # Parse metadata fields
+ self.logs[-1].update_metadata()
+
+ # Check for bigger column widths.
+ self.table.update_metadata_column_widths(self.logs[-1])
+
+ # Update estimated byte_size.
+ self.byte_size += sys.getsizeof(self.logs[-1])
+ # If the total log lines is > max_history_size, delete the oldest line.
+ if self.get_total_count() > self.max_history_size:
+ self.byte_size -= sys.getsizeof(self.logs.popleft())
+
+ def emit(self, record):
+ """Process a new log record.
+
+ This defines the logging.Handler emit() fuction which is called by
+ logging.Handler.handle() We don't implement handle() as it is done in
+ the parent class with thread safety and filters applied.
+ """
+ self._append_log(record)
+ # Notify viewers of new logs
+ for viewer in self.registered_viewers:
+ # TODO(tonymd): Type of viewer does not seem to be checked
+ viewer.new_logs_arrived()
+
+ def render_table_header(self):
+ """Get pre-formatted table header."""
+ return self.table.formatted_header()
diff --git a/pw_console/py/pw_console/log_container.py b/pw_console/py/pw_console/log_view.py
similarity index 63%
rename from pw_console/py/pw_console/log_container.py
rename to pw_console/py/pw_console/log_view.py
index 66f432e..fe53c52 100644
--- a/pw_console/py/pw_console/log_container.py
+++ b/pw_console/py/pw_console/log_view.py
@@ -11,14 +11,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
-"""LogContainer storage class."""
+"""LogView maintains a log pane's scrolling and searching state."""
import collections
import logging
-import sys
import time
-from datetime import datetime
-from typing import List, Dict, Optional
+from typing import List, Optional, TYPE_CHECKING
from prompt_toolkit.data_structures import Point
from prompt_toolkit.formatted_text import (
@@ -27,37 +25,27 @@
StyleAndTextTuples,
)
-import pw_cli.color
-
import pw_console.text_formatting
-from pw_console.log_line import LogLine
-from pw_console.widgets.table import TableView
+from pw_console.log_store import LogStore
+
+if TYPE_CHECKING:
+ from pw_console.log_pane import LogPane
_LOG = logging.getLogger(__package__)
-class LogContainer(logging.Handler):
- """Class to hold many log events."""
+class LogView:
+ """Viewing window into a LogStore."""
# pylint: disable=too-many-instance-attributes,too-many-public-methods
- def __init__(self):
- # Log storage deque for fast addition and deletion from the beginning
- # and end of the iterable.
- self.logs: collections.deque = collections.deque()
- # Estimate of the logs in memory.
- self.byte_size: int = 0
-
- # Only allow this many log lines in memory.
- self.max_history_size: int = 1000000
-
- # Counts of logs per python logger name
- self.channel_counts: Dict[str, int] = {}
- # Widths of each logger prefix string. For example: the character length
- # of the timestamp string.
- self.channel_formatted_prefix_widths: Dict[str, int] = {}
- # Longest of the above prefix widths.
- self.longest_channel_prefix_width = 0
+ def __init__(self,
+ log_pane: 'LogPane',
+ log_store: Optional[LogStore] = None):
+ # Parent LogPane reference. Updated by calling `set_log_pane()`.
+ self.log_pane = log_pane
+ self.log_store = log_store if log_store else LogStore()
+ self.log_store.register_viewer(self)
# Current log line index state variables:
self.line_index = 0
@@ -66,8 +54,6 @@
self._current_start_index = 0
self._current_end_index = 0
- self.table = TableView()
-
# LogPane prompt_toolkit container render size.
self._window_height = 20
self._window_width = 80
@@ -77,54 +63,28 @@
self._ui_update_frequency = 0.1
self._last_ui_update_time = time.time()
- # Erase existing logs.
- self.clear_logs()
- # New logs have arrived flag.
- self.has_new_logs = True
-
# Should new log lines be tailed?
self.follow = True
- # Parent LogPane reference. Updated by calling `set_log_pane()`.
- self.log_pane = None
-
# Cache of formatted text tuples used in the last UI render. Used after
# rendering by `get_cursor_position()`.
- self._line_fragment_cache = None
-
- super().__init__()
+ self._line_fragment_cache: collections.deque = collections.deque()
def set_log_pane(self, log_pane):
"""Set the parent LogPane instance."""
self.log_pane = log_pane
- def set_formatting(self):
- """Setup log formatting."""
- # Copy of pw_cli log formatter
- colors = pw_cli.color.colors(True)
- timestamp_prefix = colors.black_on_white('%(asctime)s') + ' '
- timestamp_format = '%Y%m%d %H:%M:%S'
- format_string = timestamp_prefix + '%(levelname)s %(message)s'
- formatter = logging.Formatter(format_string, timestamp_format)
-
- self.setLevel(logging.DEBUG)
- self.setFormatter(formatter)
-
- # Update log time character width.
- example_time_string = datetime.now().strftime(timestamp_format)
- self.table.column_width_time = len(example_time_string)
-
def get_current_line(self):
"""Return the currently selected log event index."""
return self.line_index
+ def get_total_count(self):
+ """Total size of the logs store."""
+ return self.log_store.get_total_count()
+
def clear_logs(self):
"""Erase all stored pane lines."""
- self.logs = collections.deque()
- self.byte_size = 0
- self.channel_counts = {}
- self.channel_formatted_prefix_widths = {}
- self.line_index = 0
+ # TODO(tonymd): Should the LogStore be erased?
def wrap_lines_enabled(self):
"""Get the parent log pane wrap lines setting."""
@@ -138,94 +98,20 @@
if self.follow:
self.scroll_to_bottom()
- def get_total_count(self):
- """Total size of the logs container."""
- return len(self.logs)
-
- def get_last_log_line_index(self):
- """Last valid index of the logs."""
- # Subtract 1 since self.logs is zero indexed.
- total = self.get_total_count()
- return 0 if total < 0 else total - 1
-
- def get_channel_counts(self):
- """Return the seen channel log counts for this conatiner."""
- return ', '.join([
- f'{name}: {count}' for name, count in self.channel_counts.items()
- ])
-
def get_line_wrap_prefix_width(self):
if self.wrap_lines_enabled():
if self.log_pane.table_view:
- return self.table.column_width_prefix_total
- return self.longest_channel_prefix_width
+ return self.log_store.table.column_width_prefix_total
+ return self.log_store.longest_channel_prefix_width
return 0
- def _update_log_prefix_width(self, record: logging.LogRecord):
- """Save the formatted prefix width if this is a new logger channel
- name."""
- if self.formatter and (
- record.name
- not in self.channel_formatted_prefix_widths.keys()):
- # Find the width of the formatted timestamp and level
- format_string = self.formatter._fmt # pylint: disable=protected-access
-
- # There may not be a _fmt defined.
- if not format_string:
- return
-
- format_without_message = format_string.replace('%(message)s', '')
- formatted_time_and_level = format_without_message % dict(
- asctime=record.asctime, levelname=record.levelname)
-
- # Delete ANSI escape sequences.
- ansi_stripped_time_and_level = (
- pw_console.text_formatting.strip_ansi(formatted_time_and_level)
- )
-
- self.channel_formatted_prefix_widths[record.name] = len(
- ansi_stripped_time_and_level)
-
- # Set the max width of all known formats so far.
- self.longest_channel_prefix_width = max(
- self.channel_formatted_prefix_widths.values())
-
- def _append_log(self, record: logging.LogRecord):
- """Add a new log event."""
- # Format incoming log line.
- formatted_log = self.format(record)
- ansi_stripped_log = pw_console.text_formatting.strip_ansi(
- formatted_log)
- # Save this log.
- self.logs.append(
- LogLine(record=record,
- formatted_log=formatted_log,
- ansi_stripped_log=ansi_stripped_log))
- # Increment this logger count
- self.channel_counts[record.name] = self.channel_counts.get(
- record.name, 0) + 1
-
- # Save prefix width of this log line.
- self._update_log_prefix_width(record)
-
- # Parse metadata fields
- self.logs[-1].update_metadata()
-
- # Check for bigger column widths.
- self.table.update_metadata_column_widths(self.logs[-1])
-
- # Update estimated byte_size.
- self.byte_size += sys.getsizeof(self.logs[-1])
- # If the total log lines is > max_history_size, delete the oldest line.
- if self.get_total_count() > self.max_history_size:
- self.byte_size -= sys.getsizeof(self.logs.popleft())
-
- # Set the has_new_logs flag.
- self.has_new_logs = True
- # If follow is on, scroll to the last line.
+ def new_logs_arrived(self):
if self.follow:
self.scroll_to_bottom()
+ # Trigger a UI update
+ self._update_prompt_toolkit_ui()
+
def _update_prompt_toolkit_ui(self):
"""Update Prompt Toolkit UI if a certain amount of time has passed."""
emit_time = time.time()
@@ -241,10 +127,6 @@
# event loop.
console_app.application.invalidate()
- def log_content_changed(self):
- """Return True if new log lines have appeared since the last render."""
- return self.has_new_logs
-
def get_cursor_position(self) -> Optional[Point]:
"""Return the position of the cursor."""
# This implementation is based on get_cursor_position from
@@ -266,16 +148,6 @@
column += len(text)
return Point(0, 0)
- def emit(self, record):
- """Process a new log record.
-
- This defines the logging.Handler emit() fuction which is called by
- logging.Handler.handle() We don't implement handle() as it is done in
- the parent class with thread safety and filters applied.
- """
- self._append_log(record)
- self._update_prompt_toolkit_ui()
-
def scroll_to_top(self):
"""Move selected index to the beginning."""
# Stop following so cursor doesn't jump back down to the bottom.
@@ -285,7 +157,7 @@
def scroll_to_bottom(self):
"""Move selected index to the end."""
# Don't change following state like scroll_to_top.
- self.line_index = max(0, self.get_last_log_line_index())
+ self.line_index = max(0, self.log_store.get_last_log_line_index())
def scroll(self, lines):
"""Scroll up or down by plus or minus lines.
@@ -298,8 +170,8 @@
# If scrolling to an index below zero, set to zero.
new_line_index = max(0, self.line_index + lines)
# If past the end, set to the last index of self.logs.
- if new_line_index >= self.get_total_count():
- new_line_index = self.get_last_log_line_index()
+ if new_line_index >= self.log_store.get_total_count():
+ new_line_index = self.log_store.get_last_log_line_index()
# Set the new selected line index.
self.line_index = new_line_index
@@ -363,19 +235,18 @@
# Use the current_window_height if line_index is less
ending_index = max(self.line_index, max_window_row_index)
- if ending_index > self.get_last_log_line_index():
- ending_index = self.get_last_log_line_index()
+ if ending_index > self.log_store.get_last_log_line_index():
+ ending_index = self.log_store.get_last_log_line_index()
# Save start and end index.
self._current_start_index = starting_index
self._current_end_index = ending_index
- self.has_new_logs = False
return starting_index, ending_index
def render_table_header(self):
"""Get pre-formatted table header."""
- return self.table.formatted_header()
+ return self.log_store.table.formatted_header()
def render_content(self) -> List:
"""Return log lines as a list of FormattedText tuples.
@@ -393,7 +264,7 @@
# If we have no logs add one with at least a single space character for
# the cursor to land on. Otherwise the cursor will be left on the line
# above the log pane container.
- if self.get_total_count() < 1:
+ if self.log_store.get_total_count() < 1:
return [(
'[SetCursorPosition]', '\n' * self._window_height
# LogContentControl.mouse_handler will handle focusing the log
@@ -414,9 +285,10 @@
break
# Grab the rendered log line using the table or standard view.
- line_fragments: StyleAndTextTuples = (self.table.formatted_row(
- self.logs[i]) if self.log_pane.table_view else
- self.logs[i].get_fragments())
+ line_fragments: StyleAndTextTuples = (
+ self.log_store.table.formatted_row(self.log_store.logs[i])
+ if self.log_pane.table_view else
+ self.log_store.logs[i].get_fragments())
# Get the width, height and remaining width.
fragment_width = fragment_list_width(line_fragments)
@@ -434,7 +306,7 @@
used_lines = line_height
# Count the number of line breaks are included in the log line.
- line_breaks = self.logs[i].ansi_stripped_log.count('\n')
+ line_breaks = self.log_store.logs[i].ansi_stripped_log.count('\n')
used_lines += line_breaks
# If this is the selected line apply a style class for highlighting.