bpo-30773: Fix ag_running; prohibit running athrow/asend/aclose in parallel (GH-7468) (#16486)

(cherry picked from commit fc4a044a3c54ce21e9ed150f7d769fb479d34c49)

Co-authored-by: Yury Selivanov <yury@magic.io>
diff --git a/Objects/genobject.c b/Objects/genobject.c
index f1e9fec..6285219 100644
--- a/Objects/genobject.c
+++ b/Objects/genobject.c
@@ -1342,7 +1342,8 @@
 
 static PyMemberDef async_gen_memberlist[] = {
     {"ag_frame",   T_OBJECT, offsetof(PyAsyncGenObject, ag_frame),   READONLY},
-    {"ag_running", T_BOOL,   offsetof(PyAsyncGenObject, ag_running), READONLY},
+    {"ag_running", T_BOOL,   offsetof(PyAsyncGenObject, ag_running_async),
+        READONLY},
     {"ag_code",    T_OBJECT, offsetof(PyAsyncGenObject, ag_code),    READONLY},
     {NULL}      /* Sentinel */
 };
@@ -1436,6 +1437,7 @@
     o->ag_finalizer = NULL;
     o->ag_closed = 0;
     o->ag_hooks_inited = 0;
+    o->ag_running_async = 0;
     return (PyObject*)o;
 }
 
@@ -1483,6 +1485,7 @@
             gen->ag_closed = 1;
         }
 
+        gen->ag_running_async = 0;
         return NULL;
     }
 
@@ -1490,6 +1493,7 @@
         /* async yield */
         _PyGen_SetStopIterationValue(((_PyAsyncGenWrappedValue*)result)->agw_val);
         Py_DECREF(result);
+        gen->ag_running_async = 0;
         return NULL;
     }
 
@@ -1534,12 +1538,20 @@
     }
 
     if (o->ags_state == AWAITABLE_STATE_INIT) {
+        if (o->ags_gen->ag_running_async) {
+            PyErr_SetString(
+                PyExc_RuntimeError,
+                "anext(): asynchronous generator is already running");
+            return NULL;
+        }
+
         if (arg == NULL || arg == Py_None) {
             arg = o->ags_sendval;
         }
         o->ags_state = AWAITABLE_STATE_ITER;
     }
 
+    o->ags_gen->ag_running_async = 1;
     result = gen_send_ex((PyGenObject*)o->ags_gen, arg, 0, 0);
     result = async_gen_unwrap_value(o->ags_gen, result);
 
@@ -1803,8 +1815,23 @@
     }
 
     if (o->agt_state == AWAITABLE_STATE_INIT) {
+        if (o->agt_gen->ag_running_async) {
+            if (o->agt_args == NULL) {
+                PyErr_SetString(
+                    PyExc_RuntimeError,
+                    "aclose(): asynchronous generator is already running");
+            }
+            else {
+                PyErr_SetString(
+                    PyExc_RuntimeError,
+                    "athrow(): asynchronous generator is already running");
+            }
+            return NULL;
+        }
+
         if (o->agt_gen->ag_closed) {
-            PyErr_SetNone(PyExc_StopIteration);
+            o->agt_state = AWAITABLE_STATE_CLOSED;
+            PyErr_SetNone(PyExc_StopAsyncIteration);
             return NULL;
         }
 
@@ -1814,6 +1841,7 @@
         }
 
         o->agt_state = AWAITABLE_STATE_ITER;
+        o->agt_gen->ag_running_async = 1;
 
         if (o->agt_args == NULL) {
             /* aclose() mode */
@@ -1859,6 +1887,7 @@
         /* aclose() mode */
         if (retval) {
             if (_PyAsyncGenWrappedValue_CheckExact(retval)) {
+                o->agt_gen->ag_running_async = 0;
                 Py_DECREF(retval);
                 goto yield_close;
             }
@@ -1872,11 +1901,13 @@
     }
 
 yield_close:
+    o->agt_gen->ag_running_async = 0;
     PyErr_SetString(
         PyExc_RuntimeError, ASYNC_GEN_IGNORED_EXIT_MSG);
     return NULL;
 
 check_error:
+    o->agt_gen->ag_running_async = 0;
     if (PyErr_ExceptionMatches(PyExc_StopAsyncIteration) ||
             PyErr_ExceptionMatches(PyExc_GeneratorExit))
     {
@@ -1911,6 +1942,7 @@
     } else {
         /* aclose() mode */
         if (retval && _PyAsyncGenWrappedValue_CheckExact(retval)) {
+            o->agt_gen->ag_running_async = 0;
             Py_DECREF(retval);
             PyErr_SetString(PyExc_RuntimeError, ASYNC_GEN_IGNORED_EXIT_MSG);
             return NULL;