bpo-34616: Add PyCF_ALLOW_TOP_LEVEL_AWAIT to allow top-level await (GH-13148)

Co-Authored-By: Yury Selivanov <yury@magic.io>
diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py
index 5674ea8..4a358e8 100644
--- a/Lib/test/test_builtin.py
+++ b/Lib/test/test_builtin.py
@@ -1,6 +1,7 @@
 # Python test set -- built-in functions
 
 import ast
+import asyncio
 import builtins
 import collections
 import decimal
@@ -18,9 +19,14 @@
 import unittest
 import warnings
 from contextlib import ExitStack
+from inspect import CO_COROUTINE
+from itertools import product
+from textwrap import dedent
+from types import AsyncGeneratorType, FunctionType
 from operator import neg
 from test.support import (
-    EnvironmentVarGuard, TESTFN, check_warnings, swap_attr, unlink)
+    EnvironmentVarGuard, TESTFN, check_warnings, swap_attr, unlink,
+    maybe_get_event_loop_policy)
 from test.support.script_helper import assert_python_ok
 from unittest.mock import MagicMock, patch
 try:
@@ -358,6 +364,71 @@
                 rv = ns['f']()
                 self.assertEqual(rv, tuple(expected))
 
+    def test_compile_top_level_await(self):
+        """Test that code containing top-level await can be compiled.
+
+        Make sure it compiles only with the PyCF_ALLOW_TOP_LEVEL_AWAIT flag set,
+        and make sure the generated code object has the CO_COROUTINE flag set in
+        order to execute it with `await eval(...)` instead of exec, or via a
+        FunctionType.
+        """
+
+        # Helper async generator, used to check that top-level async-for runs.
+        async def arange(n):
+            for i in range(n):
+                yield i
+
+        modes = ('single', 'exec')
+        code_samples = ['''a = await asyncio.sleep(0, result=1)''',
+        '''async for i in arange(1):
+               a = 1''',
+        '''async with asyncio.Lock() as l:
+               a = 1''']
+        policy = maybe_get_event_loop_policy()  # restored in `finally` below
+        try:
+            for mode, code_sample in product(modes,code_samples):
+                source = dedent(code_sample)
+                with self.assertRaises(SyntaxError, msg=f"{source=} {mode=}"):
+                    compile(source, '?' , mode)
+                # Rejected above without the flag; must compile with it.
+                co = compile(source,
+                             '?',
+                             mode,
+                             flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT)
+
+                self.assertEqual(co.co_flags & CO_COROUTINE, CO_COROUTINE,
+                                 msg=f"{source=} {mode=}")
+
+
+                # Test that we can build a FunctionType from the code and run it.
+                globals_ = {'asyncio': asyncio, 'a':0, 'arange': arange}
+                async_f = FunctionType(co, globals_)
+                asyncio.run(async_f())
+                self.assertEqual(globals_['a'], 1)
+
+                # Test that the result of eval() can be run by asyncio.run().
+                globals_ = {'asyncio': asyncio, 'a':0, 'arange': arange}
+                asyncio.run(eval(co, globals_))
+                self.assertEqual(globals_['a'], 1)
+        finally:
+            asyncio.set_event_loop_policy(policy)
+
+    def test_compile_async_generator(self):
+        """
+        With the PyCF_ALLOW_TOP_LEVEL_AWAIT flag added in 3.8, make sure that
+        calling an async generator function still gives an AsyncGeneratorType.
+        """
+        code = dedent("""async def ticker():
+                for i in range(10):
+                    yield i
+                    await asyncio.sleep(0)""")
+
+        co = compile(code, '?', 'exec', flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT)
+        glob = {}
+        exec(co, glob)
+        self.assertEqual(type(glob['ticker']()), AsyncGeneratorType)
+
+
     def test_delattr(self):
         sys.spam = 1
         delattr(sys, 'spam')