blob: 73a93cf2c5c7491dad8c85b2e5acae3f6fbc3d1a [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
14"""Tests for pw_console.console_app"""
15
16import asyncio
Anthony DiGirolamo8a498802021-06-14 23:52:42 -070017import builtins
Anthony DiGirolamo855b01d2021-06-18 17:11:56 -070018import inspect
Anthony DiGirolamofc1c7162021-07-29 08:41:52 -070019import io
20import sys
Anthony DiGirolamof48d0b62021-06-15 12:47:31 -070021import threading
Anthony DiGirolamo8a498802021-06-14 23:52:42 -070022import unittest
Anthony DiGirolamofc1c7162021-07-29 08:41:52 -070023from unittest.mock import MagicMock, call
Anthony DiGirolamo8a498802021-06-14 23:52:42 -070024
25from prompt_toolkit.application import create_app_session
Anthony DiGirolamoc6874802021-07-02 14:52:45 -070026from prompt_toolkit.output import (
27 ColorDepth,
28 # inclusive-language: ignore
29 DummyOutput as FakeOutput,
30)
Anthony DiGirolamo8a498802021-06-14 23:52:42 -070031
32from pw_console.console_app import ConsoleApp
33from pw_console.repl_pane import ReplPane
34from pw_console.pw_ptpython_repl import PwPtPythonRepl
35
# True when the interpreter is Python 3.8 or newer. The tests in this module
# are only defined in that case because they subclass
# unittest.IsolatedAsyncioTestCase, which is imported under this flag.
_PYTHON_3_8 = sys.version_info >= (3, 8)
Anthony DiGirolamo8a498802021-06-14 23:52:42 -070040
if _PYTHON_3_8:
    from unittest import IsolatedAsyncioTestCase  # type: ignore # pylint: disable=no-name-in-module

    class TestReplPane(IsolatedAsyncioTestCase):
        """Tests for ReplPane.

        Covers two paths: calling PwPtPythonRepl._run_user_code directly, and
        submitting code through a ConsoleApp whose user code thread has been
        started.
        """
        def setUp(self):  # pylint: disable=invalid-name
            # Show complete diffs when an assertEqual on a dict/str fails.
            self.maxDiff = None  # pylint: disable=invalid-name

        def test_repl_code_return_values(self) -> None:
            """Test stdout, return values, and exceptions can be returned from
            running user repl code."""
            app = MagicMock()

            global_vars = {
                '__name__': '__main__',
                '__package__': None,
                '__doc__': None,
                '__builtins__': builtins,
            }

            pw_ptpython_repl = PwPtPythonRepl(
                get_globals=lambda: global_vars,
                get_locals=lambda: global_vars,
                color_depth=ColorDepth.DEPTH_8_BIT)
            repl_pane = ReplPane(
                application=app,
                python_repl=pw_ptpython_repl,
            )
            # Check pw_ptpython_repl has a reference to the parent repl_pane.
            self.assertEqual(repl_pane, pw_ptpython_repl.repl_pane)

            def run_user_code(text):
                """Run text via _run_user_code with fresh output buffers."""
                temp_stdout = io.StringIO()
                temp_stderr = io.StringIO()
                # pylint: disable=protected-access
                return asyncio.run(
                    pw_ptpython_repl._run_user_code(text, temp_stdout,
                                                    temp_stderr))

            # Define a function, should return nothing.
            code = inspect.cleandoc("""
                def run():
                    print('The answer is ', end='')
                    return 1+1+4+16+20
            """)
            self.assertEqual(run_user_code(code), {
                'stdout': '',
                'stderr': '',
                'result': None
            })

            # Check stdout and return value
            self.assertEqual(run_user_code('run()'), {
                'stdout': 'The answer is ',
                'stderr': '',
                'result': 42
            })

            # Check for repl exception. The return value is not inspected
            # here; the error text is read from _last_result instead.
            run_user_code('return "blah"')
            # pylint: disable=protected-access
            self.assertIn("SyntaxError: 'return' outside function",
                          pw_ptpython_repl._last_result)  # type: ignore

        async def test_user_thread(self) -> None:
            """Test user code thread.

            Submits two snippets through the repl accept handler: a function
            definition, then a call to that function. After each one the test
            checks the calls made to update_output_buffer, the completion
            callback, and the fields recorded on the executed code object.
            """

            with create_app_session(output=FakeOutput()):
                # Setup Mocks
                app = ConsoleApp(color_depth=ColorDepth.DEPTH_8_BIT)
                app.start_user_code_thread()

                pw_ptpython_repl = app.pw_ptpython_repl
                repl_pane = app.repl_pane

                # Mock update_output_buffer to track number of update calls
                repl_pane.update_output_buffer = MagicMock(
                    wraps=repl_pane.update_output_buffer)

                # Mock complete callback
                pw_ptpython_repl.user_code_complete_callback = MagicMock(
                    wraps=pw_ptpython_repl.user_code_complete_callback)

                # Repl done flag for tests
                user_code_done = threading.Event()

                async def run_and_wait(text):
                    """Submit text to the repl and wait for it to finish.

                    Returns the last entry of repl_pane.executed_code.
                    """
                    input_buffer = MagicMock(text=text)
                    pw_ptpython_repl._accept_handler(input_buffer)  # pylint: disable=protected-access

                    # Get last executed code object.
                    user_code = repl_pane.executed_code[-1]
                    # Wait for repl code to finish.
                    user_code.future.add_done_callback(
                        lambda _future: user_code_done.set())
                    # Wait for stdout monitoring to complete.
                    if user_code.output_check_task:
                        await user_code.output_check_task
                    # Wait for test done callback. A timeout is not checked
                    # here; the future.done() assertions made by the caller
                    # report unfinished code.
                    user_code_done.wait(timeout=3)
                    return user_code

                def assert_update_calls(expected_calls):
                    """Check each expected call reached update_output_buffer.

                    NOTE: Avoid using assert_has_calls. Thread timing can make
                    the test flaky, so only membership is checked.
                    """
                    for expected_call in expected_calls:
                        self.assertIn(
                            expected_call,
                            repl_pane.update_output_buffer.mock_calls)

                def reset_mocks():
                    """Clear the done flag and recorded mock calls."""
                    user_code_done.clear()
                    pw_ptpython_repl.user_code_complete_callback.reset_mock()
                    repl_pane.update_output_buffer.reset_mock()

                # Run some code
                code = inspect.cleandoc("""
                    import time
                    def run():
                        for i in range(2):
                            time.sleep(0.5)
                            print(i)
                        print('The answer is ', end='')
                        return 1+1+4+16+20
                """)
                user_code1 = await run_and_wait(code)

                # Check user_code1 results
                assert_update_calls([
                    # Initial exec start
                    call('pw_ptpython_repl._accept_handler'),
                    # Code finishes
                    call('repl_pane.append_result_to_executed_code'),
                    # Complete callback
                    call('pw_ptpython_repl.user_code_complete_callback'),
                ])

                pw_ptpython_repl.user_code_complete_callback.assert_called_once(
                )

                self.assertIsNotNone(user_code1)
                self.assertTrue(user_code1.future.done())
                self.assertEqual(user_code1.input, code)
                self.assertIsNone(user_code1.output)
                # stdout / stderr may be '' or None
                self.assertFalse(user_code1.stdout)
                self.assertFalse(user_code1.stderr)

                reset_mocks()

                # Run some code
                user_code2 = await run_and_wait('run()')

                # Check user_code2 results
                assert_update_calls([
                    # Initial exec start
                    call('pw_ptpython_repl._accept_handler'),
                    # Periodic checks, should be a total of 4:
                    #   Code should take 1.0 second to run.
                    #   Periodic checks every 0.3 seconds
                    #   1.0 / 0.3 = 3.33 (4) checks
                    call('repl_pane.periodic_check'),
                    call('repl_pane.periodic_check'),
                    call('repl_pane.periodic_check'),
                    # Code finishes
                    call('repl_pane.append_result_to_executed_code'),
                    # Complete callback
                    call('pw_ptpython_repl.user_code_complete_callback'),
                    # Final periodic check
                    call('repl_pane.periodic_check'),
                ])

                pw_ptpython_repl.user_code_complete_callback.assert_called_once(
                )
                self.assertIsNotNone(user_code2)
                self.assertTrue(user_code2.future.done())
                self.assertEqual(user_code2.input, 'run()')
                self.assertEqual(user_code2.output, '42')
                self.assertEqual(user_code2.stdout, '0\n1\nThe answer is ')
                self.assertFalse(user_code2.stderr)

                reset_mocks()
Anthony DiGirolamo8a498802021-06-14 23:52:42 -0700240
241
if __name__ == '__main__':
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()