From a77fa0512466bfd6edbab2aa3c830cdd1b0f5ba8 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 17 Oct 2023 17:51:52 -0600 Subject: [PATCH] gh-76785: Clean Up the Channels Module (gh-110568) --- Include/cpython/pystate.h | 2 +- Modules/_xxinterpchannelsmodule.c | 616 ++++++++++++++++-------------- Python/pystate.c | 14 +- 3 files changed, 335 insertions(+), 297 deletions(-) diff --git a/Include/cpython/pystate.h b/Include/cpython/pystate.h index 40102f88550..995f02eab58 100644 --- a/Include/cpython/pystate.h +++ b/Include/cpython/pystate.h @@ -291,7 +291,7 @@ struct _xid { // with deleted interpreters. Note that IDs are never re-used, so // each one will always correspond to a specific interpreter // (whether still alive or not). - int64_t interp; + int64_t interpid; // new_object is a function that returns a new object in the current // interpreter given the data. The resulting object (a new // reference) will be equivalent to the original object. This field diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index 2e2878d5c20..b618592bf00 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -28,7 +28,7 @@ _globals (static struct globals): next_id; (int64_t) mutex (PyThread_type_lock) head (linked list of struct _channelref *): - id (int64_t) + cid (int64_t) objcount (Py_ssize_t) next (struct _channelref *): ... @@ -42,7 +42,7 @@ _globals (static struct globals): numsendopen (int64_t) numrecvopen (int64_t) send (struct _channelend *): - interp (int64_t) + interpid (int64_t) open (int) next (struct _channelend *) recv (struct _channelend *): @@ -55,7 +55,7 @@ _globals (static struct globals): data (_PyCrossInterpreterData *): data (void *) obj (PyObject *) - interp (int64_t) + interpid (int64_t) new_object (xid_newobjectfunc) free (xid_freefunc) last (struct _channelitem *): @@ -269,7 +269,7 @@ wait_for_lock(PyThread_type_lock mutex, PY_TIMEOUT_T timeout) typedef struct { PyObject_HEAD Py_buffer *view; - int64_t interp; + int64_t interpid; } XIBufferViewObject; static PyObject * @@ -277,21 +277,21 @@ xibufferview_from_xid(PyTypeObject *cls, _PyCrossInterpreterData *data) { assert(data->data != NULL); assert(data->obj == NULL); - assert(data->interp >= 0); + assert(data->interpid >= 0); XIBufferViewObject *self = PyObject_Malloc(sizeof(XIBufferViewObject)); if (self == NULL) { return NULL; } PyObject_Init((PyObject *)self, cls); self->view = (Py_buffer *)data->data; - self->interp = data->interp; + self->interpid = data->interpid; return (PyObject *)self; } static void xibufferview_dealloc(XIBufferViewObject *self) { - PyInterpreterState *interp = _PyInterpreterState_LookUpID(self->interp); + PyInterpreterState *interp = _PyInterpreterState_LookUpID(self->interpid); /* If the interpreter is no longer alive then we have problems, since other objects may be using the buffer still. */ assert(interp != NULL); @@ -495,6 +495,7 @@ _get_current_xibufferview_type(void) #define CHANNEL_BOTH 0 #define CHANNEL_RECV -1 + /* channel errors */ #define ERR_CHANNEL_NOT_FOUND -2 @@ -593,6 +594,7 @@ handle_channel_error(int err, PyObject *mod, int64_t cid) return 1; } + /* the channel queue */ typedef uintptr_t _channelitem_id_t; @@ -711,7 +713,7 @@ _channelitem_clear(_channelitem *item) item->next = NULL; if (item->data != NULL) { - // It was allocated in _channel_send(). + // It was allocated in channel_send(). (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE); item->data = NULL; } @@ -911,14 +913,14 @@ _channelqueue_remove(_channelqueue *queue, _channelitem_id_t itemid, } static void -_channelqueue_drop_interpreter(_channelqueue *queue, int64_t interp) +_channelqueue_clear_interpreter(_channelqueue *queue, int64_t interpid) { _channelitem *prev = NULL; _channelitem *next = queue->first; while (next != NULL) { _channelitem *item = next; next = item->next; - if (item->data->interp == interp) { + if (item->data->interpid == interpid) { if (prev == NULL) { queue->first = item->next; } @@ -934,18 +936,19 @@ _channelqueue_drop_interpreter(_channelqueue *queue, int64_t interp) } } + /* channel-interpreter associations */ struct _channelend; typedef struct _channelend { struct _channelend *next; - int64_t interp; + int64_t interpid; int open; } _channelend; static _channelend * -_channelend_new(int64_t interp) +_channelend_new(int64_t interpid) { _channelend *end = GLOBAL_MALLOC(_channelend); if (end == NULL) { @@ -953,7 +956,7 @@ _channelend_new(int64_t interp) return NULL; } end->next = NULL; - end->interp = interp; + end->interpid = interpid; end->open = 1; return end; } @@ -975,12 +978,12 @@ _channelend_free_all(_channelend *end) } static _channelend * -_channelend_find(_channelend *first, int64_t interp, _channelend **pprev) +_channelend_find(_channelend *first, int64_t interpid, _channelend **pprev) { _channelend *prev = NULL; _channelend *end = first; while (end != NULL) { - if (end->interp == interp) { + if (end->interpid == interpid) { break; } prev = end; @@ -1037,10 +1040,10 @@ _channelends_free(_channelends *ends) } static _channelend * -_channelends_add(_channelends *ends, _channelend *prev, int64_t interp, +_channelends_add(_channelends *ends, _channelend *prev, int64_t interpid, int send) { - _channelend *end = _channelend_new(interp); + _channelend *end = _channelend_new(interpid); if (end == NULL) { return NULL; } @@ -1066,11 +1069,11 @@ _channelends_add(_channelends *ends, _channelend *prev, int64_t interp, } static int -_channelends_associate(_channelends *ends, int64_t interp, int send) +_channelends_associate(_channelends *ends, int64_t interpid, int send) { _channelend *prev; _channelend *end = _channelend_find(send ? ends->send : ends->recv, - interp, &prev); + interpid, &prev); if (end != NULL) { if (!end->open) { return ERR_CHANNEL_CLOSED; @@ -1078,7 +1081,7 @@ _channelends_associate(_channelends *ends, int64_t interp, int send) // already associated return 0; } - if (_channelends_add(ends, prev, interp, send) == NULL) { + if (_channelends_add(ends, prev, interpid, send) == NULL) { return -1; } return 0; @@ -1088,16 +1091,20 @@ static int _channelends_is_open(_channelends *ends) { if (ends->numsendopen != 0 || ends->numrecvopen != 0) { + // At least one interpreter is still associated with the channel + // (and hasn't been released). return 1; } + // XXX This is wrong if an end can ever be removed. if (ends->send == NULL && ends->recv == NULL) { + // The channel has never had any interpreters associated with it. return 1; } return 0; } static void -_channelends_close_end(_channelends *ends, _channelend *end, int send) +_channelends_release_end(_channelends *ends, _channelend *end, int send) { end->open = 0; if (send) { @@ -1109,51 +1116,37 @@ _channelends_close_end(_channelends *ends, _channelend *end, int send) } static int -_channelends_close_interpreter(_channelends *ends, int64_t interp, int which) +_channelends_release_interpreter(_channelends *ends, int64_t interpid, int which) { _channelend *prev; _channelend *end; if (which >= 0) { // send/both - end = _channelend_find(ends->send, interp, &prev); + end = _channelend_find(ends->send, interpid, &prev); if (end == NULL) { // never associated so add it - end = _channelends_add(ends, prev, interp, 1); + end = _channelends_add(ends, prev, interpid, 1); if (end == NULL) { return -1; } } - _channelends_close_end(ends, end, 1); + _channelends_release_end(ends, end, 1); } if (which <= 0) { // recv/both - end = _channelend_find(ends->recv, interp, &prev); + end = _channelend_find(ends->recv, interpid, &prev); if (end == NULL) { // never associated so add it - end = _channelends_add(ends, prev, interp, 0); + end = _channelends_add(ends, prev, interpid, 0); if (end == NULL) { return -1; } } - _channelends_close_end(ends, end, 0); + _channelends_release_end(ends, end, 0); } return 0; } static void -_channelends_drop_interpreter(_channelends *ends, int64_t interp) -{ - _channelend *end; - end = _channelend_find(ends->send, interp, NULL); - if (end != NULL) { - _channelends_close_end(ends, end, 1); - } - end = _channelend_find(ends->recv, interp, NULL); - if (end != NULL) { - _channelends_close_end(ends, end, 0); - } -} - -static void -_channelends_close_all(_channelends *ends, int which, int force) +_channelends_release_all(_channelends *ends, int which, int force) { // XXX Handle the ends. // XXX Handle force is True. @@ -1161,16 +1154,32 @@ _channelends_close_all(_channelends *ends, int which, int force) // Ensure all the "send"-associated interpreters are closed. _channelend *end; for (end = ends->send; end != NULL; end = end->next) { - _channelends_close_end(ends, end, 1); + _channelends_release_end(ends, end, 1); } // Ensure all the "recv"-associated interpreters are closed. for (end = ends->recv; end != NULL; end = end->next) { - _channelends_close_end(ends, end, 0); + _channelends_release_end(ends, end, 0); } } -/* channels */ +static void +_channelends_clear_interpreter(_channelends *ends, int64_t interpid) +{ + // XXX Actually remove the entries? + _channelend *end; + end = _channelend_find(ends->send, interpid, NULL); + if (end != NULL) { + _channelends_release_end(ends, end, 1); + } + end = _channelend_find(ends->recv, interpid, NULL); + if (end != NULL) { + _channelends_release_end(ends, end, 0); + } +} + + +/* each channel's state */ struct _channel; struct _channel_closing; @@ -1183,12 +1192,12 @@ typedef struct _channel { _channelends *ends; int open; struct _channel_closing *closing; -} _PyChannelState; +} _channel_state; -static _PyChannelState * +static _channel_state * _channel_new(PyThread_type_lock mutex) { - _PyChannelState *chan = GLOBAL_MALLOC(_PyChannelState); + _channel_state *chan = GLOBAL_MALLOC(_channel_state); if (chan == NULL) { return NULL; } @@ -1210,7 +1219,7 @@ _channel_new(PyThread_type_lock mutex) } static void -_channel_free(_PyChannelState *chan) +_channel_free(_channel_state *chan) { _channel_clear_closing(chan); PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -1223,7 +1232,7 @@ _channel_free(_PyChannelState *chan) } static int -_channel_add(_PyChannelState *chan, int64_t interp, +_channel_add(_channel_state *chan, int64_t interpid, _PyCrossInterpreterData *data, _waiting_t *waiting) { int res = -1; @@ -1233,7 +1242,7 @@ _channel_add(_PyChannelState *chan, int64_t interp, res = ERR_CHANNEL_CLOSED; goto done; } - if (_channelends_associate(chan->ends, interp, 1) != 0) { + if (_channelends_associate(chan->ends, interpid, 1) != 0) { res = ERR_CHANNEL_INTERP_CLOSED; goto done; } @@ -1250,7 +1259,7 @@ _channel_add(_PyChannelState *chan, int64_t interp, } static int -_channel_next(_PyChannelState *chan, int64_t interp, +_channel_next(_channel_state *chan, int64_t interpid, _PyCrossInterpreterData **p_data, _waiting_t **p_waiting) { int err = 0; @@ -1260,7 +1269,7 @@ _channel_next(_PyChannelState *chan, int64_t interp, err = ERR_CHANNEL_CLOSED; goto done; } - if (_channelends_associate(chan->ends, interp, 0) != 0) { + if (_channelends_associate(chan->ends, interpid, 0) != 0) { err = ERR_CHANNEL_INTERP_CLOSED; goto done; } @@ -1281,7 +1290,7 @@ _channel_next(_PyChannelState *chan, int64_t interp, } static void -_channel_remove(_PyChannelState *chan, _channelitem_id_t itemid) +_channel_remove(_channel_state *chan, _channelitem_id_t itemid) { _PyCrossInterpreterData *data = NULL; _waiting_t *waiting = NULL; @@ -1301,7 +1310,7 @@ _channel_remove(_PyChannelState *chan, _channelitem_id_t itemid) } static int -_channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end) +_channel_release_interpreter(_channel_state *chan, int64_t interpid, int end) { PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -1311,10 +1320,12 @@ _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end) goto done; } - if (_channelends_close_interpreter(chan->ends, interp, end) != 0) { + if (_channelends_release_interpreter(chan->ends, interpid, end) != 0) { goto done; } chan->open = _channelends_is_open(chan->ends); + // XXX Clear the queue if not empty? + // XXX Activate the "closing" mechanism? res = 0; done: @@ -1322,20 +1333,8 @@ _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end) return res; } -static void -_channel_drop_interpreter(_PyChannelState *chan, int64_t interp) -{ - PyThread_acquire_lock(chan->mutex, WAIT_LOCK); - - _channelqueue_drop_interpreter(chan->queue, interp); - _channelends_drop_interpreter(chan->ends, interp); - chan->open = _channelends_is_open(chan->ends); - - PyThread_release_lock(chan->mutex); -} - static int -_channel_close_all(_PyChannelState *chan, int end, int force) +_channel_release_all(_channel_state *chan, int end, int force) { int res = -1; PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -1349,12 +1348,13 @@ _channel_close_all(_PyChannelState *chan, int end, int force) res = ERR_CHANNEL_NOT_EMPTY; goto done; } + // XXX Clear the queue? chan->open = 0; // We *could* also just leave these in place, since we've marked // the channel as closed already. - _channelends_close_all(chan->ends, end, force); + _channelends_release_all(chan->ends, end, force); res = 0; done: @@ -1362,25 +1362,39 @@ _channel_close_all(_PyChannelState *chan, int end, int force) return res; } +static void +_channel_clear_interpreter(_channel_state *chan, int64_t interpid) +{ + PyThread_acquire_lock(chan->mutex, WAIT_LOCK); + + _channelqueue_clear_interpreter(chan->queue, interpid); + _channelends_clear_interpreter(chan->ends, interpid); + chan->open = _channelends_is_open(chan->ends); + + PyThread_release_lock(chan->mutex); +} + + /* the set of channels */ struct _channelref; typedef struct _channelref { - int64_t id; - _PyChannelState *chan; + int64_t cid; + _channel_state *chan; struct _channelref *next; + // The number of ChannelID objects referring to this channel. Py_ssize_t objcount; } _channelref; static _channelref * -_channelref_new(int64_t id, _PyChannelState *chan) +_channelref_new(int64_t cid, _channel_state *chan) { _channelref *ref = GLOBAL_MALLOC(_channelref); if (ref == NULL) { return NULL; } - ref->id = id; + ref->cid = cid; ref->chan = chan; ref->next = NULL; ref->objcount = 0; @@ -1390,7 +1404,7 @@ _channelref_new(int64_t id, _PyChannelState *chan) //static void //_channelref_clear(_channelref *ref) //{ -// ref->id = -1; +// ref->cid = -1; // ref->chan = NULL; // ref->next = NULL; // ref->objcount = 0; @@ -1407,12 +1421,12 @@ _channelref_free(_channelref *ref) } static _channelref * -_channelref_find(_channelref *first, int64_t id, _channelref **pprev) +_channelref_find(_channelref *first, int64_t cid, _channelref **pprev) { _channelref *prev = NULL; _channelref *ref = first; while (ref != NULL) { - if (ref->id == id) { + if (ref->cid == cid) { break; } prev = ref; @@ -1424,6 +1438,7 @@ _channelref_find(_channelref *first, int64_t id, _channelref **pprev) return ref; } + typedef struct _channels { PyThread_type_lock mutex; _channelref *head; @@ -1454,27 +1469,27 @@ _channels_fini(_channels *channels) static int64_t _channels_next_id(_channels *channels) // needs lock { - int64_t id = channels->next_id; - if (id < 0) { + int64_t cid = channels->next_id; + if (cid < 0) { /* overflow */ return -1; } channels->next_id += 1; - return id; + return cid; } static int -_channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex, - _PyChannelState **res) +_channels_lookup(_channels *channels, int64_t cid, PyThread_type_lock *pmutex, + _channel_state **res) { int err = -1; - _PyChannelState *chan = NULL; + _channel_state *chan = NULL; PyThread_acquire_lock(channels->mutex, WAIT_LOCK); if (pmutex != NULL) { *pmutex = NULL; } - _channelref *ref = _channelref_find(channels->head, id, NULL); + _channelref *ref = _channelref_find(channels->head, cid, NULL); if (ref == NULL) { err = ERR_CHANNEL_NOT_FOUND; goto done; @@ -1501,18 +1516,18 @@ _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex, } static int64_t -_channels_add(_channels *channels, _PyChannelState *chan) +_channels_add(_channels *channels, _channel_state *chan) { int64_t cid = -1; PyThread_acquire_lock(channels->mutex, WAIT_LOCK); // Create a new ref. - int64_t id = _channels_next_id(channels); - if (id < 0) { + int64_t _cid = _channels_next_id(channels); + if (_cid < 0) { cid = ERR_NO_NEXT_CHANNEL_ID; goto done; } - _channelref *ref = _channelref_new(id, chan); + _channelref *ref = _channelref_new(_cid, chan); if (ref == NULL) { goto done; } @@ -1523,17 +1538,17 @@ _channels_add(_channels *channels, _PyChannelState *chan) channels->head = ref; channels->numopen += 1; - cid = id; + cid = _cid; done: PyThread_release_lock(channels->mutex); return cid; } /* forward */ -static int _channel_set_closing(struct _channelref *, PyThread_type_lock); +static int _channel_set_closing(_channelref *, PyThread_type_lock); static int -_channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan, +_channels_close(_channels *channels, int64_t cid, _channel_state **pchan, int end, int force) { int res = -1; @@ -1557,7 +1572,7 @@ _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan, goto done; } else { - int err = _channel_close_all(ref->chan, end, force); + int err = _channel_release_all(ref->chan, end, force); if (err != 0) { if (end == CHANNEL_SEND && err == ERR_CHANNEL_NOT_EMPTY) { if (ref->chan->closing != NULL) { @@ -1599,7 +1614,7 @@ _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan, static void _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev, - _PyChannelState **pchan) + _channel_state **pchan) { if (ref == channels->head) { channels->head = ref->next; @@ -1616,7 +1631,7 @@ _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev, } static int -_channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan) +_channels_remove(_channels *channels, int64_t cid, _channel_state **pchan) { int res = -1; PyThread_acquire_lock(channels->mutex, WAIT_LOCK); @@ -1626,7 +1641,7 @@ _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan) } _channelref *prev = NULL; - _channelref *ref = _channelref_find(channels->head, id, &prev); + _channelref *ref = _channelref_find(channels->head, cid, &prev); if (ref == NULL) { res = ERR_CHANNEL_NOT_FOUND; goto done; @@ -1641,12 +1656,12 @@ _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan) } static int -_channels_add_id_object(_channels *channels, int64_t id) +_channels_add_id_object(_channels *channels, int64_t cid) { int res = -1; PyThread_acquire_lock(channels->mutex, WAIT_LOCK); - _channelref *ref = _channelref_find(channels->head, id, NULL); + _channelref *ref = _channelref_find(channels->head, cid, NULL); if (ref == NULL) { res = ERR_CHANNEL_NOT_FOUND; goto done; @@ -1660,12 +1675,12 @@ _channels_add_id_object(_channels *channels, int64_t id) } static void -_channels_drop_id_object(_channels *channels, int64_t id) +_channels_release_cid_object(_channels *channels, int64_t cid) { PyThread_acquire_lock(channels->mutex, WAIT_LOCK); _channelref *prev = NULL; - _channelref *ref = _channelref_find(channels->head, id, &prev); + _channelref *ref = _channelref_find(channels->head, cid, &prev); if (ref == NULL) { // Already destroyed. goto done; @@ -1674,7 +1689,7 @@ _channels_drop_id_object(_channels *channels, int64_t id) // Destroy if no longer used. if (ref->objcount == 0) { - _PyChannelState *chan = NULL; + _channel_state *chan = NULL; _channels_remove_ref(channels, ref, prev, &chan); if (chan != NULL) { _channel_free(chan); @@ -1696,7 +1711,7 @@ _channels_list_all(_channels *channels, int64_t *count) } _channelref *ref = channels->head; for (int64_t i=0; ref != NULL; ref = ref->next, i++) { - ids[i] = ref->id; + ids[i] = ref->cid; } *count = channels->numopen; @@ -1707,29 +1722,30 @@ _channels_list_all(_channels *channels, int64_t *count) } static void -_channels_drop_interpreter(_channels *channels, int64_t interp) +_channels_clear_interpreter(_channels *channels, int64_t interpid) { PyThread_acquire_lock(channels->mutex, WAIT_LOCK); _channelref *ref = channels->head; for (; ref != NULL; ref = ref->next) { if (ref->chan != NULL) { - _channel_drop_interpreter(ref->chan, interp); + _channel_clear_interpreter(ref->chan, interpid); } } PyThread_release_lock(channels->mutex); } + /* support for closing non-empty channels */ struct _channel_closing { - struct _channelref *ref; + _channelref *ref; }; static int -_channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) { - struct _channel *chan = ref->chan; +_channel_set_closing(_channelref *ref, PyThread_type_lock mutex) { + _channel_state *chan = ref->chan; if (chan == NULL) { // already closed return 0; @@ -1753,7 +1769,7 @@ _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) { } static void -_channel_clear_closing(struct _channel *chan) { +_channel_clear_closing(_channel_state *chan) { PyThread_acquire_lock(chan->mutex, WAIT_LOCK); if (chan->closing != NULL) { GLOBAL_FREE(chan->closing); @@ -1763,7 +1779,7 @@ _channel_clear_closing(struct _channel *chan) { } static void -_channel_finish_closing(struct _channel *chan) { +_channel_finish_closing(_channel_state *chan) { struct _channel_closing *closing = chan->closing; if (closing == NULL) { return; @@ -1775,32 +1791,35 @@ _channel_finish_closing(struct _channel *chan) { _channel_free(chan); } + /* "high"-level channel-related functions */ +// Create a new channel. static int64_t -_channel_create(_channels *channels) +channel_create(_channels *channels) { PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { return ERR_CHANNEL_MUTEX_INIT; } - _PyChannelState *chan = _channel_new(mutex); + _channel_state *chan = _channel_new(mutex); if (chan == NULL) { PyThread_free_lock(mutex); return -1; } - int64_t id = _channels_add(channels, chan); - if (id < 0) { + int64_t cid = _channels_add(channels, chan); + if (cid < 0) { _channel_free(chan); } - return id; + return cid; } +// Completely destroy the channel. static int -_channel_destroy(_channels *channels, int64_t id) +channel_destroy(_channels *channels, int64_t cid) { - _PyChannelState *chan = NULL; - int err = _channels_remove(channels, id, &chan); + _channel_state *chan = NULL; + int err = _channels_remove(channels, cid, &chan); if (err != 0) { return err; } @@ -1810,19 +1829,23 @@ _channel_destroy(_channels *channels, int64_t id) return 0; } +// Push an object onto the channel. +// The current interpreter gets associated with the send end of the channel. +// Optionally request to be notified when it is received. static int -_channel_send(_channels *channels, int64_t id, PyObject *obj, - _waiting_t *waiting) +channel_send(_channels *channels, int64_t cid, PyObject *obj, + _waiting_t *waiting) { PyInterpreterState *interp = _get_current_interp(); if (interp == NULL) { return -1; } + int64_t interpid = PyInterpreterState_GetID(interp); // Look up the channel. PyThread_type_lock mutex = NULL; - _PyChannelState *chan = NULL; - int err = _channels_lookup(channels, id, &mutex, &chan); + _channel_state *chan = NULL; + int err = _channels_lookup(channels, cid, &mutex, &chan); if (err != 0) { return err; } @@ -1847,8 +1870,7 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj, } // Add the data to the channel. - int res = _channel_add(chan, PyInterpreterState_GetID(interp), - data, waiting); + int res = _channel_add(chan, interpid, data, waiting); PyThread_release_lock(mutex); if (res != 0) { // We may chain an exception here: @@ -1860,12 +1882,13 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj, return 0; } +// Basically, un-send an object. static void -_channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting) +channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting) { // Look up the channel. PyThread_type_lock mutex = NULL; - _PyChannelState *chan = NULL; + _channel_state *chan = NULL; int err = _channels_lookup(channels, cid, &mutex, &chan); if (err != 0) { // The channel was already closed, etc. @@ -1881,8 +1904,9 @@ _channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting) PyThread_release_lock(mutex); } +// Like channel_send(), but strictly wait for the object to be received. static int -_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, +channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, PY_TIMEOUT_T timeout) { // We use a stack variable here, so we must ensure that &waiting @@ -1894,7 +1918,7 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, } /* Queue up the object. */ - int res = _channel_send(channels, cid, obj, &waiting); + int res = channel_send(channels, cid, obj, &waiting); if (res < 0) { assert(waiting.status == WAITING_NO_STATUS); goto finally; @@ -1906,7 +1930,7 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, _waiting_finish_releasing(&waiting); /* The send() call is failing now, so make sure the item won't be received. */ - _channel_clear_sent(channels, cid, &waiting); + channel_clear_sent(channels, cid, &waiting); assert(waiting.status == WAITING_RELEASED); if (!waiting.received) { res = -1; @@ -1932,8 +1956,11 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, return res; } +// Pop the next object off the channel. Fail if empty. +// The current interpreter gets associated with the recv end of the channel. +// XXX Support a "wait" mutex? static int -_channel_recv(_channels *channels, int64_t id, PyObject **res) +channel_recv(_channels *channels, int64_t cid, PyObject **res) { int err; *res = NULL; @@ -1946,11 +1973,12 @@ _channel_recv(_channels *channels, int64_t id, PyObject **res) } return 0; } + int64_t interpid = PyInterpreterState_GetID(interp); // Look up the channel. PyThread_type_lock mutex = NULL; - _PyChannelState *chan = NULL; - err = _channels_lookup(channels, id, &mutex, &chan); + _channel_state *chan = NULL; + err = _channels_lookup(channels, cid, &mutex, &chan); if (err != 0) { return err; } @@ -1960,8 +1988,7 @@ _channel_recv(_channels *channels, int64_t id, PyObject **res) // Pop off the next item from the channel. _PyCrossInterpreterData *data = NULL; _waiting_t *waiting = NULL; - err = _channel_next(chan, PyInterpreterState_GetID(interp), &data, - &waiting); + err = _channel_next(chan, interpid, &data, &waiting); PyThread_release_lock(mutex); if (err != 0) { return err; @@ -1975,14 +2002,14 @@ _channel_recv(_channels *channels, int64_t id, PyObject **res) PyObject *obj = _PyCrossInterpreterData_NewObject(data); if (obj == NULL) { assert(PyErr_Occurred()); - // It was allocated in _channel_send(), so we free it. + // It was allocated in channel_send(), so we free it. (void)_release_xid_data(data, XID_IGNORE_EXC | XID_FREE); if (waiting != NULL) { _waiting_release(waiting, 0); } return -1; } - // It was allocated in _channel_send(), so we free it. + // It was allocated in channel_send(), so we free it. int release_res = _release_xid_data(data, XID_FREE); if (release_res < 0) { // The source interpreter has been destroyed already. @@ -2003,40 +2030,49 @@ _channel_recv(_channels *channels, int64_t id, PyObject **res) return 0; } +// Disallow send/recv for the current interpreter. +// The channel is marked as closed if no other interpreters +// are currently associated. static int -_channel_drop(_channels *channels, int64_t id, int send, int recv) +channel_release(_channels *channels, int64_t cid, int send, int recv) { PyInterpreterState *interp = _get_current_interp(); if (interp == NULL) { return -1; } + int64_t interpid = PyInterpreterState_GetID(interp); // Look up the channel. PyThread_type_lock mutex = NULL; - _PyChannelState *chan = NULL; - int err = _channels_lookup(channels, id, &mutex, &chan); + _channel_state *chan = NULL; + int err = _channels_lookup(channels, cid, &mutex, &chan); if (err != 0) { return err; } // Past this point we are responsible for releasing the mutex. // Close one or both of the two ends. - int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv); + int res = _channel_release_interpreter(chan, interpid, send-recv); PyThread_release_lock(mutex); return res; } +// Close the channel (for all interpreters). Fail if it's already closed. +// Close immediately if it's empty. Otherwise, disallow sending and +// finally close once empty. Optionally, immediately clear and close it. static int -_channel_close(_channels *channels, int64_t id, int end, int force) +channel_close(_channels *channels, int64_t cid, int end, int force) { - return _channels_close(channels, id, NULL, end, force); + return _channels_close(channels, cid, NULL, end, force); } +// Return true if the identified interpreter is associated +// with the given end of the channel. static int -_channel_is_associated(_channels *channels, int64_t cid, int64_t interp, +channel_is_associated(_channels *channels, int64_t cid, int64_t interpid, int send) { - _PyChannelState *chan = NULL; + _channel_state *chan = NULL; int err = _channels_lookup(channels, cid, NULL, &chan); if (err != 0) { return err; @@ -2046,16 +2082,17 @@ _channel_is_associated(_channels *channels, int64_t cid, int64_t interp, } _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv, - interp, NULL); + interpid, NULL); return (end != NULL && end->open); } + /* ChannelID class */ typedef struct channelid { PyObject_HEAD - int64_t id; + int64_t cid; int end; int resolve; _channels *channels; @@ -2076,7 +2113,7 @@ channel_id_converter(PyObject *arg, void *ptr) module_state *state = get_module_state(data->module); assert(state != NULL); if (PyObject_TypeCheck(arg, state->ChannelIDType)) { - cid = ((channelid *)arg)->id; + cid = ((channelid *)arg)->cid; end = ((channelid *)arg)->end; } else if (PyIndex_Check(arg)) { @@ -2111,7 +2148,7 @@ newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels, if (self == NULL) { return -1; } - self->id = cid; + self->cid = cid; self->end = end; self->resolve = resolve; self->channels = channels; @@ -2176,22 +2213,22 @@ _channelid_new(PyObject *mod, PyTypeObject *cls, end = CHANNEL_RECV; } - PyObject *id = NULL; + PyObject *cidobj = NULL; int err = newchannelid(cls, cid, end, _global_channels(), force, resolve, - (channelid **)&id); + (channelid **)&cidobj); if (handle_channel_error(err, mod, cid)) { - assert(id == NULL); + assert(cidobj == NULL); return NULL; } - assert(id != NULL); - return id; + assert(cidobj != NULL); + return cidobj; } static void channelid_dealloc(PyObject *self) { - int64_t cid = ((channelid *)self)->id; + int64_t cid = ((channelid *)self)->cid; _channels *channels = ((channelid *)self)->channels; PyTypeObject *tp = Py_TYPE(self); @@ -2204,7 +2241,7 @@ channelid_dealloc(PyObject *self) // like we do for _abc._abc_data? Py_DECREF(tp); - _channels_drop_id_object(channels, cid); + _channels_release_cid_object(channels, cid); } static PyObject * @@ -2213,44 +2250,44 @@ channelid_repr(PyObject *self) PyTypeObject *type = Py_TYPE(self); const char *name = _PyType_Name(type); - channelid *cid = (channelid *)self; + channelid *cidobj = (channelid *)self; const char *fmt; - if (cid->end == CHANNEL_SEND) { + if (cidobj->end == CHANNEL_SEND) { fmt = "%s(%" PRId64 ", send=True)"; } - else if (cid->end == CHANNEL_RECV) { + else if (cidobj->end == CHANNEL_RECV) { fmt = "%s(%" PRId64 ", recv=True)"; } else { fmt = "%s(%" PRId64 ")"; } - return PyUnicode_FromFormat(fmt, name, cid->id); + return PyUnicode_FromFormat(fmt, name, cidobj->cid); } static PyObject * channelid_str(PyObject *self) { - channelid *cid = (channelid *)self; - return PyUnicode_FromFormat("%" PRId64 "", cid->id); + channelid *cidobj = (channelid *)self; + return PyUnicode_FromFormat("%" PRId64 "", cidobj->cid); } static PyObject * channelid_int(PyObject *self) { - channelid *cid = (channelid *)self; - return PyLong_FromLongLong(cid->id); + channelid *cidobj = (channelid *)self; + return PyLong_FromLongLong(cidobj->cid); } static Py_hash_t channelid_hash(PyObject *self) { - channelid *cid = (channelid *)self; - PyObject *id = PyLong_FromLongLong(cid->id); - if (id == NULL) { + channelid *cidobj = (channelid *)self; + PyObject *pyid = PyLong_FromLongLong(cidobj->cid); + if (pyid == NULL) { return -1; } - Py_hash_t hash = PyObject_Hash(id); - Py_DECREF(id); + Py_hash_t hash = PyObject_Hash(pyid); + Py_DECREF(pyid); return hash; } @@ -2276,11 +2313,11 @@ channelid_richcompare(PyObject *self, PyObject *other, int op) goto done; } - channelid *cid = (channelid *)self; + channelid *cidobj = (channelid *)self; int equal; if (PyObject_TypeCheck(other, state->ChannelIDType)) { - channelid *othercid = (channelid *)other; - equal = (cid->end == othercid->end) && (cid->id == othercid->id); + channelid *othercidobj = (channelid *)other; + equal = (cidobj->end == othercidobj->end) && (cidobj->cid == othercidobj->cid); } else if (PyLong_Check(other)) { /* Fast path */ @@ -2289,10 +2326,10 @@ channelid_richcompare(PyObject *self, PyObject *other, int op) if (othercid == -1 && PyErr_Occurred()) { goto done; } - equal = !overflow && (othercid >= 0) && (cid->id == othercid); + equal = !overflow && (othercid >= 0) && (cidobj->cid == othercid); } else if (PyNumber_Check(other)) { - PyObject *pyid = PyLong_FromLongLong(cid->id); + PyObject *pyid = PyLong_FromLongLong(cidobj->cid); if (pyid == NULL) { goto done; } @@ -2317,16 +2354,16 @@ channelid_richcompare(PyObject *self, PyObject *other, int op) return res; } -static PyTypeObject * _get_current_channel_end_type(int end); +static PyTypeObject * _get_current_channelend_type(int end); static PyObject * -_channel_from_cid(PyObject *cid, int end) +_channelobj_from_cidobj(PyObject *cidobj, int end) { - PyObject *cls = (PyObject *)_get_current_channel_end_type(end); + PyObject *cls = (PyObject *)_get_current_channelend_type(end); if (cls == NULL) { return NULL; } - PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL); + PyObject *chan = PyObject_CallFunctionObjArgs(cls, cidobj, NULL); Py_DECREF(cls); if (chan == NULL) { return NULL; @@ -2335,7 +2372,7 @@ _channel_from_cid(PyObject *cid, int end) } struct _channelid_xid { - int64_t id; + int64_t cid; int end; int resolve; }; @@ -2357,16 +2394,16 @@ _channelid_from_xid(_PyCrossInterpreterData *data) } // Note that we do not preserve the "resolve" flag. - PyObject *cid = NULL; - int err = newchannelid(state->ChannelIDType, xid->id, xid->end, + PyObject *cidobj = NULL; + int err = newchannelid(state->ChannelIDType, xid->cid, xid->end, _global_channels(), 0, 0, - (channelid **)&cid); + (channelid **)&cidobj); if (err != 0) { - assert(cid == NULL); - (void)handle_channel_error(err, mod, xid->id); + assert(cidobj == NULL); + (void)handle_channel_error(err, mod, xid->cid); goto done; } - assert(cid != NULL); + assert(cidobj != NULL); if (xid->end == 0) { goto done; } @@ -2375,17 +2412,17 @@ _channelid_from_xid(_PyCrossInterpreterData *data) } /* Try returning a high-level channel end but fall back to the ID. */ - PyObject *chan = _channel_from_cid(cid, xid->end); + PyObject *chan = _channelobj_from_cidobj(cidobj, xid->end); if (chan == NULL) { PyErr_Clear(); goto done; } - Py_DECREF(cid); - cid = chan; + Py_DECREF(cidobj); + cidobj = chan; done: Py_DECREF(mod); - return cid; + return cidobj; } static int @@ -2400,7 +2437,7 @@ _channelid_shared(PyThreadState *tstate, PyObject *obj, return -1; } struct _channelid_xid *xid = (struct _channelid_xid *)data->data; - xid->id = ((channelid *)obj)->id; + xid->cid = ((channelid *)obj)->cid; xid->end = ((channelid *)obj)->end; xid->resolve = ((channelid *)obj)->resolve; return 0; @@ -2410,30 +2447,30 @@ static PyObject * channelid_end(PyObject *self, void *end) { int force = 1; - channelid *cid = (channelid *)self; + channelid *cidobj = (channelid *)self; if (end != NULL) { - PyObject *id = NULL; - int err = newchannelid(Py_TYPE(self), cid->id, *(int *)end, - cid->channels, force, cid->resolve, - (channelid **)&id); + PyObject *obj = NULL; + int err = newchannelid(Py_TYPE(self), cidobj->cid, *(int *)end, + cidobj->channels, force, cidobj->resolve, + (channelid **)&obj); if (err != 0) { - assert(id == NULL); + assert(obj == NULL); PyObject *mod = get_module_from_type(Py_TYPE(self)); if (mod == NULL) { return NULL; } - (void)handle_channel_error(err, mod, cid->id); + (void)handle_channel_error(err, mod, cidobj->cid); Py_DECREF(mod); return NULL; } - assert(id != NULL); - return id; + assert(obj != NULL); + return obj; } - if (cid->end == CHANNEL_SEND) { + if (cidobj->end == CHANNEL_SEND) { return PyUnicode_InternFromString("send"); } - if (cid->end == CHANNEL_RECV) { + if (cidobj->end == CHANNEL_RECV) { return PyUnicode_InternFromString("recv"); } return PyUnicode_InternFromString("both"); @@ -2455,7 +2492,7 @@ static PyGetSetDef channelid_getsets[] = { PyDoc_STRVAR(channelid_doc, "A channel ID identifies a channel and may be used as an int."); -static PyType_Slot ChannelIDType_slots[] = { +static PyType_Slot channelid_typeslots[] = { {Py_tp_dealloc, (destructor)channelid_dealloc}, {Py_tp_doc, (void *)channelid_doc}, {Py_tp_repr, (reprfunc)channelid_repr}, @@ -2469,12 +2506,12 @@ static PyType_Slot ChannelIDType_slots[] = { {0, NULL}, }; -static PyType_Spec ChannelIDType_spec = { +static PyType_Spec channelid_typespec = { .name = MODULE_NAME ".ChannelID", .basicsize = sizeof(channelid), .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_DISALLOW_INSTANTIATION | Py_TPFLAGS_IMMUTABLETYPE), - .slots = ChannelIDType_slots, + .slots = channelid_typeslots, }; @@ -2483,7 +2520,7 @@ static PyType_Spec ChannelIDType_spec = { // XXX Use a new __xid__ protocol instead? static PyTypeObject * -_get_current_channel_end_type(int end) +_get_current_channelend_type(int end) { module_state *state = _get_current_module_state(); if (state == NULL) { @@ -2519,24 +2556,24 @@ _get_current_channel_end_type(int end) } static PyObject * -_channel_end_from_xid(_PyCrossInterpreterData *data) +_channelend_from_xid(_PyCrossInterpreterData *data) { - channelid *cid = (channelid *)_channelid_from_xid(data); - if (cid == NULL) { + channelid *cidobj = (channelid *)_channelid_from_xid(data); + if (cidobj == NULL) { return NULL; } - PyTypeObject *cls = _get_current_channel_end_type(cid->end); + PyTypeObject *cls = _get_current_channelend_type(cidobj->end); if (cls == NULL) { - Py_DECREF(cid); + Py_DECREF(cidobj); return NULL; } - PyObject *obj = PyObject_CallOneArg((PyObject *)cls, (PyObject *)cid); - Py_DECREF(cid); + PyObject *obj = PyObject_CallOneArg((PyObject *)cls, (PyObject *)cidobj); + Py_DECREF(cidobj); return obj; } static int -_channel_end_shared(PyThreadState *tstate, PyObject *obj, +_channelend_shared(PyThreadState *tstate, PyObject *obj, _PyCrossInterpreterData *data) { PyObject *cidobj = PyObject_GetAttrString(obj, "_id"); @@ -2548,12 +2585,12 @@ _channel_end_shared(PyThreadState *tstate, PyObject *obj, if (res < 0) { return -1; } - data->new_object = _channel_end_from_xid; + data->new_object = _channelend_from_xid; return 0; } static int -set_channel_end_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv) +set_channelend_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv) { module_state *state = get_module_state(mod); if (state == NULL) { @@ -2570,16 +2607,17 @@ set_channel_end_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv) state->send_channel_type = (PyTypeObject *)Py_NewRef(send); state->recv_channel_type = (PyTypeObject *)Py_NewRef(recv); - if (register_xid_class(send, _channel_end_shared, xid_classes)) { + if (register_xid_class(send, _channelend_shared, xid_classes)) { return -1; } - if (register_xid_class(recv, _channel_end_shared, xid_classes)) { + if (register_xid_class(recv, _channelend_shared, xid_classes)) { return -1; } return 0; } + /* module level code ********************************************************/ /* globals is the process-global state for the module. It holds all @@ -2635,15 +2673,15 @@ clear_interpreter(void *data) } PyInterpreterState *interp = (PyInterpreterState *)data; assert(interp == _get_current_interp()); - int64_t id = PyInterpreterState_GetID(interp); - _channels_drop_interpreter(&_globals.channels, id); + int64_t interpid = PyInterpreterState_GetID(interp); + _channels_clear_interpreter(&_globals.channels, interpid); } static PyObject * -channel_create(PyObject *self, PyObject *Py_UNUSED(ignored)) +channelsmod_create(PyObject *self, PyObject *Py_UNUSED(ignored)) { - int64_t cid = _channel_create(&_globals.channels); + int64_t cid = channel_create(&_globals.channels); if (cid < 0) { (void)handle_channel_error(-1, self, cid); return NULL; @@ -2652,30 +2690,30 @@ channel_create(PyObject *self, PyObject *Py_UNUSED(ignored)) if (state == NULL) { return NULL; } - PyObject *id = NULL; + PyObject *cidobj = NULL; int err = newchannelid(state->ChannelIDType, cid, 0, &_globals.channels, 0, 0, - (channelid **)&id); + (channelid **)&cidobj); if (handle_channel_error(err, self, cid)) { - assert(id == NULL); - err = _channel_destroy(&_globals.channels, cid); + assert(cidobj == NULL); + err = channel_destroy(&_globals.channels, cid); if (handle_channel_error(err, self, cid)) { // XXX issue a warning? } return NULL; } - assert(id != NULL); - assert(((channelid *)id)->channels != NULL); - return id; + assert(cidobj != NULL); + assert(((channelid *)cidobj)->channels != NULL); + return cidobj; } -PyDoc_STRVAR(channel_create_doc, +PyDoc_STRVAR(channelsmod_create_doc, "channel_create() -> cid\n\ \n\ Create a new cross-interpreter channel and return a unique generated ID."); static PyObject * -channel_destroy(PyObject *self, PyObject *args, PyObject *kwds) +channelsmod_destroy(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"cid", NULL}; int64_t cid; @@ -2688,21 +2726,21 @@ channel_destroy(PyObject *self, PyObject *args, PyObject *kwds) } cid = cid_data.cid; - int err = _channel_destroy(&_globals.channels, cid); + int err = channel_destroy(&_globals.channels, cid); if (handle_channel_error(err, self, cid)) { return NULL; } Py_RETURN_NONE; } -PyDoc_STRVAR(channel_destroy_doc, +PyDoc_STRVAR(channelsmod_destroy_doc, "channel_destroy(cid)\n\ \n\ Close and finalize the channel. Afterward attempts to use the channel\n\ will behave as though it never existed."); static PyObject * -channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) +channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) { int64_t count = 0; int64_t *cids = _channels_list_all(&_globals.channels, &count); @@ -2724,17 +2762,17 @@ channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) } int64_t *cur = cids; for (int64_t i=0; i < count; cur++, i++) { - PyObject *id = NULL; + PyObject *cidobj = NULL; int err = newchannelid(state->ChannelIDType, *cur, 0, &_globals.channels, 0, 0, - (channelid **)&id); + (channelid **)&cidobj); if (handle_channel_error(err, self, *cur)) { - assert(id == NULL); + assert(cidobj == NULL); Py_SETREF(ids, NULL); break; } - assert(id != NULL); - PyList_SET_ITEM(ids, (Py_ssize_t)i, id); + assert(cidobj != NULL); + PyList_SET_ITEM(ids, (Py_ssize_t)i, cidobj); } finally: @@ -2742,13 +2780,13 @@ channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) return ids; } -PyDoc_STRVAR(channel_list_all_doc, +PyDoc_STRVAR(channelsmod_list_all_doc, "channel_list_all() -> [cid]\n\ \n\ Return the list of all IDs for active channels."); static PyObject * -channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds) +channelsmod_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"cid", "send", NULL}; int64_t cid; /* Channel ID */ @@ -2756,8 +2794,8 @@ channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds) .module = self, }; int send = 0; /* Send or receive end? */ - int64_t id; - PyObject *ids, *id_obj; + int64_t interpid; + PyObject *ids, *interpid_obj; PyInterpreterState *interp; if (!PyArg_ParseTupleAndKeywords( @@ -2774,20 +2812,20 @@ channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds) interp = PyInterpreterState_Head(); while (interp != NULL) { - id = PyInterpreterState_GetID(interp); - assert(id >= 0); - int res = _channel_is_associated(&_globals.channels, cid, id, send); + interpid = PyInterpreterState_GetID(interp); + assert(interpid >= 0); + int res = channel_is_associated(&_globals.channels, cid, interpid, send); if (res < 0) { (void)handle_channel_error(res, self, cid); goto except; } if (res) { - id_obj = PyInterpreterState_GetIDObject(interp); - if (id_obj == NULL) { + interpid_obj = PyInterpreterState_GetIDObject(interp); + if (interpid_obj == NULL) { goto except; } - res = PyList_Insert(ids, 0, id_obj); - Py_DECREF(id_obj); + res = PyList_Insert(ids, 0, interpid_obj); + Py_DECREF(interpid_obj); if (res < 0) { goto except; } @@ -2804,7 +2842,7 @@ channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds) return ids; } -PyDoc_STRVAR(channel_list_interpreters_doc, +PyDoc_STRVAR(channelsmod_list_interpreters_doc, "channel_list_interpreters(cid, *, send) -> [id]\n\ \n\ Return the list of all interpreter IDs associated with an end of the channel.\n\ @@ -2814,7 +2852,7 @@ receive end."); static PyObject * -channel_send(PyObject *self, PyObject *args, PyObject *kwds) +channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL}; struct channel_id_converter_data cid_data = { @@ -2838,10 +2876,10 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds) /* Queue up the object. */ int err = 0; if (blocking) { - err = _channel_send_wait(&_globals.channels, cid, obj, timeout); + err = channel_send_wait(&_globals.channels, cid, obj, timeout); } else { - err = _channel_send(&_globals.channels, cid, obj, NULL); + err = channel_send(&_globals.channels, cid, obj, NULL); } if (handle_channel_error(err, self, cid)) { return NULL; @@ -2850,14 +2888,14 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds) Py_RETURN_NONE; } -PyDoc_STRVAR(channel_send_doc, +PyDoc_STRVAR(channelsmod_send_doc, "channel_send(cid, obj, blocking=True)\n\ \n\ Add the object's data to the channel's queue.\n\ By default this waits for the object to be received."); static PyObject * -channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) +channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL}; struct channel_id_converter_data cid_data = { @@ -2887,10 +2925,10 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) /* Queue up the object. */ int err = 0; if (blocking) { - err = _channel_send_wait(&_globals.channels, cid, tempobj, timeout); + err = channel_send_wait(&_globals.channels, cid, tempobj, timeout); } else { - err = _channel_send(&_globals.channels, cid, tempobj, NULL); + err = channel_send(&_globals.channels, cid, tempobj, NULL); } Py_DECREF(tempobj); if (handle_channel_error(err, self, cid)) { @@ -2900,14 +2938,14 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) Py_RETURN_NONE; } -PyDoc_STRVAR(channel_send_buffer_doc, +PyDoc_STRVAR(channelsmod_send_buffer_doc, "channel_send_buffer(cid, obj, blocking=True)\n\ \n\ Add the object's buffer to the channel's queue.\n\ By default this waits for the object to be received."); static PyObject * -channel_recv(PyObject *self, PyObject *args, PyObject *kwds) +channelsmod_recv(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"cid", "default", NULL}; int64_t cid; @@ -2922,7 +2960,7 @@ channel_recv(PyObject *self, PyObject *args, PyObject *kwds) cid = cid_data.cid; PyObject *obj = NULL; - int err = _channel_recv(&_globals.channels, cid, &obj); + int err = channel_recv(&_globals.channels, cid, &obj); if (handle_channel_error(err, self, cid)) { return NULL; } @@ -2939,7 +2977,7 @@ channel_recv(PyObject *self, PyObject *args, PyObject *kwds) return obj; } -PyDoc_STRVAR(channel_recv_doc, +PyDoc_STRVAR(channelsmod_recv_doc, "channel_recv(cid, [default]) -> obj\n\ \n\ Return a new object from the data at the front of the channel's queue.\n\ @@ -2948,7 +2986,7 @@ If there is nothing to receive then raise ChannelEmptyError, unless\n\ a default value is provided. In that case return it."); static PyObject * -channel_close(PyObject *self, PyObject *args, PyObject *kwds) +channelsmod_close(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"cid", "send", "recv", "force", NULL}; int64_t cid; @@ -2966,14 +3004,14 @@ channel_close(PyObject *self, PyObject *args, PyObject *kwds) } cid = cid_data.cid; - int err = _channel_close(&_globals.channels, cid, send-recv, force); + int err = channel_close(&_globals.channels, cid, send-recv, force); if (handle_channel_error(err, self, cid)) { return NULL; } Py_RETURN_NONE; } -PyDoc_STRVAR(channel_close_doc, +PyDoc_STRVAR(channelsmod_close_doc, "channel_close(cid, *, send=None, recv=None, force=False)\n\ \n\ Close the channel for all interpreters.\n\ @@ -3001,7 +3039,7 @@ Once the channel's ID has no more ref counts in any interpreter\n\ the channel will be destroyed."); static PyObject * -channel_release(PyObject *self, PyObject *args, PyObject *kwds) +channelsmod_release(PyObject *self, PyObject *args, PyObject *kwds) { // Note that only the current interpreter is affected. static char *kwlist[] = {"cid", "send", "recv", "force", NULL}; @@ -3027,14 +3065,14 @@ channel_release(PyObject *self, PyObject *args, PyObject *kwds) // XXX Handle force is True. // XXX Fix implicit release. - int err = _channel_drop(&_globals.channels, cid, send, recv); + int err = channel_release(&_globals.channels, cid, send, recv); if (handle_channel_error(err, self, cid)) { return NULL; } Py_RETURN_NONE; } -PyDoc_STRVAR(channel_release_doc, +PyDoc_STRVAR(channelsmod_release_doc, "channel_release(cid, *, send=None, recv=None, force=True)\n\ \n\ Close the channel for the current interpreter. 'send' and 'recv'\n\ @@ -3042,7 +3080,7 @@ Close the channel for the current interpreter. 'send' and 'recv'\n\ ends are closed. Closing an already closed end is a noop."); static PyObject * -channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds) +channelsmod__channel_id(PyObject *self, PyObject *args, PyObject *kwds) { module_state *state = get_module_state(self); if (state == NULL) { @@ -3058,7 +3096,7 @@ channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds) } static PyObject * -channel__register_end_types(PyObject *self, PyObject *args, PyObject *kwds) +channelsmod__register_end_types(PyObject *self, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"send", "recv", NULL}; PyObject *send; @@ -3079,7 +3117,7 @@ channel__register_end_types(PyObject *self, PyObject *args, PyObject *kwds) PyTypeObject *cls_send = (PyTypeObject *)send; PyTypeObject *cls_recv = (PyTypeObject *)recv; - if (set_channel_end_types(self, cls_send, cls_recv) < 0) { + if (set_channelend_types(self, cls_send, cls_recv) < 0) { return NULL; } @@ -3087,27 +3125,27 @@ channel__register_end_types(PyObject *self, PyObject *args, PyObject *kwds) } static PyMethodDef module_functions[] = { - {"create", channel_create, - METH_NOARGS, channel_create_doc}, - {"destroy", _PyCFunction_CAST(channel_destroy), - METH_VARARGS | METH_KEYWORDS, channel_destroy_doc}, - {"list_all", channel_list_all, - METH_NOARGS, channel_list_all_doc}, - {"list_interpreters", _PyCFunction_CAST(channel_list_interpreters), - METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc}, - {"send", _PyCFunction_CAST(channel_send), - METH_VARARGS | METH_KEYWORDS, channel_send_doc}, - {"send_buffer", _PyCFunction_CAST(channel_send_buffer), - METH_VARARGS | METH_KEYWORDS, channel_send_buffer_doc}, - {"recv", _PyCFunction_CAST(channel_recv), - METH_VARARGS | METH_KEYWORDS, channel_recv_doc}, - {"close", _PyCFunction_CAST(channel_close), - METH_VARARGS | METH_KEYWORDS, channel_close_doc}, - {"release", _PyCFunction_CAST(channel_release), - METH_VARARGS | METH_KEYWORDS, channel_release_doc}, - {"_channel_id", _PyCFunction_CAST(channel__channel_id), + {"create", channelsmod_create, + METH_NOARGS, channelsmod_create_doc}, + {"destroy", _PyCFunction_CAST(channelsmod_destroy), + METH_VARARGS | METH_KEYWORDS, channelsmod_destroy_doc}, + {"list_all", channelsmod_list_all, + METH_NOARGS, channelsmod_list_all_doc}, + {"list_interpreters", _PyCFunction_CAST(channelsmod_list_interpreters), + METH_VARARGS | METH_KEYWORDS, channelsmod_list_interpreters_doc}, + {"send", _PyCFunction_CAST(channelsmod_send), + METH_VARARGS | METH_KEYWORDS, channelsmod_send_doc}, + {"send_buffer", _PyCFunction_CAST(channelsmod_send_buffer), + METH_VARARGS | METH_KEYWORDS, channelsmod_send_buffer_doc}, + {"recv", _PyCFunction_CAST(channelsmod_recv), + METH_VARARGS | METH_KEYWORDS, channelsmod_recv_doc}, + {"close", _PyCFunction_CAST(channelsmod_close), + METH_VARARGS | METH_KEYWORDS, channelsmod_close_doc}, + {"release", _PyCFunction_CAST(channelsmod_release), + METH_VARARGS | METH_KEYWORDS, channelsmod_release_doc}, + {"_channel_id", _PyCFunction_CAST(channelsmod__channel_id), METH_VARARGS | METH_KEYWORDS, NULL}, - {"_register_end_types", _PyCFunction_CAST(channel__register_end_types), + {"_register_end_types", _PyCFunction_CAST(channelsmod__register_end_types), METH_VARARGS | METH_KEYWORDS, NULL}, {NULL, NULL} /* sentinel */ @@ -3143,7 +3181,7 @@ module_exec(PyObject *mod) // ChannelID state->ChannelIDType = add_new_type( - mod, &ChannelIDType_spec, _channelid_shared, xid_classes); + mod, &channelid_typespec, _channelid_shared, xid_classes); if (state->ChannelIDType == NULL) { goto error; } diff --git a/Python/pystate.c b/Python/pystate.c index 92cf741f4ca..2e6f07e6003 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -2428,7 +2428,7 @@ _xidata_init(_PyCrossInterpreterData *data) assert(data->data == NULL); assert(data->obj == NULL); *data = (_PyCrossInterpreterData){0}; - data->interp = -1; + data->interpid = -1; } static inline void @@ -2465,7 +2465,7 @@ _PyCrossInterpreterData_Init(_PyCrossInterpreterData *data, // Ideally every object would know its owning interpreter. // Until then, we have to rely on the caller to identify it // (but we don't need it in all cases). - data->interp = (interp != NULL) ? interp->id : -1; + data->interpid = (interp != NULL) ? interp->id : -1; data->new_object = new_object; } @@ -2494,7 +2494,7 @@ _PyCrossInterpreterData_Clear(PyInterpreterState *interp, { assert(data != NULL); // This must be called in the owning interpreter. - assert(interp == NULL || data->interp == interp->id); + assert(interp == NULL || data->interpid == interp->id); _xidata_clear(data); } @@ -2505,7 +2505,7 @@ _check_xidata(PyThreadState *tstate, _PyCrossInterpreterData *data) // data->obj may be NULL, so we don't check it. - if (data->interp < 0) { + if (data->interpid < 0) { _PyErr_SetString(tstate, PyExc_SystemError, "missing interp"); return -1; } @@ -2557,7 +2557,7 @@ _PyObject_GetCrossInterpreterData(PyObject *obj, _PyCrossInterpreterData *data) // Reset data before re-populating. *data = (_PyCrossInterpreterData){0}; - data->interp = -1; + data->interpid = -1; // Call the "getdata" func for the object. Py_INCREF(obj); @@ -2573,7 +2573,7 @@ _PyObject_GetCrossInterpreterData(PyObject *obj, _PyCrossInterpreterData *data) } // Fill in the blanks and validate the result. - data->interp = interp->id; + data->interpid = interp->id; if (_check_xidata(tstate, data) != 0) { (void)_PyCrossInterpreterData_Release(data); return -1; @@ -2636,7 +2636,7 @@ _xidata_release(_PyCrossInterpreterData *data, int rawfree) } // Switch to the original interpreter. - PyInterpreterState *interp = _PyInterpreterState_LookUpID(data->interp); + PyInterpreterState *interp = _PyInterpreterState_LookUpID(data->interpid); if (interp == NULL) { // The interpreter was already destroyed. // This function shouldn't have been called.