// Mirror of https://github.com/python/cpython
// (synced 2024-11-02 10:13:47 +00:00; 208 lines, 7.1 KiB, C).
// Implementation of the biased reference counting inter-thread queue.
//
// Biased reference counting maintains two refcount fields in each object:
// ob_ref_local and ob_ref_shared. The true refcount is the sum of these two
// fields. In some cases, when refcounting operations are split across threads,
// the ob_ref_shared field can be negative (although the total refcount must
// be at least zero). In this case, the thread that decremented the refcount
// requests that the owning thread give up ownership and merge the refcount
// fields. This file implements the mechanism for doing so.
|
|
//
|
|
// Each thread state maintains a queue of objects whose refcounts it should
// merge. The thread states are stored in a per-interpreter hash table keyed
// by thread id. The hash table has a fixed size and uses a linked list to
// store the thread states within each bucket.
|
|
//
|
|
// The queueing thread uses the eval breaker mechanism to notify the owning
// thread that it has objects to merge. Additionally, all queued objects are
// merged during GC.
|
|
#include "Python.h"
|
|
#include "pycore_object.h" // _Py_ExplicitMergeRefcount
|
|
#include "pycore_brc.h" // struct _brc_thread_state
|
|
#include "pycore_ceval.h" // _Py_set_eval_breaker_bit
|
|
#include "pycore_llist.h" // struct llist_node
|
|
#include "pycore_pystate.h" // _PyThreadStateImpl
|
|
|
|
#ifdef Py_GIL_DISABLED
|
|
|
|
// Get the hashtable bucket for a given thread id.
|
|
static struct _brc_bucket *
|
|
get_bucket(PyInterpreterState *interp, uintptr_t tid)
|
|
{
|
|
return &interp->brc.table[tid % _Py_BRC_NUM_BUCKETS];
|
|
}
|
|
|
|
// Find the thread state in a hash table bucket by thread id.
|
|
static _PyThreadStateImpl *
|
|
find_thread_state(struct _brc_bucket *bucket, uintptr_t thread_id)
|
|
{
|
|
struct llist_node *node;
|
|
llist_for_each(node, &bucket->root) {
|
|
// Get the containing _PyThreadStateImpl from the linked-list node.
|
|
_PyThreadStateImpl *ts = llist_data(node, _PyThreadStateImpl,
|
|
brc.bucket_node);
|
|
if (ts->brc.tid == thread_id) {
|
|
return ts;
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
// Enqueue an object to be merged by the owning thread. This steals a
|
|
// reference to the object.
|
|
void
|
|
_Py_brc_queue_object(PyObject *ob)
|
|
{
|
|
PyInterpreterState *interp = _PyInterpreterState_GET();
|
|
|
|
uintptr_t ob_tid = _Py_atomic_load_uintptr(&ob->ob_tid);
|
|
if (ob_tid == 0) {
|
|
// The owning thread may have concurrently decided to merge the
|
|
// refcount fields.
|
|
Py_DECREF(ob);
|
|
return;
|
|
}
|
|
|
|
struct _brc_bucket *bucket = get_bucket(interp, ob_tid);
|
|
PyMutex_Lock(&bucket->mutex);
|
|
_PyThreadStateImpl *tstate = find_thread_state(bucket, ob_tid);
|
|
if (tstate == NULL) {
|
|
// If we didn't find the owning thread then it must have already exited.
|
|
// It's safe (and necessary) to merge the refcount. Subtract one when
|
|
// merging because we've stolen a reference.
|
|
Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1);
|
|
PyMutex_Unlock(&bucket->mutex);
|
|
if (refcount == 0) {
|
|
_Py_Dealloc(ob);
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (_PyObjectStack_Push(&tstate->brc.objects_to_merge, ob) < 0) {
|
|
PyMutex_Unlock(&bucket->mutex);
|
|
|
|
// Fall back to stopping all threads and manually merging the refcount
|
|
// if we can't enqueue the object to be merged.
|
|
_PyEval_StopTheWorld(interp);
|
|
Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1);
|
|
_PyEval_StartTheWorld(interp);
|
|
|
|
if (refcount == 0) {
|
|
_Py_Dealloc(ob);
|
|
}
|
|
return;
|
|
}
|
|
|
|
// Notify owning thread
|
|
_Py_set_eval_breaker_bit(&tstate->base, _PY_EVAL_EXPLICIT_MERGE_BIT);
|
|
|
|
PyMutex_Unlock(&bucket->mutex);
|
|
}
|
|
|
|
static void
|
|
merge_queued_objects(_PyObjectStack *to_merge)
|
|
{
|
|
PyObject *ob;
|
|
while ((ob = _PyObjectStack_Pop(to_merge)) != NULL) {
|
|
// Subtract one when merging because the queue had a reference.
|
|
Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1);
|
|
if (refcount == 0) {
|
|
_Py_Dealloc(ob);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process this thread's queue of objects to merge.
|
|
void
|
|
_Py_brc_merge_refcounts(PyThreadState *tstate)
|
|
{
|
|
struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc;
|
|
struct _brc_bucket *bucket = get_bucket(tstate->interp, brc->tid);
|
|
|
|
assert(brc->tid == _Py_ThreadId());
|
|
|
|
// Append all objects into a local stack. We don't want to hold the lock
|
|
// while calling destructors.
|
|
PyMutex_Lock(&bucket->mutex);
|
|
_PyObjectStack_Merge(&brc->local_objects_to_merge, &brc->objects_to_merge);
|
|
PyMutex_Unlock(&bucket->mutex);
|
|
|
|
// Process the local stack until it's empty
|
|
merge_queued_objects(&brc->local_objects_to_merge);
|
|
}
|
|
|
|
void
|
|
_Py_brc_init_state(PyInterpreterState *interp)
|
|
{
|
|
struct _brc_state *brc = &interp->brc;
|
|
for (Py_ssize_t i = 0; i < _Py_BRC_NUM_BUCKETS; i++) {
|
|
llist_init(&brc->table[i].root);
|
|
}
|
|
}
|
|
|
|
void
|
|
_Py_brc_init_thread(PyThreadState *tstate)
|
|
{
|
|
struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc;
|
|
uintptr_t tid = _Py_ThreadId();
|
|
|
|
// Add ourself to the hashtable
|
|
struct _brc_bucket *bucket = get_bucket(tstate->interp, tid);
|
|
PyMutex_Lock(&bucket->mutex);
|
|
brc->tid = tid;
|
|
llist_insert_tail(&bucket->root, &brc->bucket_node);
|
|
PyMutex_Unlock(&bucket->mutex);
|
|
}
|
|
|
|
void
|
|
_Py_brc_remove_thread(PyThreadState *tstate)
|
|
{
|
|
struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc;
|
|
if (brc->tid == 0) {
|
|
// The thread state may have been created, but never bound to a native
|
|
// thread and therefore never added to the hashtable.
|
|
assert(tstate->_status.bound == 0);
|
|
return;
|
|
}
|
|
|
|
struct _brc_bucket *bucket = get_bucket(tstate->interp, brc->tid);
|
|
|
|
// We need to fully process any objects to merge before removing ourself
|
|
// from the hashtable. It is not safe to perform any refcount operations
|
|
// after we are removed. After that point, other threads treat our objects
|
|
// as abandoned and may merge the objects' refcounts directly.
|
|
bool empty = false;
|
|
while (!empty) {
|
|
// Process the local stack until it's empty
|
|
merge_queued_objects(&brc->local_objects_to_merge);
|
|
|
|
PyMutex_Lock(&bucket->mutex);
|
|
empty = (brc->objects_to_merge.head == NULL);
|
|
if (empty) {
|
|
llist_remove(&brc->bucket_node);
|
|
}
|
|
else {
|
|
_PyObjectStack_Merge(&brc->local_objects_to_merge,
|
|
&brc->objects_to_merge);
|
|
}
|
|
PyMutex_Unlock(&bucket->mutex);
|
|
}
|
|
|
|
assert(brc->local_objects_to_merge.head == NULL);
|
|
assert(brc->objects_to_merge.head == NULL);
|
|
}
|
|
|
|
void
|
|
_Py_brc_after_fork(PyInterpreterState *interp)
|
|
{
|
|
// Unlock all bucket mutexes. Some of the buckets may be locked because
|
|
// locks can be handed off to a parked thread (see lock.c). We don't have
|
|
// to worry about consistency here, because no thread can be actively
|
|
// modifying a bucket, but it might be paused (not yet woken up) on a
|
|
// PyMutex_Lock while holding that lock.
|
|
for (Py_ssize_t i = 0; i < _Py_BRC_NUM_BUCKETS; i++) {
|
|
_PyMutex_at_fork_reinit(&interp->brc.table[i].mutex);
|
|
}
|
|
}
|
|
|
|
#endif /* Py_GIL_DISABLED */
|