bpo-33041: Rework compiling an "async for" loop. (#6142)

* Added new opcode END_ASYNC_FOR.
* Setting global StopAsyncIteration no longer breaks "async for" loops.
* Jumping into an "async for" loop is now disabled.
* Jumping out of an "async for" loop no longer corrupts the stack.
* Simplify the compiler.
diff --git a/Lib/importlib/_bootstrap_external.py b/Lib/importlib/_bootstrap_external.py
index 27aaf55..420ecc8 100644
--- a/Lib/importlib/_bootstrap_external.py
+++ b/Lib/importlib/_bootstrap_external.py
@@ -247,6 +247,7 @@
 #     Python 3.7a4  3392 (PEP 552: Deterministic pycs #31650)
 #     Python 3.7b1  3393 (remove STORE_ANNOTATION opcode #32550)
 #     Python 3.8a1  3400 (move frame block handling to compiler #17611)
+#     Python 3.8a1  3401 (add END_ASYNC_FOR #33041)
 #
 # MAGIC must change whenever the bytecode emitted by the compiler may no
 # longer be understood by older implementations of the eval loop (usually
@@ -255,7 +256,7 @@
 # Whenever MAGIC_NUMBER is changed, the ranges in the magic_values array
 # in PC/launcher.c must also be updated.
 
-MAGIC_NUMBER = (3400).to_bytes(2, 'little') + b'\r\n'
+MAGIC_NUMBER = (3401).to_bytes(2, 'little') + b'\r\n'
 _RAW_MAGIC_NUMBER = int.from_bytes(MAGIC_NUMBER, 'little')  # For import.c
 
 _PYCACHE = '__pycache__'
diff --git a/Lib/opcode.py b/Lib/opcode.py
index 6de24c3..3fb716b 100644
--- a/Lib/opcode.py
+++ b/Lib/opcode.py
@@ -88,7 +88,7 @@
 def_op('GET_ANEXT', 51)
 def_op('BEFORE_ASYNC_WITH', 52)
 def_op('BEGIN_FINALLY', 53)
-
+def_op('END_ASYNC_FOR', 54)
 def_op('INPLACE_ADD', 55)
 def_op('INPLACE_SUBTRACT', 56)
 def_op('INPLACE_MULTIPLY', 57)
diff --git a/Lib/test/test_coroutines.py b/Lib/test/test_coroutines.py
index f4a9d2a..fe26199 100644
--- a/Lib/test/test_coroutines.py
+++ b/Lib/test/test_coroutines.py
@@ -1846,6 +1846,36 @@
             run_async(run_gen()),
             ([], [121]))
 
+    def test_comp_4_2(self):
+        async def f(it):
+            for i in it:
+                yield i
+
+        async def run_list():
+            return [i + 10 async for i in f(range(5)) if 0 < i < 4]
+        self.assertEqual(
+            run_async(run_list()),
+            ([], [11, 12, 13]))
+
+        async def run_set():
+            return {i + 10 async for i in f(range(5)) if 0 < i < 4}
+        self.assertEqual(
+            run_async(run_set()),
+            ([], {11, 12, 13}))
+
+        async def run_dict():
+            return {i + 10: i + 100 async for i in f(range(5)) if 0 < i < 4}
+        self.assertEqual(
+            run_async(run_dict()),
+            ([], {11: 101, 12: 102, 13: 103}))
+
+        async def run_gen():
+            gen = (i + 10 async for i in f(range(5)) if 0 < i < 4)
+            return [g + 100 async for g in gen]
+        self.assertEqual(
+            run_async(run_gen()),
+            ([], [111, 112, 113]))
+
     def test_comp_5(self):
         async def f(it):
             for i in it:
diff --git a/Lib/test/test_dis.py b/Lib/test/test_dis.py
index 098367c..c86f61f 100644
--- a/Lib/test/test_dis.py
+++ b/Lib/test/test_dis.py
@@ -747,8 +747,7 @@
    1: 1
 Names:
    0: b
-   1: StopAsyncIteration
-   2: c
+   1: c
 Variable names:
    0: a
    1: d"""
diff --git a/Lib/test/test_sys_settrace.py b/Lib/test/test_sys_settrace.py
index 2587794..1fa43b2 100644
--- a/Lib/test/test_sys_settrace.py
+++ b/Lib/test/test_sys_settrace.py
@@ -33,6 +33,10 @@
     async def __aexit__(self, *exc_info):
         self.output.append(-self.value)
 
+async def asynciter(iterable):
+    """Convert an iterable to an asynchronous iterator."""
+    for x in iterable:
+        yield x
 
 
 # A very basic example.  If this fails, we're in deep trouble.
@@ -720,6 +724,23 @@
             output.append(6)
         output.append(7)
 
+    @async_jump_test(4, 5, [3, 5])
+    async def test_jump_out_of_async_for_block_forwards(output):
+        for i in [1]:
+            async for i in asynciter([1, 2]):
+                output.append(3)
+                output.append(4)
+            output.append(5)
+
+    @async_jump_test(5, 2, [2, 4, 2, 4, 5, 6])
+    async def test_jump_out_of_async_for_block_backwards(output):
+        for i in [1]:
+            output.append(2)
+            async for i in asynciter([1]):
+                output.append(4)
+                output.append(5)
+            output.append(6)
+
     @jump_test(1, 2, [3])
     def test_jump_to_codeless_line(output):
         output.append(1)
@@ -1030,6 +1051,17 @@
             output.append(7)
         output.append(8)
 
+    @async_jump_test(1, 7, [7, 8])
+    async def test_jump_over_async_for_block_before_else(output):
+        output.append(1)
+        if not output:  # always false
+            async for i in asynciter([3]):
+                output.append(4)
+        else:
+            output.append(6)
+            output.append(7)
+        output.append(8)
+
     # The second set of 'jump' tests are for things that are not allowed:
 
     @jump_test(2, 3, [1], (ValueError, 'after'))
@@ -1081,12 +1113,24 @@
         for i in 1, 2:
             output.append(3)
 
+    @async_jump_test(1, 3, [], (ValueError, 'into'))
+    async def test_no_jump_forwards_into_async_for_block(output):
+        output.append(1)
+        async for i in asynciter([1, 2]):
+            output.append(3)
+
     @jump_test(3, 2, [2, 2], (ValueError, 'into'))
     def test_no_jump_backwards_into_for_block(output):
         for i in 1, 2:
             output.append(2)
         output.append(3)
 
+    @async_jump_test(3, 2, [2, 2], (ValueError, 'into'))
+    async def test_no_jump_backwards_into_async_for_block(output):
+        async for i in asynciter([1, 2]):
+            output.append(2)
+        output.append(3)
+
     @jump_test(1, 3, [], (ValueError, 'into'))
     def test_no_jump_forwards_into_with_block(output):
         output.append(1)
@@ -1220,6 +1264,17 @@
             output.append(7)
         output.append(8)
 
+    @async_jump_test(7, 4, [1, 6], (ValueError, 'into'))
+    async def test_no_jump_into_async_for_block_before_else(output):
+        output.append(1)
+        if not output:  # always false
+            async for i in asynciter([3]):
+                output.append(4)
+        else:
+            output.append(6)
+            output.append(7)
+        output.append(8)
+
     def test_no_jump_to_non_integers(self):
         self.run_test(no_jump_to_non_integers, 2, "Spam", [True])