blob: d280270af91bf6a0b6bab13153e1035652212fee [file] [log] [blame]
Eric Snow7f8bfc92018-01-29 18:23:44 -07001import contextlib
2import os
3import pickle
4from textwrap import dedent, indent
5import threading
6import unittest
7
8from test import support
9from test.support import script_helper
10
11interpreters = support.import_module('_xxsubinterpreters')
12
13
14def _captured_script(script):
15 r, w = os.pipe()
16 indented = script.replace('\n', '\n ')
17 wrapped = dedent(f"""
18 import contextlib
19 with open({w}, 'w') as chan:
20 with contextlib.redirect_stdout(chan):
21 {indented}
22 """)
23 return wrapped, open(r)
24
25
26def _run_output(interp, request, shared=None):
27 script, chan = _captured_script(request)
28 with chan:
29 interpreters.run_string(interp, script, shared)
30 return chan.read()
31
32
33@contextlib.contextmanager
34def _running(interp):
35 r, w = os.pipe()
36 def run():
37 interpreters.run_string(interp, dedent(f"""
38 # wait for "signal"
39 with open({r}) as chan:
40 chan.read()
41 """))
42
43 t = threading.Thread(target=run)
44 t.start()
45
46 yield
47
48 with open(w, 'w') as chan:
49 chan.write('done')
50 t.join()
51
52
53class IsShareableTests(unittest.TestCase):
54
55 def test_default_shareables(self):
56 shareables = [
57 # singletons
58 None,
59 # builtin objects
60 b'spam',
61 ]
62 for obj in shareables:
63 with self.subTest(obj):
64 self.assertTrue(
65 interpreters.is_shareable(obj))
66
67 def test_not_shareable(self):
68 class Cheese:
69 def __init__(self, name):
70 self.name = name
71 def __str__(self):
72 return self.name
73
74 class SubBytes(bytes):
75 """A subclass of a shareable type."""
76
77 not_shareables = [
78 # singletons
79 True,
80 False,
81 NotImplemented,
82 ...,
83 # builtin types and objects
84 type,
85 object,
86 object(),
87 Exception(),
88 42,
89 100.0,
90 'spam',
91 # user-defined types and objects
92 Cheese,
93 Cheese('Wensleydale'),
94 SubBytes(b'spam'),
95 ]
96 for obj in not_shareables:
97 with self.subTest(obj):
98 self.assertFalse(
99 interpreters.is_shareable(obj))
100
101
102class TestBase(unittest.TestCase):
103
104 def tearDown(self):
105 for id in interpreters.list_all():
106 if id == 0: # main
107 continue
108 try:
109 interpreters.destroy(id)
110 except RuntimeError:
111 pass # already destroyed
112
113 for cid in interpreters.channel_list_all():
114 try:
115 interpreters.channel_destroy(cid)
116 except interpreters.ChannelNotFoundError:
117 pass # already destroyed
118
119
120class ListAllTests(TestBase):
121
122 def test_initial(self):
123 main = interpreters.get_main()
124 ids = interpreters.list_all()
125 self.assertEqual(ids, [main])
126
127 def test_after_creating(self):
128 main = interpreters.get_main()
129 first = interpreters.create()
130 second = interpreters.create()
131 ids = interpreters.list_all()
132 self.assertEqual(ids, [main, first, second])
133
134 def test_after_destroying(self):
135 main = interpreters.get_main()
136 first = interpreters.create()
137 second = interpreters.create()
138 interpreters.destroy(first)
139 ids = interpreters.list_all()
140 self.assertEqual(ids, [main, second])
141
142
143class GetCurrentTests(TestBase):
144
145 def test_main(self):
146 main = interpreters.get_main()
147 cur = interpreters.get_current()
148 self.assertEqual(cur, main)
149
150 def test_subinterpreter(self):
151 main = interpreters.get_main()
152 interp = interpreters.create()
153 out = _run_output(interp, dedent("""
154 import _xxsubinterpreters as _interpreters
Miss Islington (bot)3db05a32018-02-16 18:15:24 -0800155 print(int(_interpreters.get_current()))
Eric Snow7f8bfc92018-01-29 18:23:44 -0700156 """))
157 cur = int(out.strip())
158 _, expected = interpreters.list_all()
159 self.assertEqual(cur, expected)
160 self.assertNotEqual(cur, main)
161
162
163class GetMainTests(TestBase):
164
165 def test_from_main(self):
166 [expected] = interpreters.list_all()
167 main = interpreters.get_main()
168 self.assertEqual(main, expected)
169
170 def test_from_subinterpreter(self):
171 [expected] = interpreters.list_all()
172 interp = interpreters.create()
173 out = _run_output(interp, dedent("""
174 import _xxsubinterpreters as _interpreters
Miss Islington (bot)3db05a32018-02-16 18:15:24 -0800175 print(int(_interpreters.get_main()))
Eric Snow7f8bfc92018-01-29 18:23:44 -0700176 """))
177 main = int(out.strip())
178 self.assertEqual(main, expected)
179
180
181class IsRunningTests(TestBase):
182
183 def test_main(self):
184 main = interpreters.get_main()
185 self.assertTrue(interpreters.is_running(main))
186
187 def test_subinterpreter(self):
188 interp = interpreters.create()
189 self.assertFalse(interpreters.is_running(interp))
190
191 with _running(interp):
192 self.assertTrue(interpreters.is_running(interp))
193 self.assertFalse(interpreters.is_running(interp))
194
195 def test_from_subinterpreter(self):
196 interp = interpreters.create()
197 out = _run_output(interp, dedent(f"""
198 import _xxsubinterpreters as _interpreters
Miss Islington (bot)3db05a32018-02-16 18:15:24 -0800199 if _interpreters.is_running({int(interp)}):
Eric Snow7f8bfc92018-01-29 18:23:44 -0700200 print(True)
201 else:
202 print(False)
203 """))
204 self.assertEqual(out.strip(), 'True')
205
206 def test_already_destroyed(self):
207 interp = interpreters.create()
208 interpreters.destroy(interp)
209 with self.assertRaises(RuntimeError):
210 interpreters.is_running(interp)
211
212 def test_does_not_exist(self):
213 with self.assertRaises(RuntimeError):
214 interpreters.is_running(1_000_000)
215
216 def test_bad_id(self):
217 with self.assertRaises(RuntimeError):
218 interpreters.is_running(-1)
219
220
Miss Islington (bot)3db05a32018-02-16 18:15:24 -0800221class InterpreterIDTests(TestBase):
222
223 def test_with_int(self):
224 id = interpreters.InterpreterID(10, force=True)
225
226 self.assertEqual(int(id), 10)
227
228 def test_coerce_id(self):
229 id = interpreters.InterpreterID('10', force=True)
230 self.assertEqual(int(id), 10)
231
232 id = interpreters.InterpreterID(10.0, force=True)
233 self.assertEqual(int(id), 10)
234
235 class Int(str):
236 def __init__(self, value):
237 self._value = value
238 def __int__(self):
239 return self._value
240
241 id = interpreters.InterpreterID(Int(10), force=True)
242 self.assertEqual(int(id), 10)
243
244 def test_bad_id(self):
245 for id in [-1, 'spam']:
246 with self.subTest(id):
247 with self.assertRaises(ValueError):
248 interpreters.InterpreterID(id)
249 with self.assertRaises(OverflowError):
250 interpreters.InterpreterID(2**64)
251 with self.assertRaises(TypeError):
252 interpreters.InterpreterID(object())
253
254 def test_does_not_exist(self):
255 id = interpreters.channel_create()
256 with self.assertRaises(RuntimeError):
257 interpreters.InterpreterID(int(id) + 1) # unforced
258
259 def test_repr(self):
260 id = interpreters.InterpreterID(10, force=True)
261 self.assertEqual(repr(id), 'InterpreterID(10)')
262
263 def test_equality(self):
264 id1 = interpreters.create()
265 id2 = interpreters.InterpreterID(int(id1))
266 id3 = interpreters.create()
267
268 self.assertTrue(id1 == id1)
269 self.assertTrue(id1 == id2)
270 self.assertTrue(id1 == int(id1))
271 self.assertFalse(id1 == id3)
272
273 self.assertFalse(id1 != id1)
274 self.assertFalse(id1 != id2)
275 self.assertTrue(id1 != id3)
276
277
Eric Snow7f8bfc92018-01-29 18:23:44 -0700278class CreateTests(TestBase):
279
280 def test_in_main(self):
281 id = interpreters.create()
282
283 self.assertIn(id, interpreters.list_all())
284
285 @unittest.skip('enable this test when working on pystate.c')
286 def test_unique_id(self):
287 seen = set()
288 for _ in range(100):
289 id = interpreters.create()
290 interpreters.destroy(id)
291 seen.add(id)
292
293 self.assertEqual(len(seen), 100)
294
295 def test_in_thread(self):
296 lock = threading.Lock()
297 id = None
298 def f():
299 nonlocal id
300 id = interpreters.create()
301 lock.acquire()
302 lock.release()
303
304 t = threading.Thread(target=f)
305 with lock:
306 t.start()
307 t.join()
308 self.assertIn(id, interpreters.list_all())
309
310 def test_in_subinterpreter(self):
311 main, = interpreters.list_all()
312 id1 = interpreters.create()
313 out = _run_output(id1, dedent("""
314 import _xxsubinterpreters as _interpreters
315 id = _interpreters.create()
Miss Islington (bot)3db05a32018-02-16 18:15:24 -0800316 print(int(id))
Eric Snow7f8bfc92018-01-29 18:23:44 -0700317 """))
318 id2 = int(out.strip())
319
320 self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
321
322 def test_in_threaded_subinterpreter(self):
323 main, = interpreters.list_all()
324 id1 = interpreters.create()
325 id2 = None
326 def f():
327 nonlocal id2
328 out = _run_output(id1, dedent("""
329 import _xxsubinterpreters as _interpreters
330 id = _interpreters.create()
Miss Islington (bot)3db05a32018-02-16 18:15:24 -0800331 print(int(id))
Eric Snow7f8bfc92018-01-29 18:23:44 -0700332 """))
333 id2 = int(out.strip())
334
335 t = threading.Thread(target=f)
336 t.start()
337 t.join()
338
339 self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
340
341 def test_after_destroy_all(self):
342 before = set(interpreters.list_all())
343 # Create 3 subinterpreters.
344 ids = []
345 for _ in range(3):
346 id = interpreters.create()
347 ids.append(id)
348 # Now destroy them.
349 for id in ids:
350 interpreters.destroy(id)
351 # Finally, create another.
352 id = interpreters.create()
353 self.assertEqual(set(interpreters.list_all()), before | {id})
354
355 def test_after_destroy_some(self):
356 before = set(interpreters.list_all())
357 # Create 3 subinterpreters.
358 id1 = interpreters.create()
359 id2 = interpreters.create()
360 id3 = interpreters.create()
361 # Now destroy 2 of them.
362 interpreters.destroy(id1)
363 interpreters.destroy(id3)
364 # Finally, create another.
365 id = interpreters.create()
366 self.assertEqual(set(interpreters.list_all()), before | {id, id2})
367
368
369class DestroyTests(TestBase):
370
371 def test_one(self):
372 id1 = interpreters.create()
373 id2 = interpreters.create()
374 id3 = interpreters.create()
375 self.assertIn(id2, interpreters.list_all())
376 interpreters.destroy(id2)
377 self.assertNotIn(id2, interpreters.list_all())
378 self.assertIn(id1, interpreters.list_all())
379 self.assertIn(id3, interpreters.list_all())
380
381 def test_all(self):
382 before = set(interpreters.list_all())
383 ids = set()
384 for _ in range(3):
385 id = interpreters.create()
386 ids.add(id)
387 self.assertEqual(set(interpreters.list_all()), before | ids)
388 for id in ids:
389 interpreters.destroy(id)
390 self.assertEqual(set(interpreters.list_all()), before)
391
392 def test_main(self):
393 main, = interpreters.list_all()
394 with self.assertRaises(RuntimeError):
395 interpreters.destroy(main)
396
397 def f():
398 with self.assertRaises(RuntimeError):
399 interpreters.destroy(main)
400
401 t = threading.Thread(target=f)
402 t.start()
403 t.join()
404
405 def test_already_destroyed(self):
406 id = interpreters.create()
407 interpreters.destroy(id)
408 with self.assertRaises(RuntimeError):
409 interpreters.destroy(id)
410
411 def test_does_not_exist(self):
412 with self.assertRaises(RuntimeError):
413 interpreters.destroy(1_000_000)
414
415 def test_bad_id(self):
416 with self.assertRaises(RuntimeError):
417 interpreters.destroy(-1)
418
419 def test_from_current(self):
420 main, = interpreters.list_all()
421 id = interpreters.create()
Miss Islington (bot)f33eced2018-02-02 21:38:57 -0800422 script = dedent(f"""
Eric Snow7f8bfc92018-01-29 18:23:44 -0700423 import _xxsubinterpreters as _interpreters
Miss Islington (bot)f33eced2018-02-02 21:38:57 -0800424 try:
Miss Islington (bot)3db05a32018-02-16 18:15:24 -0800425 _interpreters.destroy({int(id)})
Miss Islington (bot)f33eced2018-02-02 21:38:57 -0800426 except RuntimeError:
427 pass
428 """)
Eric Snow7f8bfc92018-01-29 18:23:44 -0700429
Miss Islington (bot)f33eced2018-02-02 21:38:57 -0800430 interpreters.run_string(id, script)
Eric Snow7f8bfc92018-01-29 18:23:44 -0700431 self.assertEqual(set(interpreters.list_all()), {main, id})
432
433 def test_from_sibling(self):
434 main, = interpreters.list_all()
435 id1 = interpreters.create()
436 id2 = interpreters.create()
Miss Islington (bot)3db05a32018-02-16 18:15:24 -0800437 script = dedent(f"""
Eric Snow7f8bfc92018-01-29 18:23:44 -0700438 import _xxsubinterpreters as _interpreters
Miss Islington (bot)3db05a32018-02-16 18:15:24 -0800439 _interpreters.destroy({int(id2)})
440 """)
Eric Snow7f8bfc92018-01-29 18:23:44 -0700441 interpreters.run_string(id1, script)
442
443 self.assertEqual(set(interpreters.list_all()), {main, id1})
444
445 def test_from_other_thread(self):
446 id = interpreters.create()
447 def f():
448 interpreters.destroy(id)
449
450 t = threading.Thread(target=f)
451 t.start()
452 t.join()
453
454 def test_still_running(self):
455 main, = interpreters.list_all()
456 interp = interpreters.create()
457 with _running(interp):
458 with self.assertRaises(RuntimeError):
459 interpreters.destroy(interp)
460 self.assertTrue(interpreters.is_running(interp))
461
462
463class RunStringTests(TestBase):
464
465 SCRIPT = dedent("""
466 with open('{}', 'w') as out:
467 out.write('{}')
468 """)
469 FILENAME = 'spam'
470
471 def setUp(self):
472 super().setUp()
473 self.id = interpreters.create()
474 self._fs = None
475
476 def tearDown(self):
477 if self._fs is not None:
478 self._fs.close()
479 super().tearDown()
480
481 @property
482 def fs(self):
483 if self._fs is None:
484 self._fs = FSFixture(self)
485 return self._fs
486
487 def test_success(self):
488 script, file = _captured_script('print("it worked!", end="")')
489 with file:
490 interpreters.run_string(self.id, script)
491 out = file.read()
492
493 self.assertEqual(out, 'it worked!')
494
495 def test_in_thread(self):
496 script, file = _captured_script('print("it worked!", end="")')
497 with file:
498 def f():
499 interpreters.run_string(self.id, script)
500
501 t = threading.Thread(target=f)
502 t.start()
503 t.join()
504 out = file.read()
505
506 self.assertEqual(out, 'it worked!')
507
508 def test_create_thread(self):
509 script, file = _captured_script("""
510 import threading
511 def f():
512 print('it worked!', end='')
513
514 t = threading.Thread(target=f)
515 t.start()
516 t.join()
517 """)
518 with file:
519 interpreters.run_string(self.id, script)
520 out = file.read()
521
522 self.assertEqual(out, 'it worked!')
523
524 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
525 def test_fork(self):
526 import tempfile
527 with tempfile.NamedTemporaryFile('w+') as file:
528 file.write('')
529 file.flush()
530
531 expected = 'spam spam spam spam spam'
532 script = dedent(f"""
533 # (inspired by Lib/test/test_fork.py)
534 import os
535 pid = os.fork()
536 if pid == 0: # child
537 with open('{file.name}', 'w') as out:
538 out.write('{expected}')
539 # Kill the unittest runner in the child process.
540 os._exit(1)
541 else:
542 SHORT_SLEEP = 0.1
543 import time
544 for _ in range(10):
545 spid, status = os.waitpid(pid, os.WNOHANG)
546 if spid == pid:
547 break
548 time.sleep(SHORT_SLEEP)
549 assert(spid == pid)
550 """)
551 interpreters.run_string(self.id, script)
552
553 file.seek(0)
554 content = file.read()
555 self.assertEqual(content, expected)
556
557 def test_already_running(self):
558 with _running(self.id):
559 with self.assertRaises(RuntimeError):
560 interpreters.run_string(self.id, 'print("spam")')
561
562 def test_does_not_exist(self):
563 id = 0
564 while id in interpreters.list_all():
565 id += 1
566 with self.assertRaises(RuntimeError):
567 interpreters.run_string(id, 'print("spam")')
568
569 def test_error_id(self):
570 with self.assertRaises(RuntimeError):
571 interpreters.run_string(-1, 'print("spam")')
572
573 def test_bad_id(self):
574 with self.assertRaises(TypeError):
575 interpreters.run_string('spam', 'print("spam")')
576
577 def test_bad_script(self):
578 with self.assertRaises(TypeError):
579 interpreters.run_string(self.id, 10)
580
581 def test_bytes_for_script(self):
582 with self.assertRaises(TypeError):
583 interpreters.run_string(self.id, b'print("spam")')
584
585 @contextlib.contextmanager
586 def assert_run_failed(self, exctype, msg=None):
587 with self.assertRaises(interpreters.RunFailedError) as caught:
588 yield
589 if msg is None:
590 self.assertEqual(str(caught.exception).split(':')[0],
591 str(exctype))
592 else:
593 self.assertEqual(str(caught.exception),
594 "{}: {}".format(exctype, msg))
595
596 def test_invalid_syntax(self):
597 with self.assert_run_failed(SyntaxError):
598 # missing close paren
599 interpreters.run_string(self.id, 'print("spam"')
600
601 def test_failure(self):
602 with self.assert_run_failed(Exception, 'spam'):
603 interpreters.run_string(self.id, 'raise Exception("spam")')
604
605 def test_SystemExit(self):
606 with self.assert_run_failed(SystemExit, '42'):
607 interpreters.run_string(self.id, 'raise SystemExit(42)')
608
609 def test_sys_exit(self):
610 with self.assert_run_failed(SystemExit):
611 interpreters.run_string(self.id, dedent("""
612 import sys
613 sys.exit()
614 """))
615
616 with self.assert_run_failed(SystemExit, '42'):
617 interpreters.run_string(self.id, dedent("""
618 import sys
619 sys.exit(42)
620 """))
621
622 def test_with_shared(self):
623 r, w = os.pipe()
624
625 shared = {
626 'spam': b'ham',
627 'eggs': b'-1',
628 'cheddar': None,
629 }
630 script = dedent(f"""
631 eggs = int(eggs)
632 spam = 42
633 result = spam + eggs
634
635 ns = dict(vars())
636 del ns['__builtins__']
637 import pickle
638 with open({w}, 'wb') as chan:
639 pickle.dump(ns, chan)
640 """)
641 interpreters.run_string(self.id, script, shared)
642 with open(r, 'rb') as chan:
643 ns = pickle.load(chan)
644
645 self.assertEqual(ns['spam'], 42)
646 self.assertEqual(ns['eggs'], -1)
647 self.assertEqual(ns['result'], 41)
648 self.assertIsNone(ns['cheddar'])
649
650 def test_shared_overwrites(self):
651 interpreters.run_string(self.id, dedent("""
652 spam = 'eggs'
653 ns1 = dict(vars())
654 del ns1['__builtins__']
655 """))
656
657 shared = {'spam': b'ham'}
658 script = dedent(f"""
659 ns2 = dict(vars())
660 del ns2['__builtins__']
661 """)
662 interpreters.run_string(self.id, script, shared)
663
664 r, w = os.pipe()
665 script = dedent(f"""
666 ns = dict(vars())
667 del ns['__builtins__']
668 import pickle
669 with open({w}, 'wb') as chan:
670 pickle.dump(ns, chan)
671 """)
672 interpreters.run_string(self.id, script)
673 with open(r, 'rb') as chan:
674 ns = pickle.load(chan)
675
676 self.assertEqual(ns['ns1']['spam'], 'eggs')
677 self.assertEqual(ns['ns2']['spam'], b'ham')
678 self.assertEqual(ns['spam'], b'ham')
679
680 def test_shared_overwrites_default_vars(self):
681 r, w = os.pipe()
682
683 shared = {'__name__': b'not __main__'}
684 script = dedent(f"""
685 spam = 42
686
687 ns = dict(vars())
688 del ns['__builtins__']
689 import pickle
690 with open({w}, 'wb') as chan:
691 pickle.dump(ns, chan)
692 """)
693 interpreters.run_string(self.id, script, shared)
694 with open(r, 'rb') as chan:
695 ns = pickle.load(chan)
696
697 self.assertEqual(ns['__name__'], b'not __main__')
698
699 def test_main_reused(self):
700 r, w = os.pipe()
701 interpreters.run_string(self.id, dedent(f"""
702 spam = True
703
704 ns = dict(vars())
705 del ns['__builtins__']
706 import pickle
707 with open({w}, 'wb') as chan:
708 pickle.dump(ns, chan)
709 del ns, pickle, chan
710 """))
711 with open(r, 'rb') as chan:
712 ns1 = pickle.load(chan)
713
714 r, w = os.pipe()
715 interpreters.run_string(self.id, dedent(f"""
716 eggs = False
717
718 ns = dict(vars())
719 del ns['__builtins__']
720 import pickle
721 with open({w}, 'wb') as chan:
722 pickle.dump(ns, chan)
723 """))
724 with open(r, 'rb') as chan:
725 ns2 = pickle.load(chan)
726
727 self.assertIn('spam', ns1)
728 self.assertNotIn('eggs', ns1)
729 self.assertIn('eggs', ns2)
730 self.assertIn('spam', ns2)
731
732 def test_execution_namespace_is_main(self):
733 r, w = os.pipe()
734
735 script = dedent(f"""
736 spam = 42
737
738 ns = dict(vars())
739 ns['__builtins__'] = str(ns['__builtins__'])
740 import pickle
741 with open({w}, 'wb') as chan:
742 pickle.dump(ns, chan)
743 """)
744 interpreters.run_string(self.id, script)
745 with open(r, 'rb') as chan:
746 ns = pickle.load(chan)
747
748 ns.pop('__builtins__')
749 ns.pop('__loader__')
750 self.assertEqual(ns, {
751 '__name__': '__main__',
752 '__annotations__': {},
753 '__doc__': None,
754 '__package__': None,
755 '__spec__': None,
756 'spam': 42,
757 })
758
Miss Islington (bot)3db05a32018-02-16 18:15:24 -0800759 # XXX Fix this test!
760 @unittest.skip('blocking forever')
Eric Snow7f8bfc92018-01-29 18:23:44 -0700761 def test_still_running_at_exit(self):
762 script = dedent(f"""
763 from textwrap import dedent
764 import threading
765 import _xxsubinterpreters as _interpreters
Miss Islington (bot)3db05a32018-02-16 18:15:24 -0800766 id = _interpreters.create()
Eric Snow7f8bfc92018-01-29 18:23:44 -0700767 def f():
768 _interpreters.run_string(id, dedent('''
769 import time
770 # Give plenty of time for the main interpreter to finish.
771 time.sleep(1_000_000)
772 '''))
773
774 t = threading.Thread(target=f)
775 t.start()
776 """)
777 with support.temp_dir() as dirname:
778 filename = script_helper.make_script(dirname, 'interp', script)
779 with script_helper.spawn_python(filename) as proc:
780 retcode = proc.wait()
781
782 self.assertEqual(retcode, 0)
783
784
785class ChannelIDTests(TestBase):
786
787 def test_default_kwargs(self):
788 cid = interpreters._channel_id(10, force=True)
789
790 self.assertEqual(int(cid), 10)
791 self.assertEqual(cid.end, 'both')
792
793 def test_with_kwargs(self):
794 cid = interpreters._channel_id(10, send=True, force=True)
795 self.assertEqual(cid.end, 'send')
796
797 cid = interpreters._channel_id(10, send=True, recv=False, force=True)
798 self.assertEqual(cid.end, 'send')
799
800 cid = interpreters._channel_id(10, recv=True, force=True)
801 self.assertEqual(cid.end, 'recv')
802
803 cid = interpreters._channel_id(10, recv=True, send=False, force=True)
804 self.assertEqual(cid.end, 'recv')
805
806 cid = interpreters._channel_id(10, send=True, recv=True, force=True)
807 self.assertEqual(cid.end, 'both')
808
809 def test_coerce_id(self):
810 cid = interpreters._channel_id('10', force=True)
811 self.assertEqual(int(cid), 10)
812
813 cid = interpreters._channel_id(10.0, force=True)
814 self.assertEqual(int(cid), 10)
815
816 class Int(str):
817 def __init__(self, value):
818 self._value = value
819 def __int__(self):
820 return self._value
821
822 cid = interpreters._channel_id(Int(10), force=True)
823 self.assertEqual(int(cid), 10)
824
825 def test_bad_id(self):
Miss Islington (bot)f33eced2018-02-02 21:38:57 -0800826 for cid in [-1, 'spam']:
Eric Snow7f8bfc92018-01-29 18:23:44 -0700827 with self.subTest(cid):
828 with self.assertRaises(ValueError):
829 interpreters._channel_id(cid)
Miss Islington (bot)f33eced2018-02-02 21:38:57 -0800830 with self.assertRaises(OverflowError):
831 interpreters._channel_id(2**64)
Eric Snow7f8bfc92018-01-29 18:23:44 -0700832 with self.assertRaises(TypeError):
833 interpreters._channel_id(object())
834
835 def test_bad_kwargs(self):
836 with self.assertRaises(ValueError):
837 interpreters._channel_id(10, send=False, recv=False)
838
839 def test_does_not_exist(self):
840 cid = interpreters.channel_create()
841 with self.assertRaises(interpreters.ChannelNotFoundError):
842 interpreters._channel_id(int(cid) + 1) # unforced
843
844 def test_repr(self):
845 cid = interpreters._channel_id(10, force=True)
846 self.assertEqual(repr(cid), 'ChannelID(10)')
847
848 cid = interpreters._channel_id(10, send=True, force=True)
849 self.assertEqual(repr(cid), 'ChannelID(10, send=True)')
850
851 cid = interpreters._channel_id(10, recv=True, force=True)
852 self.assertEqual(repr(cid), 'ChannelID(10, recv=True)')
853
854 cid = interpreters._channel_id(10, send=True, recv=True, force=True)
855 self.assertEqual(repr(cid), 'ChannelID(10)')
856
857 def test_equality(self):
858 cid1 = interpreters.channel_create()
859 cid2 = interpreters._channel_id(int(cid1))
860 cid3 = interpreters.channel_create()
861
862 self.assertTrue(cid1 == cid1)
863 self.assertTrue(cid1 == cid2)
864 self.assertTrue(cid1 == int(cid1))
865 self.assertFalse(cid1 == cid3)
866
867 self.assertFalse(cid1 != cid1)
868 self.assertFalse(cid1 != cid2)
869 self.assertTrue(cid1 != cid3)
870
871
872class ChannelTests(TestBase):
873
874 def test_sequential_ids(self):
875 before = interpreters.channel_list_all()
876 id1 = interpreters.channel_create()
877 id2 = interpreters.channel_create()
878 id3 = interpreters.channel_create()
879 after = interpreters.channel_list_all()
880
881 self.assertEqual(id2, int(id1) + 1)
882 self.assertEqual(id3, int(id2) + 1)
883 self.assertEqual(set(after) - set(before), {id1, id2, id3})
884
885 def test_ids_global(self):
886 id1 = interpreters.create()
887 out = _run_output(id1, dedent("""
888 import _xxsubinterpreters as _interpreters
889 cid = _interpreters.channel_create()
890 print(int(cid))
891 """))
892 cid1 = int(out.strip())
893
894 id2 = interpreters.create()
895 out = _run_output(id2, dedent("""
896 import _xxsubinterpreters as _interpreters
897 cid = _interpreters.channel_create()
898 print(int(cid))
899 """))
900 cid2 = int(out.strip())
901
902 self.assertEqual(cid2, int(cid1) + 1)
903
904 ####################
905
906 def test_drop_single_user(self):
907 cid = interpreters.channel_create()
908 interpreters.channel_send(cid, b'spam')
909 interpreters.channel_recv(cid)
910 interpreters.channel_drop_interpreter(cid, send=True, recv=True)
911
912 with self.assertRaises(interpreters.ChannelClosedError):
913 interpreters.channel_send(cid, b'eggs')
914 with self.assertRaises(interpreters.ChannelClosedError):
915 interpreters.channel_recv(cid)
916
917 def test_drop_multiple_users(self):
918 cid = interpreters.channel_create()
919 id1 = interpreters.create()
920 id2 = interpreters.create()
921 interpreters.run_string(id1, dedent(f"""
922 import _xxsubinterpreters as _interpreters
923 _interpreters.channel_send({int(cid)}, b'spam')
924 """))
925 out = _run_output(id2, dedent(f"""
926 import _xxsubinterpreters as _interpreters
927 obj = _interpreters.channel_recv({int(cid)})
928 _interpreters.channel_drop_interpreter({int(cid)})
929 print(repr(obj))
930 """))
931 interpreters.run_string(id1, dedent(f"""
932 _interpreters.channel_drop_interpreter({int(cid)})
933 """))
934
935 self.assertEqual(out.strip(), "b'spam'")
936
937 def test_drop_no_kwargs(self):
938 cid = interpreters.channel_create()
939 interpreters.channel_send(cid, b'spam')
940 interpreters.channel_recv(cid)
941 interpreters.channel_drop_interpreter(cid)
942
943 with self.assertRaises(interpreters.ChannelClosedError):
944 interpreters.channel_send(cid, b'eggs')
945 with self.assertRaises(interpreters.ChannelClosedError):
946 interpreters.channel_recv(cid)
947
948 def test_drop_multiple_times(self):
949 cid = interpreters.channel_create()
950 interpreters.channel_send(cid, b'spam')
951 interpreters.channel_recv(cid)
952 interpreters.channel_drop_interpreter(cid, send=True, recv=True)
953
954 with self.assertRaises(interpreters.ChannelClosedError):
955 interpreters.channel_drop_interpreter(cid, send=True, recv=True)
956
957 def test_drop_with_unused_items(self):
958 cid = interpreters.channel_create()
959 interpreters.channel_send(cid, b'spam')
960 interpreters.channel_send(cid, b'ham')
961 interpreters.channel_drop_interpreter(cid, send=True, recv=True)
962
963 with self.assertRaises(interpreters.ChannelClosedError):
964 interpreters.channel_recv(cid)
965
966 def test_drop_never_used(self):
967 cid = interpreters.channel_create()
968 interpreters.channel_drop_interpreter(cid)
969
970 with self.assertRaises(interpreters.ChannelClosedError):
971 interpreters.channel_send(cid, b'spam')
972 with self.assertRaises(interpreters.ChannelClosedError):
973 interpreters.channel_recv(cid)
974
975 def test_drop_by_unassociated_interp(self):
976 cid = interpreters.channel_create()
977 interpreters.channel_send(cid, b'spam')
978 interp = interpreters.create()
979 interpreters.run_string(interp, dedent(f"""
980 import _xxsubinterpreters as _interpreters
981 _interpreters.channel_drop_interpreter({int(cid)})
982 """))
983 obj = interpreters.channel_recv(cid)
984 interpreters.channel_drop_interpreter(cid)
985
986 with self.assertRaises(interpreters.ChannelClosedError):
987 interpreters.channel_send(cid, b'eggs')
988 self.assertEqual(obj, b'spam')
989
990 def test_drop_close_if_unassociated(self):
991 cid = interpreters.channel_create()
992 interp = interpreters.create()
993 interpreters.run_string(interp, dedent(f"""
994 import _xxsubinterpreters as _interpreters
995 obj = _interpreters.channel_send({int(cid)}, b'spam')
996 _interpreters.channel_drop_interpreter({int(cid)})
997 """))
998
999 with self.assertRaises(interpreters.ChannelClosedError):
1000 interpreters.channel_recv(cid)
1001
1002 def test_drop_partially(self):
1003 # XXX Is partial close too wierd/confusing?
1004 cid = interpreters.channel_create()
1005 interpreters.channel_send(cid, None)
1006 interpreters.channel_recv(cid)
1007 interpreters.channel_send(cid, b'spam')
1008 interpreters.channel_drop_interpreter(cid, send=True)
1009 obj = interpreters.channel_recv(cid)
1010
1011 self.assertEqual(obj, b'spam')
1012
1013 def test_drop_used_multiple_times_by_single_user(self):
1014 cid = interpreters.channel_create()
1015 interpreters.channel_send(cid, b'spam')
1016 interpreters.channel_send(cid, b'spam')
1017 interpreters.channel_send(cid, b'spam')
1018 interpreters.channel_recv(cid)
1019 interpreters.channel_drop_interpreter(cid, send=True, recv=True)
1020
1021 with self.assertRaises(interpreters.ChannelClosedError):
1022 interpreters.channel_send(cid, b'eggs')
1023 with self.assertRaises(interpreters.ChannelClosedError):
1024 interpreters.channel_recv(cid)
1025
1026 ####################
1027
1028 def test_close_single_user(self):
1029 cid = interpreters.channel_create()
1030 interpreters.channel_send(cid, b'spam')
1031 interpreters.channel_recv(cid)
1032 interpreters.channel_close(cid)
1033
1034 with self.assertRaises(interpreters.ChannelClosedError):
1035 interpreters.channel_send(cid, b'eggs')
1036 with self.assertRaises(interpreters.ChannelClosedError):
1037 interpreters.channel_recv(cid)
1038
1039 def test_close_multiple_users(self):
1040 cid = interpreters.channel_create()
1041 id1 = interpreters.create()
1042 id2 = interpreters.create()
1043 interpreters.run_string(id1, dedent(f"""
1044 import _xxsubinterpreters as _interpreters
1045 _interpreters.channel_send({int(cid)}, b'spam')
1046 """))
1047 interpreters.run_string(id2, dedent(f"""
1048 import _xxsubinterpreters as _interpreters
1049 _interpreters.channel_recv({int(cid)})
1050 """))
1051 interpreters.channel_close(cid)
1052 with self.assertRaises(interpreters.RunFailedError) as cm:
1053 interpreters.run_string(id1, dedent(f"""
1054 _interpreters.channel_send({int(cid)}, b'spam')
1055 """))
1056 self.assertIn('ChannelClosedError', str(cm.exception))
1057 with self.assertRaises(interpreters.RunFailedError) as cm:
1058 interpreters.run_string(id2, dedent(f"""
1059 _interpreters.channel_send({int(cid)}, b'spam')
1060 """))
1061 self.assertIn('ChannelClosedError', str(cm.exception))
1062
1063 def test_close_multiple_times(self):
1064 cid = interpreters.channel_create()
1065 interpreters.channel_send(cid, b'spam')
1066 interpreters.channel_recv(cid)
1067 interpreters.channel_close(cid)
1068
1069 with self.assertRaises(interpreters.ChannelClosedError):
1070 interpreters.channel_close(cid)
1071
1072 def test_close_with_unused_items(self):
1073 cid = interpreters.channel_create()
1074 interpreters.channel_send(cid, b'spam')
1075 interpreters.channel_send(cid, b'ham')
1076 interpreters.channel_close(cid)
1077
1078 with self.assertRaises(interpreters.ChannelClosedError):
1079 interpreters.channel_recv(cid)
1080
1081 def test_close_never_used(self):
1082 cid = interpreters.channel_create()
1083 interpreters.channel_close(cid)
1084
1085 with self.assertRaises(interpreters.ChannelClosedError):
1086 interpreters.channel_send(cid, b'spam')
1087 with self.assertRaises(interpreters.ChannelClosedError):
1088 interpreters.channel_recv(cid)
1089
1090 def test_close_by_unassociated_interp(self):
1091 cid = interpreters.channel_create()
1092 interpreters.channel_send(cid, b'spam')
1093 interp = interpreters.create()
1094 interpreters.run_string(interp, dedent(f"""
1095 import _xxsubinterpreters as _interpreters
1096 _interpreters.channel_close({int(cid)})
1097 """))
1098 with self.assertRaises(interpreters.ChannelClosedError):
1099 interpreters.channel_recv(cid)
1100 with self.assertRaises(interpreters.ChannelClosedError):
1101 interpreters.channel_close(cid)
1102
1103 def test_close_used_multiple_times_by_single_user(self):
1104 cid = interpreters.channel_create()
1105 interpreters.channel_send(cid, b'spam')
1106 interpreters.channel_send(cid, b'spam')
1107 interpreters.channel_send(cid, b'spam')
1108 interpreters.channel_recv(cid)
1109 interpreters.channel_close(cid)
1110
1111 with self.assertRaises(interpreters.ChannelClosedError):
1112 interpreters.channel_send(cid, b'eggs')
1113 with self.assertRaises(interpreters.ChannelClosedError):
1114 interpreters.channel_recv(cid)
1115
1116 ####################
1117
1118 def test_send_recv_main(self):
1119 cid = interpreters.channel_create()
1120 orig = b'spam'
1121 interpreters.channel_send(cid, orig)
1122 obj = interpreters.channel_recv(cid)
1123
1124 self.assertEqual(obj, orig)
1125 self.assertIsNot(obj, orig)
1126
1127 def test_send_recv_same_interpreter(self):
1128 id1 = interpreters.create()
1129 out = _run_output(id1, dedent("""
1130 import _xxsubinterpreters as _interpreters
1131 cid = _interpreters.channel_create()
1132 orig = b'spam'
1133 _interpreters.channel_send(cid, orig)
1134 obj = _interpreters.channel_recv(cid)
1135 assert obj is not orig
1136 assert obj == orig
1137 """))
1138
1139 def test_send_recv_different_interpreters(self):
1140 cid = interpreters.channel_create()
1141 id1 = interpreters.create()
1142 out = _run_output(id1, dedent(f"""
1143 import _xxsubinterpreters as _interpreters
1144 _interpreters.channel_send({int(cid)}, b'spam')
1145 """))
1146 obj = interpreters.channel_recv(cid)
1147
1148 self.assertEqual(obj, b'spam')
1149
1150 def test_send_not_found(self):
1151 with self.assertRaises(interpreters.ChannelNotFoundError):
1152 interpreters.channel_send(10, b'spam')
1153
1154 def test_recv_not_found(self):
1155 with self.assertRaises(interpreters.ChannelNotFoundError):
1156 interpreters.channel_recv(10)
1157
1158 def test_recv_empty(self):
1159 cid = interpreters.channel_create()
1160 with self.assertRaises(interpreters.ChannelEmptyError):
1161 interpreters.channel_recv(cid)
1162
1163 def test_run_string_arg(self):
1164 cid = interpreters.channel_create()
1165 interp = interpreters.create()
1166
1167 out = _run_output(interp, dedent("""
1168 import _xxsubinterpreters as _interpreters
1169 print(cid.end)
1170 _interpreters.channel_send(cid, b'spam')
1171 """),
1172 dict(cid=cid.send))
1173 obj = interpreters.channel_recv(cid)
1174
1175 self.assertEqual(obj, b'spam')
1176 self.assertEqual(out.strip(), 'send')
1177
1178
1179if __name__ == '__main__':
1180 unittest.main()