bpo-32604: Fix memory leaks in the new _xxsubinterpreters module. (GH-5507)
(cherry picked from commit 4e9da0d163731caa79811c723c703ee416c31826)
Co-authored-by: Eric Snow <ericsnowcurrently@gmail.com>
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index 2b17044..8d72ca2 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -362,13 +362,15 @@
def test_from_current(self):
main, = interpreters.list_all()
id = interpreters.create()
- script = dedent("""
+ script = dedent(f"""
import _xxsubinterpreters as _interpreters
- _interpreters.destroy({})
- """).format(id)
+ try:
+ _interpreters.destroy({id})
+ except RuntimeError:
+ pass
+ """)
- with self.assertRaises(RuntimeError):
- interpreters.run_string(id, script)
+ interpreters.run_string(id, script)
self.assertEqual(set(interpreters.list_all()), {main, id})
def test_from_sibling(self):
@@ -761,12 +763,12 @@
self.assertEqual(int(cid), 10)
def test_bad_id(self):
- ids = [-1, 2**64, "spam"]
- for cid in ids:
+ for cid in [-1, 'spam']:
with self.subTest(cid):
with self.assertRaises(ValueError):
interpreters._channel_id(cid)
-
+ with self.assertRaises(OverflowError):
+ interpreters._channel_id(2**64)
with self.assertRaises(TypeError):
interpreters._channel_id(object())
diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c
index d2b5f26..7829b4c 100644
--- a/Modules/_xxsubinterpretersmodule.c
+++ b/Modules/_xxsubinterpretersmodule.c
@@ -7,6 +7,22 @@
#include "internal/pystate.h"
+static char *
+_copy_raw_string(PyObject *strobj)
+{
+ const char *str = PyUnicode_AsUTF8(strobj);
+ if (str == NULL) {
+ return NULL;
+ }
+ char *copied = PyMem_Malloc(strlen(str)+1);
+ if (str == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ strcpy(copied, str);
+ return copied;
+}
+
static PyInterpreterState *
_get_current(void)
{
@@ -31,10 +47,13 @@
}
return -1;
}
- long long cid = PyLong_AsLongLong(id);
+ int64_t cid = PyLong_AsLongLong(id);
+ Py_DECREF(id);
if (cid == -1 && PyErr_Occurred() != NULL) {
- PyErr_SetString(PyExc_ValueError,
- "'id' must be a non-negative int");
+ if (!PyErr_ExceptionMatches(PyExc_OverflowError)) {
+ PyErr_SetString(PyExc_ValueError,
+ "'id' must be a non-negative int");
+ }
return -1;
}
if (cid < 0) {
@@ -42,92 +61,131 @@
"'id' must be a non-negative int");
return -1;
}
- if (cid > INT64_MAX) {
- PyErr_SetString(PyExc_ValueError,
- "'id' too large (must be 64-bit int)");
- return -1;
- }
return cid;
}
+
/* data-sharing-specific code ***********************************************/
-typedef struct _shareditem {
- Py_UNICODE *name;
- Py_ssize_t namelen;
+struct _sharednsitem {
+ char *name;
_PyCrossInterpreterData data;
-} _shareditem;
+};
-void
-_sharedns_clear(_shareditem *shared)
+static int
+_sharednsitem_init(struct _sharednsitem *item, PyObject *key, PyObject *value)
{
- for (_shareditem *item=shared; item->name != NULL; item += 1) {
- _PyCrossInterpreterData_Release(&item->data);
+ item->name = _copy_raw_string(key);
+ if (item->name == NULL) {
+ return -1;
}
+ if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) {
+ return -1;
+ }
+ return 0;
}
-static _shareditem *
-_get_shared_ns(PyObject *shareable, Py_ssize_t *lenp)
+static void
+_sharednsitem_clear(struct _sharednsitem *item)
+{
+ if (item->name != NULL) {
+ PyMem_Free(item->name);
+ }
+ _PyCrossInterpreterData_Release(&item->data);
+}
+
+static int
+_sharednsitem_apply(struct _sharednsitem *item, PyObject *ns)
+{
+ PyObject *name = PyUnicode_FromString(item->name);
+ if (name == NULL) {
+ return -1;
+ }
+ PyObject *value = _PyCrossInterpreterData_NewObject(&item->data);
+ if (value == NULL) {
+ Py_DECREF(name);
+ return -1;
+ }
+ int res = PyDict_SetItem(ns, name, value);
+ Py_DECREF(name);
+ Py_DECREF(value);
+ return res;
+}
+
+typedef struct _sharedns {
+ Py_ssize_t len;
+ struct _sharednsitem* items;
+} _sharedns;
+
+static _sharedns *
+_sharedns_new(Py_ssize_t len)
+{
+ _sharedns *shared = PyMem_NEW(_sharedns, 1);
+ if (shared == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ shared->len = len;
+ shared->items = PyMem_NEW(struct _sharednsitem, len);
+ if (shared->items == NULL) {
+ PyErr_NoMemory();
+ PyMem_Free(shared);
+ return NULL;
+ }
+ return shared;
+}
+
+static void
+_sharedns_free(_sharedns *shared)
+{
+ for (Py_ssize_t i=0; i < shared->len; i++) {
+ _sharednsitem_clear(&shared->items[i]);
+ }
+ PyMem_Free(shared->items);
+ PyMem_Free(shared);
+}
+
+static _sharedns *
+_get_shared_ns(PyObject *shareable)
{
if (shareable == NULL || shareable == Py_None) {
- *lenp = 0;
return NULL;
}
Py_ssize_t len = PyDict_Size(shareable);
- *lenp = len;
if (len == 0) {
return NULL;
}
- _shareditem *shared = PyMem_NEW(_shareditem, len+1);
+ _sharedns *shared = _sharedns_new(len);
if (shared == NULL) {
return NULL;
}
- for (Py_ssize_t i=0; i < len; i++) {
- *(shared + i) = (_shareditem){0};
- }
Py_ssize_t pos = 0;
for (Py_ssize_t i=0; i < len; i++) {
PyObject *key, *value;
if (PyDict_Next(shareable, &pos, &key, &value) == 0) {
break;
}
- _shareditem *item = shared + i;
-
- if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) {
+ if (_sharednsitem_init(&shared->items[i], key, value) != 0) {
break;
}
- item->name = PyUnicode_AsUnicodeAndSize(key, &item->namelen);
- if (item->name == NULL) {
- _PyCrossInterpreterData_Release(&item->data);
- break;
- }
- (item + 1)->name = NULL; // Mark the next one as the last.
}
if (PyErr_Occurred()) {
- _sharedns_clear(shared);
- PyMem_Free(shared);
+ _sharedns_free(shared);
return NULL;
}
return shared;
}
static int
-_shareditem_apply(_shareditem *item, PyObject *ns)
+_sharedns_apply(_sharedns *shared, PyObject *ns)
{
- PyObject *name = PyUnicode_FromUnicode(item->name, item->namelen);
- if (name == NULL) {
- return 1;
+ for (Py_ssize_t i=0; i < shared->len; i++) {
+ if (_sharednsitem_apply(&shared->items[i], ns) != 0) {
+ return -1;
+ }
}
- PyObject *value = _PyCrossInterpreterData_NewObject(&item->data);
- if (value == NULL) {
- Py_DECREF(name);
- return 1;
- }
- int res = PyDict_SetItem(ns, name, value);
- Py_DECREF(name);
- Py_DECREF(value);
- return res;
+ return 0;
}
// Ultimately we'd like to preserve enough information about the
@@ -136,65 +194,117 @@
// of the exception in the calling interpreter.
typedef struct _sharedexception {
+ char *name;
char *msg;
} _sharedexception;
static _sharedexception *
-_get_shared_exception(void)
+_sharedexception_new(void)
{
_sharedexception *err = PyMem_NEW(_sharedexception, 1);
if (err == NULL) {
+ PyErr_NoMemory();
return NULL;
}
- PyObject *exc;
- PyObject *value;
- PyObject *tb;
- PyErr_Fetch(&exc, &value, &tb);
- PyObject *msg;
- if (value == NULL) {
- msg = PyUnicode_FromFormat("%S", exc);
+ err->name = NULL;
+ err->msg = NULL;
+ return err;
+}
+
+static void
+_sharedexception_clear(_sharedexception *exc)
+{
+ if (exc->name != NULL) {
+ PyMem_Free(exc->name);
}
- else {
- msg = PyUnicode_FromFormat("%S: %S", exc, value);
+ if (exc->msg != NULL) {
+ PyMem_Free(exc->msg);
}
- if (msg == NULL) {
- err->msg = "unable to format exception";
- return err;
+}
+
+static void
+_sharedexception_free(_sharedexception *exc)
+{
+ _sharedexception_clear(exc);
+ PyMem_Free(exc);
+}
+
+static _sharedexception *
+_sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb)
+{
+ assert(exctype != NULL);
+ char *failure = NULL;
+
+ _sharedexception *err = _sharedexception_new();
+ if (err == NULL) {
+ goto finally;
}
- err->msg = (char *)PyUnicode_AsUTF8(msg);
- if (err->msg == NULL) {
- err->msg = "unable to encode exception";
+
+ PyObject *name = PyUnicode_FromFormat("%S", exctype);
+ if (name == NULL) {
+ failure = "unable to format exception type name";
+ goto finally;
+ }
+ err->name = _copy_raw_string(name);
+ Py_DECREF(name);
+ if (err->name == NULL) {
+ if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
+ failure = "out of memory copying exception type name";
+ }
+ failure = "unable to encode and copy exception type name";
+ goto finally;
+ }
+
+ if (exc != NULL) {
+ PyObject *msg = PyUnicode_FromFormat("%S", exc);
+ if (msg == NULL) {
+ failure = "unable to format exception message";
+ goto finally;
+ }
+ err->msg = _copy_raw_string(msg);
+ Py_DECREF(msg);
+ if (err->msg == NULL) {
+ if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
+ failure = "out of memory copying exception message";
+ }
+ failure = "unable to encode and copy exception message";
+ goto finally;
+ }
+ }
+
+finally:
+ if (failure != NULL) {
+ PyErr_Clear();
+ if (err->name != NULL) {
+ PyMem_Free(err->name);
+ err->name = NULL;
+ }
+ err->msg = failure;
}
return err;
}
-static PyObject * RunFailedError;
-
-static int
-interp_exceptions_init(PyObject *ns)
-{
- // XXX Move the exceptions into per-module memory?
-
- // An uncaught exception came out of interp_run_string().
- RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError",
- PyExc_RuntimeError, NULL);
- if (RunFailedError == NULL) {
- return -1;
- }
- if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) {
- return -1;
- }
-
- return 0;
-}
-
static void
-_apply_shared_exception(_sharedexception *exc)
+_sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
{
- PyErr_SetString(RunFailedError, exc->msg);
+ if (exc->name != NULL) {
+ if (exc->msg != NULL) {
+ PyErr_Format(wrapperclass, "%s: %s", exc->name, exc->msg);
+ }
+ else {
+ PyErr_SetString(wrapperclass, exc->name);
+ }
+ }
+ else if (exc->msg != NULL) {
+ PyErr_SetString(wrapperclass, exc->msg);
+ }
+ else {
+ PyErr_SetNone(wrapperclass);
+ }
}
-/* channel-specific code */
+
+/* channel-specific code ****************************************************/
static PyObject *ChannelError;
static PyObject *ChannelNotFoundError;
@@ -249,6 +359,139 @@
return 0;
}
+/* the channel queue */
+
+struct _channelitem;
+
+typedef struct _channelitem {
+ _PyCrossInterpreterData *data;
+ struct _channelitem *next;
+} _channelitem;
+
+static _channelitem *
+_channelitem_new(void)
+{
+ _channelitem *item = PyMem_NEW(_channelitem, 1);
+ if (item == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ item->data = NULL;
+ item->next = NULL;
+ return item;
+}
+
+static void
+_channelitem_clear(_channelitem *item)
+{
+ if (item->data != NULL) {
+ _PyCrossInterpreterData_Release(item->data);
+ PyMem_Free(item->data);
+ item->data = NULL;
+ }
+ item->next = NULL;
+}
+
+static void
+_channelitem_free(_channelitem *item)
+{
+ _channelitem_clear(item);
+ PyMem_Free(item);
+}
+
+static void
+_channelitem_free_all(_channelitem *item)
+{
+ while (item != NULL) {
+ _channelitem *last = item;
+ item = item->next;
+ _channelitem_free(last);
+ }
+}
+
+static _PyCrossInterpreterData *
+_channelitem_popped(_channelitem *item)
+{
+ _PyCrossInterpreterData *data = item->data;
+ item->data = NULL;
+ _channelitem_free(item);
+ return data;
+}
+
+typedef struct _channelqueue {
+ int64_t count;
+ _channelitem *first;
+ _channelitem *last;
+} _channelqueue;
+
+static _channelqueue *
+_channelqueue_new(void)
+{
+ _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
+ if (queue == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ queue->count = 0;
+ queue->first = NULL;
+ queue->last = NULL;
+ return queue;
+}
+
+static void
+_channelqueue_clear(_channelqueue *queue)
+{
+ _channelitem_free_all(queue->first);
+ queue->count = 0;
+ queue->first = NULL;
+ queue->last = NULL;
+}
+
+static void
+_channelqueue_free(_channelqueue *queue)
+{
+ _channelqueue_clear(queue);
+ PyMem_Free(queue);
+}
+
+static int
+_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
+{
+ _channelitem *item = _channelitem_new();
+ if (item == NULL) {
+ return -1;
+ }
+ item->data = data;
+
+ queue->count += 1;
+ if (queue->first == NULL) {
+ queue->first = item;
+ }
+ else {
+ queue->last->next = item;
+ }
+ queue->last = item;
+ return 0;
+}
+
+static _PyCrossInterpreterData *
+_channelqueue_get(_channelqueue *queue)
+{
+ _channelitem *item = queue->first;
+ if (item == NULL) {
+ return NULL;
+ }
+ queue->first = item->next;
+ if (queue->last == item) {
+ queue->last = NULL;
+ }
+ queue->count -= 1;
+
+ return _channelitem_popped(item);
+}
+
+/* channel-interpreter associations */
+
struct _channelend;
typedef struct _channelend {
@@ -262,23 +505,28 @@
{
_channelend *end = PyMem_NEW(_channelend, 1);
if (end == NULL) {
+ PyErr_NoMemory();
return NULL;
}
-
end->next = NULL;
end->interp = interp;
-
end->open = 1;
-
return end;
}
static void
-_channelend_free_all(_channelend *end) {
+_channelend_free(_channelend *end)
+{
+ PyMem_Free(end);
+}
+
+static void
+_channelend_free_all(_channelend *end)
+{
while (end != NULL) {
_channelend *last = end;
end = end->next;
- PyMem_Free(last);
+ _channelend_free(last);
}
}
@@ -300,24 +548,7 @@
return end;
}
-struct _channelitem;
-
-typedef struct _channelitem {
- _PyCrossInterpreterData *data;
- struct _channelitem *next;
-} _channelitem;
-
-struct _channel;
-
-typedef struct _channel {
- PyThread_type_lock mutex;
-
- int open;
-
- int64_t count;
- _channelitem *first;
- _channelitem *last;
-
+typedef struct _channelassociations {
// Note that the list entries are never removed for interpreter
// for which the channel is closed. This should be a problem in
// practice. Also, a channel isn't automatically closed when an
@@ -326,6 +557,168 @@
int64_t numrecvopen;
_channelend *send;
_channelend *recv;
+} _channelends;
+
+static _channelends *
+_channelends_new(void)
+{
+ _channelends *ends = PyMem_NEW(_channelends, 1);
+ if (ends== NULL) {
+ return NULL;
+ }
+ ends->numsendopen = 0;
+ ends->numrecvopen = 0;
+ ends->send = NULL;
+ ends->recv = NULL;
+ return ends;
+}
+
+static void
+_channelends_clear(_channelends *ends)
+{
+ _channelend_free_all(ends->send);
+ ends->send = NULL;
+ ends->numsendopen = 0;
+
+ _channelend_free_all(ends->recv);
+ ends->recv = NULL;
+ ends->numrecvopen = 0;
+}
+
+static void
+_channelends_free(_channelends *ends)
+{
+ _channelends_clear(ends);
+ PyMem_Free(ends);
+}
+
+static _channelend *
+_channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
+ int send)
+{
+ _channelend *end = _channelend_new(interp);
+ if (end == NULL) {
+ return NULL;
+ }
+
+ if (prev == NULL) {
+ if (send) {
+ ends->send = end;
+ }
+ else {
+ ends->recv = end;
+ }
+ }
+ else {
+ prev->next = end;
+ }
+ if (send) {
+ ends->numsendopen += 1;
+ }
+ else {
+ ends->numrecvopen += 1;
+ }
+ return end;
+}
+
+static int
+_channelends_associate(_channelends *ends, int64_t interp, int send)
+{
+ _channelend *prev;
+ _channelend *end = _channelend_find(send ? ends->send : ends->recv,
+ interp, &prev);
+ if (end != NULL) {
+ if (!end->open) {
+ PyErr_SetString(ChannelClosedError, "channel already closed");
+ return -1;
+ }
+ // already associated
+ return 0;
+ }
+ if (_channelends_add(ends, prev, interp, send) == NULL) {
+ return -1;
+ }
+ return 0;
+}
+
+static int
+_channelends_is_open(_channelends *ends)
+{
+ if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
+ return 1;
+ }
+ if (ends->send == NULL && ends->recv == NULL) {
+ return 1;
+ }
+ return 0;
+}
+
+static void
+_channelends_close_end(_channelends *ends, _channelend *end, int send)
+{
+ end->open = 0;
+ if (send) {
+ ends->numsendopen -= 1;
+ }
+ else {
+ ends->numrecvopen -= 1;
+ }
+}
+
+static int
+_channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
+{
+ _channelend *prev;
+ _channelend *end;
+ if (which >= 0) { // send/both
+ end = _channelend_find(ends->send, interp, &prev);
+ if (end == NULL) {
+ // never associated so add it
+ end = _channelends_add(ends, prev, interp, 1);
+ if (end == NULL) {
+ return -1;
+ }
+ }
+ _channelends_close_end(ends, end, 1);
+ }
+ if (which <= 0) { // recv/both
+ end = _channelend_find(ends->recv, interp, &prev);
+ if (end == NULL) {
+ // never associated so add it
+ end = _channelends_add(ends, prev, interp, 0);
+ if (end == NULL) {
+ return -1;
+ }
+ }
+ _channelends_close_end(ends, end, 0);
+ }
+ return 0;
+}
+
+static void
+_channelends_close_all(_channelends *ends)
+{
+ // Ensure all the "send"-associated interpreters are closed.
+ _channelend *end;
+ for (end = ends->send; end != NULL; end = end->next) {
+ _channelends_close_end(ends, end, 1);
+ }
+
+ // Ensure all the "recv"-associated interpreters are closed.
+ for (end = ends->recv; end != NULL; end = end->next) {
+ _channelends_close_end(ends, end, 0);
+ }
+}
+
+/* channels */
+
+struct _channel;
+
+typedef struct _channel {
+ PyThread_type_lock mutex;
+ _channelqueue *queue;
+ _channelends *ends;
+ int open;
} _PyChannelState;
static _PyChannelState *
@@ -342,82 +735,76 @@
"can't initialize mutex for new channel");
return NULL;
}
-
+ chan->queue = _channelqueue_new();
+ if (chan->queue == NULL) {
+ PyMem_Free(chan);
+ return NULL;
+ }
+ chan->ends = _channelends_new();
+ if (chan->ends == NULL) {
+ _channelqueue_free(chan->queue);
+ PyMem_Free(chan);
+ return NULL;
+ }
chan->open = 1;
-
- chan->count = 0;
- chan->first = NULL;
- chan->last = NULL;
-
- chan->numsendopen = 0;
- chan->numrecvopen = 0;
- chan->send = NULL;
- chan->recv = NULL;
-
return chan;
}
-static _channelend *
-_channel_add_end(_PyChannelState *chan, _channelend *prev, int64_t interp,
- int send)
+static void
+_channel_free(_PyChannelState *chan)
{
- _channelend *end = _channelend_new(interp);
- if (end == NULL) {
- return NULL;
- }
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+ _channelqueue_free(chan->queue);
+ _channelends_free(chan->ends);
+ PyThread_release_lock(chan->mutex);
- if (prev == NULL) {
- if (send) {
- chan->send = end;
- }
- else {
- chan->recv = end;
- }
- }
- else {
- prev->next = end;
- }
- if (send) {
- chan->numsendopen += 1;
- }
- else {
- chan->numrecvopen += 1;
- }
- return end;
+ PyThread_free_lock(chan->mutex);
+ PyMem_Free(chan);
}
-static _channelend *
-_channel_associate_end(_PyChannelState *chan, int64_t interp, int send)
+static int
+_channel_add(_PyChannelState *chan, int64_t interp,
+ _PyCrossInterpreterData *data)
{
+ int res = -1;
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
if (!chan->open) {
PyErr_SetString(ChannelClosedError, "channel closed");
- return NULL;
+ goto done;
+ }
+ if (_channelends_associate(chan->ends, interp, 1) != 0) {
+ goto done;
}
- _channelend *prev;
- _channelend *end = _channelend_find(send ? chan->send : chan->recv,
- interp, &prev);
- if (end != NULL) {
- if (!end->open) {
- PyErr_SetString(ChannelClosedError, "channel already closed");
- return NULL;
- }
- // already associated
- return end;
+ if (_channelqueue_put(chan->queue, data) != 0) {
+ goto done;
}
- return _channel_add_end(chan, prev, interp, send);
+
+ res = 0;
+done:
+ PyThread_release_lock(chan->mutex);
+ return res;
}
-static void
-_channel_close_channelend(_PyChannelState *chan, _channelend *end, int send)
+static _PyCrossInterpreterData *
+_channel_next(_PyChannelState *chan, int64_t interp)
{
- end->open = 0;
- if (send) {
- chan->numsendopen -= 1;
+ _PyCrossInterpreterData *data = NULL;
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
+ if (!chan->open) {
+ PyErr_SetString(ChannelClosedError, "channel closed");
+ goto done;
}
- else {
- chan->numrecvopen -= 1;
+ if (_channelends_associate(chan->ends, interp, 0) != 0) {
+ goto done;
}
+
+ data = _channelqueue_get(chan->queue);
+done:
+ PyThread_release_lock(chan->mutex);
+ return data;
}
static int
@@ -431,36 +818,10 @@
goto done;
}
- _channelend *prev;
- _channelend *end;
- if (which >= 0) { // send/both
- end = _channelend_find(chan->send, interp, &prev);
- if (end == NULL) {
- // never associated so add it
- end = _channel_add_end(chan, prev, interp, 1);
- if (end == NULL) {
- goto done;
- }
- }
- _channel_close_channelend(chan, end, 1);
+ if (_channelends_close_interpreter(chan->ends, interp, which) != 0) {
+ goto done;
}
- if (which <= 0) { // recv/both
- end = _channelend_find(chan->recv, interp, &prev);
- if (end == NULL) {
- // never associated so add it
- end = _channel_add_end(chan, prev, interp, 0);
- if (end == NULL) {
- goto done;
- }
- }
- _channel_close_channelend(chan, end, 0);
- }
-
- if (chan->numsendopen == 0 && chan->numrecvopen == 0) {
- if (chan->send != NULL || chan->recv != NULL) {
- chan->open = 0;
- }
- }
+ chan->open = _channelends_is_open(chan->ends);
res = 0;
done:
@@ -483,17 +844,7 @@
// We *could* also just leave these in place, since we've marked
// the channel as closed already.
-
- // Ensure all the "send"-associated interpreters are closed.
- _channelend *end;
- for (end = chan->send; end != NULL; end = end->next) {
- _channel_close_channelend(chan, end, 1);
- }
-
- // Ensure all the "recv"-associated interpreters are closed.
- for (end = chan->recv; end != NULL; end = end->next) {
- _channel_close_channelend(chan, end, 0);
- }
+ _channelends_close_all(chan->ends);
res = 0;
done:
@@ -501,90 +852,7 @@
return res;
}
-static int
-_channel_add(_PyChannelState *chan, int64_t interp,
- _PyCrossInterpreterData *data)
-{
- int res = -1;
-
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- if (_channel_associate_end(chan, interp, 1) == NULL) {
- goto done;
- }
-
- _channelitem *item = PyMem_NEW(_channelitem, 1);
- if (item == NULL) {
- goto done;
- }
- item->data = data;
- item->next = NULL;
-
- chan->count += 1;
- if (chan->first == NULL) {
- chan->first = item;
- }
- chan->last = item;
-
- res = 0;
-done:
- PyThread_release_lock(chan->mutex);
- return res;
-}
-
-static _PyCrossInterpreterData *
-_channel_next(_PyChannelState *chan, int64_t interp)
-{
- _PyCrossInterpreterData *data = NULL;
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- if (_channel_associate_end(chan, interp, 0) == NULL) {
- goto done;
- }
-
- _channelitem *item = chan->first;
- if (item == NULL) {
- goto done;
- }
- chan->first = item->next;
- if (chan->last == item) {
- chan->last = NULL;
- }
- chan->count -= 1;
-
- data = item->data;
- PyMem_Free(item);
-
-done:
- PyThread_release_lock(chan->mutex);
- return data;
-}
-
-static void
-_channel_clear(_PyChannelState *chan)
-{
- _channelitem *item = chan->first;
- while (item != NULL) {
- _PyCrossInterpreterData_Release(item->data);
- PyMem_Free(item->data);
- _channelitem *last = item;
- item = item->next;
- PyMem_Free(last);
- }
- chan->first = NULL;
- chan->last = NULL;
-}
-
-static void
-_channel_free(_PyChannelState *chan)
-{
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- _channel_clear(chan);
- _channelend_free_all(chan->send);
- _channelend_free_all(chan->recv);
- PyThread_release_lock(chan->mutex);
-
- PyThread_free_lock(chan->mutex);
- PyMem_Free(chan);
-}
+/* the set of channels */
struct _channelref;
@@ -609,6 +877,22 @@
return ref;
}
+//static void
+//_channelref_clear(_channelref *ref)
+//{
+// ref->id = -1;
+// ref->chan = NULL;
+// ref->next = NULL;
+// ref->objcount = 0;
+//}
+
+static void
+_channelref_free(_channelref *ref)
+{
+ //_channelref_clear(ref);
+ PyMem_Free(ref);
+}
+
static _channelref *
_channelref_find(_channelref *first, int64_t id, _channelref **pprev)
{
@@ -640,7 +924,6 @@
if (channels->mutex == NULL) {
channels->mutex = PyThread_allocate_lock();
if (channels->mutex == NULL) {
- PyMem_Free(channels);
PyErr_SetString(ChannelError,
"can't initialize mutex for channel management");
return -1;
@@ -752,6 +1035,9 @@
if (pchan != NULL) {
*pchan = ref->chan;
}
+ else {
+ _channel_free(ref->chan);
+ }
ref->chan = NULL;
}
@@ -776,7 +1062,7 @@
if (pchan != NULL) {
*pchan = ref->chan;
}
- PyMem_Free(ref);
+ _channelref_free(ref);
}
static int
@@ -974,6 +1260,7 @@
return NULL;
}
_PyCrossInterpreterData_Release(data);
+ PyMem_Free(data);
return obj;
}
@@ -995,7 +1282,7 @@
// Past this point we are responsible for releasing the mutex.
// Close one or both of the two ends.
- int res =_channel_close_interpreter(chan, interp->id, send-recv);
+ int res = _channel_close_interpreter(chan, interp->id, send-recv);
PyThread_release_lock(mutex);
return res;
}
@@ -1078,6 +1365,7 @@
"'send' and 'recv' cannot both be False");
return NULL;
}
+
int end = 0;
if (send == 1) {
if (recv == 0 || recv == -1) {
@@ -1176,7 +1464,9 @@
if (id == NULL) {
return -1;
}
- return PyObject_Hash(id);
+ Py_hash_t hash = PyObject_Hash(id);
+ Py_DECREF(id);
+ return hash;
}
static PyObject *
@@ -1208,11 +1498,11 @@
Py_RETURN_NOTIMPLEMENTED;
}
int64_t othercid = PyLong_AsLongLong(other);
- // XXX decref other here?
+ Py_DECREF(other);
if (othercid == -1 && PyErr_Occurred() != NULL) {
return NULL;
}
- if (othercid < 0 || othercid > INT64_MAX) {
+ if (othercid < 0) {
equal = 0;
}
else {
@@ -1338,7 +1628,30 @@
NULL, /* tp_new */
};
-/* interpreter-specific functions *******************************************/
+
+/* interpreter-specific code ************************************************/
+
+static PyObject * RunFailedError = NULL;
+
+static int
+interp_exceptions_init(PyObject *ns)
+{
+ // XXX Move the exceptions into per-module memory?
+
+ if (RunFailedError == NULL) {
+ // An uncaught exception came out of interp_run_string().
+ RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError",
+ PyExc_RuntimeError, NULL);
+ if (RunFailedError == NULL) {
+ return -1;
+ }
+ if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
static PyInterpreterState *
_look_up(PyObject *requested_id)
@@ -1396,10 +1709,12 @@
static int
_run_script(PyInterpreterState *interp, const char *codestr,
- _shareditem *shared, Py_ssize_t num_shared,
- _sharedexception **exc)
+ _sharedns *shared, _sharedexception **exc)
{
- assert(num_shared >= 0);
+ PyObject *exctype = NULL;
+ PyObject *excval = NULL;
+ PyObject *tb = NULL;
+
PyObject *main_mod = PyMapping_GetItemString(interp->modules, "__main__");
if (main_mod == NULL) {
goto error;
@@ -1413,12 +1728,9 @@
// Apply the cross-interpreter data.
if (shared != NULL) {
- for (Py_ssize_t i=0; i < num_shared; i++) {
- _shareditem *item = &shared[i];
- if (_shareditem_apply(item, ns) != 0) {
- Py_DECREF(ns);
- goto error;
- }
+ if (_sharedns_apply(shared, ns) != 0) {
+ Py_DECREF(ns);
+ goto error;
}
}
@@ -1432,11 +1744,25 @@
Py_DECREF(result); // We throw away the result.
}
+ *exc = NULL;
return 0;
error:
- *exc = _get_shared_exception();
- PyErr_Clear();
+ PyErr_Fetch(&exctype, &excval, &tb);
+
+ _sharedexception *sharedexc = _sharedexception_bind(exctype, excval, tb);
+ Py_XDECREF(exctype);
+ Py_XDECREF(excval);
+ Py_XDECREF(tb);
+ if (sharedexc == NULL) {
+ fprintf(stderr, "RunFailedError: script raised an uncaught exception");
+ PyErr_Clear();
+ sharedexc = NULL;
+ }
+ else {
+ assert(!PyErr_Occurred());
+ }
+ *exc = sharedexc;
return -1;
}
@@ -1448,8 +1774,7 @@
return -1;
}
- Py_ssize_t num_shared = -1;
- _shareditem *shared = _get_shared_ns(shareables, &num_shared);
+ _sharedns *shared = _get_shared_ns(shareables);
if (shared == NULL && PyErr_Occurred()) {
return -1;
}
@@ -1460,7 +1785,7 @@
// Run the script.
_sharedexception *exc = NULL;
- int result = _run_script(interp, codestr, shared, num_shared, &exc);
+ int result = _run_script(interp, codestr, shared, &exc);
// Switch back.
if (save_tstate != NULL) {
@@ -1469,8 +1794,8 @@
// Propagate any exception out to the caller.
if (exc != NULL) {
- _apply_shared_exception(exc);
- PyMem_Free(exc);
+ _sharedexception_apply(exc, RunFailedError);
+ _sharedexception_free(exc);
}
else if (result != 0) {
// We were unable to allocate a shared exception.
@@ -1478,8 +1803,7 @@
}
if (shared != NULL) {
- _sharedns_clear(shared);
- PyMem_Free(shared);
+ _sharedns_free(shared);
}
return result;
@@ -1612,7 +1936,9 @@
return NULL;
}
// insert at front of list
- if (PyList_Insert(ids, 0, id) < 0) {
+ int res = PyList_Insert(ids, 0, id);
+ Py_DECREF(id);
+ if (res < 0) {
Py_DECREF(ids);
return NULL;
}
@@ -1822,11 +2148,11 @@
}
PyObject *ids = PyList_New((Py_ssize_t)count);
if (ids == NULL) {
- // XXX free cids
- return NULL;
+ goto finally;
}
- for (int64_t i=0; i < count; cids++, i++) {
- PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cids, 0,
+ int64_t *cur = cids;
+ for (int64_t i=0; i < count; cur++, i++) {
+ PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0,
&_globals.channels, 0);
if (id == NULL) {
Py_DECREF(ids);
@@ -1835,7 +2161,9 @@
}
PyList_SET_ITEM(ids, i, id);
}
- // XXX free cids
+
+finally:
+ PyMem_Free(cids);
return ids;
}
diff --git a/Python/pystate.c b/Python/pystate.c
index a474549..8dbda73 100644
--- a/Python/pystate.c
+++ b/Python/pystate.c
@@ -1242,6 +1242,7 @@
break;
}
}
+ Py_DECREF(cls);
PyThread_release_lock(_PyRuntime.xidregistry.mutex);
return getdata;
}