* Fix missing memory barriers

* Fix comments about memory ordering
 -----BEGIN PGP SIGNATURE-----
 
 iQFIBAABCAAyFiEE8TM4V0tmI4mGbHaCv/vSX3jHroMFAmQHIqoUHHBib256aW5p
 QHJlZGhhdC5jb20ACgkQv/vSX3jHroPYBwgArUaS0KGrBM1XmRUUpXnJokmA37n8
 ft477na+XW+p9VYi27B0R01P8j+AkCrAO0Ir1MLG7axjn5KiRMnbf2uBgqasEREv
 repJEXsqISoxA6vvAvnehKHAI9zu8b7frRc/30b6EOrrZpn0JKePSNRTyBu2seGO
 NFDXPVA2Wom+xXaNSEGt0dmoJ6AzEVIZKhUIwyvUWOC7MXuuIkRWn9/nySUdvEt0
 RIFPPk7JCjnEc32vb4Xnq/Ncsy20tMIM1hlDxMOVNq3brjeSCzS0PPPSjE/X5OtW
 Yn5YS0nCyD7wjP2dkXI4I1lUPxUUx6LvMz1aGbJCfyjSX41mNES/agoGgA==
 =KEUo
 -----END PGP SIGNATURE-----

Merge tag 'for-upstream-mb' of https://gitlab.com/bonzini/qemu into staging

* Fix missing memory barriers
* Fix comments about memory ordering

# -----BEGIN PGP SIGNATURE-----
#
# iQFIBAABCAAyFiEE8TM4V0tmI4mGbHaCv/vSX3jHroMFAmQHIqoUHHBib256aW5p
# QHJlZGhhdC5jb20ACgkQv/vSX3jHroPYBwgArUaS0KGrBM1XmRUUpXnJokmA37n8
# ft477na+XW+p9VYi27B0R01P8j+AkCrAO0Ir1MLG7axjn5KiRMnbf2uBgqasEREv
# repJEXsqISoxA6vvAvnehKHAI9zu8b7frRc/30b6EOrrZpn0JKePSNRTyBu2seGO
# NFDXPVA2Wom+xXaNSEGt0dmoJ6AzEVIZKhUIwyvUWOC7MXuuIkRWn9/nySUdvEt0
# RIFPPk7JCjnEc32vb4Xnq/Ncsy20tMIM1hlDxMOVNq3brjeSCzS0PPPSjE/X5OtW
# Yn5YS0nCyD7wjP2dkXI4I1lUPxUUx6LvMz1aGbJCfyjSX41mNES/agoGgA==
# =KEUo
# -----END PGP SIGNATURE-----
# gpg: Signature made Tue 07 Mar 2023 11:40:26 GMT
# gpg:                using RSA key F13338574B662389866C7682BFFBD25F78C7AE83
# gpg:                issuer "pbonzini@redhat.com"
# gpg: Good signature from "Paolo Bonzini <bonzini@gnu.org>" [full]
# gpg:                 aka "Paolo Bonzini <pbonzini@redhat.com>" [full]
# Primary key fingerprint: 46F5 9FBD 57D6 12E7 BFD4  E2F7 7E15 100C CD36 69B1
#      Subkey fingerprint: F133 3857 4B66 2389 866C  7682 BFFB D25F 78C7 AE83

* tag 'for-upstream-mb' of https://gitlab.com/bonzini/qemu:
  async: clarify usage of barriers in the polling case
  async: update documentation of the memory barriers
  physmem: add missing memory barrier
  qemu-coroutine-lock: add smp_mb__after_rmw()
  aio-wait: switch to smp_mb__after_rmw()
  edu: add smp_mb__after_rmw()
  qemu-thread-win32: cleanup, fix, document QemuEvent
  qemu-thread-posix: cleanup, fix, document QemuEvent
  qatomic: add smp_mb__before/after_rmw()

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2023-03-07 17:02:06 +00:00
commit 7b0f0aa55f
9 changed files with 186 additions and 70 deletions

View file

@ -27,7 +27,8 @@ provides macros that fall in three camps:
- weak atomic access and manual memory barriers: ``qatomic_read()``,
``qatomic_set()``, ``smp_rmb()``, ``smp_wmb()``, ``smp_mb()``,
``smp_mb_acquire()``, ``smp_mb_release()``, ``smp_read_barrier_depends()``;
``smp_mb_acquire()``, ``smp_mb_release()``, ``smp_read_barrier_depends()``,
``smp_mb__before_rmw()``, ``smp_mb__after_rmw()``;
- sequentially consistent atomic access: everything else.
@ -472,7 +473,7 @@ and memory barriers, and the equivalents in QEMU:
sequential consistency.
- in QEMU, ``qatomic_read()`` and ``qatomic_set()`` do not participate in
the total ordering enforced by sequentially-consistent operations.
the ordering enforced by read-modify-write operations.
This is because QEMU uses the C11 memory model. The following example
is correct in Linux but not in QEMU:
@ -488,9 +489,24 @@ and memory barriers, and the equivalents in QEMU:
because the read of ``y`` can be moved (by either the processor or the
compiler) before the write of ``x``.
Fixing this requires an ``smp_mb()`` memory barrier between the write
of ``x`` and the read of ``y``. In the common case where only one thread
writes ``x``, it is also possible to write it like this:
Fixing this requires a full memory barrier between the write of ``x`` and
the read of ``y``. QEMU provides ``smp_mb__before_rmw()`` and
``smp_mb__after_rmw()``; they act both as an optimization,
avoiding the memory barrier on processors where it is unnecessary,
and as a clarification of this corner case of the C11 memory model:
+--------------------------------+
| QEMU (correct) |
+================================+
| :: |
| |
| a = qatomic_fetch_add(&x, 2);|
| smp_mb__after_rmw(); |
| b = qatomic_read(&y); |
+--------------------------------+
In the common case where only one thread writes ``x``, it is also possible
to write it like this:
+--------------------------------+
| QEMU (correct) |

View file

@ -267,6 +267,8 @@ static void edu_mmio_write(void *opaque, hwaddr addr, uint64_t val,
case 0x20:
if (val & EDU_STATUS_IRQFACT) {
qatomic_or(&edu->status, EDU_STATUS_IRQFACT);
/* Order check of the COMPUTING flag after setting IRQFACT. */
smp_mb__after_rmw();
} else {
qatomic_and(&edu->status, ~EDU_STATUS_IRQFACT);
}
@ -349,6 +351,9 @@ static void *edu_fact_thread(void *opaque)
qemu_mutex_unlock(&edu->thr_mutex);
qatomic_and(&edu->status, ~EDU_STATUS_COMPUTING);
/* Clear COMPUTING flag before checking IRQFACT. */
smp_mb__after_rmw();
if (qatomic_read(&edu->status) & EDU_STATUS_IRQFACT) {
qemu_mutex_lock_iothread();
edu_raise_irq(edu, FACT_IRQ);

View file

@ -85,7 +85,7 @@ extern AioWait global_aio_wait;
/* Increment wait_->num_waiters before evaluating cond. */ \
qatomic_inc(&wait_->num_waiters); \
/* Paired with smp_mb in aio_wait_kick(). */ \
smp_mb(); \
smp_mb__after_rmw(); \
if (ctx_ && in_aio_context_home_thread(ctx_)) { \
while ((cond)) { \
aio_poll(ctx_, true); \

View file

@ -245,6 +245,20 @@
#define smp_wmb() smp_mb_release()
#define smp_rmb() smp_mb_acquire()
/*
* SEQ_CST is weaker than the older __sync_* builtins and Linux
* kernel read-modify-write atomics. Provide a macro to obtain
* the same semantics.
*/
#if !defined(QEMU_SANITIZE_THREAD) && \
(defined(__i386__) || defined(__x86_64__) || defined(__s390x__))
# define smp_mb__before_rmw() signal_barrier()
# define smp_mb__after_rmw() signal_barrier()
#else
# define smp_mb__before_rmw() smp_mb()
# define smp_mb__after_rmw() smp_mb()
#endif
/* qatomic_mb_read/set semantics map Java volatile variables. They are
* less expensive on some platforms (notably POWER) than fully
* sequentially consistent operations.
@ -259,7 +273,8 @@
#if !defined(QEMU_SANITIZE_THREAD) && \
(defined(__i386__) || defined(__x86_64__) || defined(__s390x__))
/* This is more efficient than a store plus a fence. */
# define qatomic_mb_set(ptr, i) ((void)qatomic_xchg(ptr, i))
# define qatomic_mb_set(ptr, i) \
({ (void)qatomic_xchg(ptr, i); smp_mb__after_rmw(); })
#else
# define qatomic_mb_set(ptr, i) \
({ qatomic_store_release(ptr, i); smp_mb(); })

View file

@ -2927,6 +2927,8 @@ void cpu_register_map_client(QEMUBH *bh)
qemu_mutex_lock(&map_client_list_lock);
client->bh = bh;
QLIST_INSERT_HEAD(&map_client_list, client, link);
/* Write map_client_list before reading in_use. */
smp_mb();
if (!qatomic_read(&bounce.in_use)) {
cpu_notify_map_clients_locked();
}
@ -3116,6 +3118,7 @@ void address_space_unmap(AddressSpace *as, void *buffer, hwaddr len,
qemu_vfree(bounce.buffer);
bounce.buffer = NULL;
memory_region_unref(bounce.mr);
/* Clear in_use before reading map_client_list. */
qatomic_mb_set(&bounce.in_use, false);
cpu_notify_map_clients();
}

View file

@ -74,14 +74,21 @@ static void aio_bh_enqueue(QEMUBH *bh, unsigned new_flags)
unsigned old_flags;
/*
* The memory barrier implicit in qatomic_fetch_or makes sure that:
* 1. idle & any writes needed by the callback are done before the
* locations are read in the aio_bh_poll.
* 2. ctx is loaded before the callback has a chance to execute and bh
* could be freed.
* Synchronizes with atomic_fetch_and() in aio_bh_dequeue(), ensuring that
* insertion starts after BH_PENDING is set.
*/
old_flags = qatomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
if (!(old_flags & BH_PENDING)) {
/*
* At this point the bottom half becomes visible to aio_bh_poll().
* This insertion thus synchronizes with QSLIST_MOVE_ATOMIC in
* aio_bh_poll(), ensuring that:
* 1. any writes needed by the callback are visible from the callback
* after aio_bh_dequeue() returns bh.
* 2. ctx is loaded before the callback has a chance to execute and bh
* could be freed.
*/
QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next);
}
@ -107,11 +114,8 @@ static QEMUBH *aio_bh_dequeue(BHList *head, unsigned *flags)
QSLIST_REMOVE_HEAD(head, next);
/*
* The qatomic_and is paired with aio_bh_enqueue(). The implicit memory
* barrier ensures that the callback sees all writes done by the scheduling
* thread. It also ensures that the scheduling thread sees the cleared
* flag before bh->cb has run, and thus will call aio_notify again if
* necessary.
* Synchronizes with qatomic_fetch_or() in aio_bh_enqueue(), ensuring that
* the removal finishes before BH_PENDING is reset.
*/
*flags = qatomic_fetch_and(&bh->flags,
~(BH_PENDING | BH_SCHEDULED | BH_IDLE));
@ -158,6 +162,7 @@ int aio_bh_poll(AioContext *ctx)
BHListSlice *s;
int ret = 0;
/* Synchronizes with QSLIST_INSERT_HEAD_ATOMIC in aio_bh_enqueue(). */
QSLIST_MOVE_ATOMIC(&slice.bh_list, &ctx->bh_list);
QSIMPLEQ_INSERT_TAIL(&ctx->bh_slice_list, &slice, next);
@ -448,15 +453,15 @@ LuringState *aio_get_linux_io_uring(AioContext *ctx)
void aio_notify(AioContext *ctx)
{
/*
* Write e.g. bh->flags before writing ctx->notified. Pairs with smp_mb in
* aio_notify_accept.
* Write e.g. ctx->bh_list before writing ctx->notified. Pairs with
* smp_mb() in aio_notify_accept().
*/
smp_wmb();
qatomic_set(&ctx->notified, true);
/*
* Write ctx->notified before reading ctx->notify_me. Pairs
* with smp_mb in aio_ctx_prepare or aio_poll.
* Write ctx->notified (and also ctx->bh_list) before reading ctx->notify_me.
* Pairs with smp_mb() in aio_ctx_prepare or aio_poll.
*/
smp_mb();
if (qatomic_read(&ctx->notify_me)) {
@ -469,8 +474,9 @@ void aio_notify_accept(AioContext *ctx)
qatomic_set(&ctx->notified, false);
/*
* Write ctx->notified before reading e.g. bh->flags. Pairs with smp_wmb
* in aio_notify.
* Order reads of ctx->notified (in aio_context_notifier_poll()) and the
* above clearing of ctx->notified before reads of e.g. bh->flags. Pairs
* with smp_wmb() in aio_notify.
*/
smp_mb();
}
@ -493,6 +499,11 @@ static bool aio_context_notifier_poll(void *opaque)
EventNotifier *e = opaque;
AioContext *ctx = container_of(e, AioContext, notifier);
/*
* No need for load-acquire because we just want to kick the
* event loop. aio_notify_accept() takes care of synchronizing
* the event loop with the producers.
*/
return qatomic_read(&ctx->notified);
}

View file

@ -201,10 +201,16 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
trace_qemu_co_mutex_lock_entry(mutex, self);
push_waiter(mutex, &w);
/*
* Add waiter before reading mutex->handoff. Pairs with qatomic_mb_set
* in qemu_co_mutex_unlock.
*/
smp_mb__after_rmw();
/* This is the "Responsibility Hand-Off" protocol; a lock() picks from
* a concurrent unlock() the responsibility of waking somebody up.
*/
old_handoff = qatomic_mb_read(&mutex->handoff);
old_handoff = qatomic_read(&mutex->handoff);
if (old_handoff &&
has_waiters(mutex) &&
qatomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
@ -303,6 +309,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
}
our_handoff = mutex->sequence;
/* Set handoff before checking for waiters. */
qatomic_mb_set(&mutex->handoff, our_handoff);
if (!has_waiters(mutex)) {
/* The concurrent lock has not added itself yet, so it

View file

@ -384,13 +384,21 @@ void qemu_event_destroy(QemuEvent *ev)
void qemu_event_set(QemuEvent *ev)
{
/* qemu_event_set has release semantics, but because it *loads*
assert(ev->initialized);
/*
* Pairs with both qemu_event_reset() and qemu_event_wait().
*
* qemu_event_set has release semantics, but because it *loads*
* ev->value we need a full memory barrier here.
*/
assert(ev->initialized);
smp_mb();
if (qatomic_read(&ev->value) != EV_SET) {
if (qatomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
int old = qatomic_xchg(&ev->value, EV_SET);
/* Pairs with memory barrier in kernel futex_wait system call. */
smp_mb__after_rmw();
if (old == EV_BUSY) {
/* There were waiters, wake them up. */
qemu_futex_wake(ev, INT_MAX);
}
@ -399,18 +407,19 @@ void qemu_event_set(QemuEvent *ev)
void qemu_event_reset(QemuEvent *ev)
{
unsigned value;
assert(ev->initialized);
value = qatomic_read(&ev->value);
smp_mb_acquire();
if (value == EV_SET) {
/*
* If there was a concurrent reset (or even reset+wait),
* do nothing. Otherwise change EV_SET->EV_FREE.
*/
qatomic_or(&ev->value, EV_FREE);
}
/*
* If there was a concurrent reset (or even reset+wait),
* do nothing. Otherwise change EV_SET->EV_FREE.
*/
qatomic_or(&ev->value, EV_FREE);
/*
* Order reset before checking the condition in the caller.
* Pairs with the first memory barrier in qemu_event_set().
*/
smp_mb__after_rmw();
}
void qemu_event_wait(QemuEvent *ev)
@ -418,20 +427,40 @@ void qemu_event_wait(QemuEvent *ev)
unsigned value;
assert(ev->initialized);
value = qatomic_read(&ev->value);
smp_mb_acquire();
/*
* qemu_event_wait must synchronize with qemu_event_set even if it does
* not go down the slow path, so this load-acquire is needed that
* synchronizes with the first memory barrier in qemu_event_set().
*
* If we do go down the slow path, there is no requirement at all: we
* might miss a qemu_event_set() here but ultimately the memory barrier in
* qemu_futex_wait() will ensure the check is done correctly.
*/
value = qatomic_load_acquire(&ev->value);
if (value != EV_SET) {
if (value == EV_FREE) {
/*
* Leave the event reset and tell qemu_event_set that there
* are waiters. No need to retry, because there cannot be
* a concurrent busy->free transition. After the CAS, the
* event will be either set or busy.
* Leave the event reset and tell qemu_event_set that there are
* waiters. No need to retry, because there cannot be a concurrent
* busy->free transition. After the CAS, the event will be either
* set or busy.
*
* This cmpxchg doesn't have particular ordering requirements if it
* succeeds (moving the store earlier can only cause qemu_event_set()
* to issue _more_ wakeups), the failing case needs acquire semantics
* like the load above.
*/
if (qatomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) {
return;
}
}
/*
* This is the final check for a concurrent set, so it does need
* a smp_mb() pairing with the second barrier of qemu_event_set().
* The barrier is inside the FUTEX_WAIT system call.
*/
qemu_futex_wait(ev, EV_BUSY);
}
}

View file

@ -272,12 +272,20 @@ void qemu_event_destroy(QemuEvent *ev)
void qemu_event_set(QemuEvent *ev)
{
assert(ev->initialized);
/* qemu_event_set has release semantics, but because it *loads*
/*
* Pairs with both qemu_event_reset() and qemu_event_wait().
*
* qemu_event_set has release semantics, but because it *loads*
* ev->value we need a full memory barrier here.
*/
smp_mb();
if (qatomic_read(&ev->value) != EV_SET) {
if (qatomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
int old = qatomic_xchg(&ev->value, EV_SET);
/* Pairs with memory barrier after ResetEvent. */
smp_mb__after_rmw();
if (old == EV_BUSY) {
/* There were waiters, wake them up. */
SetEvent(ev->event);
}
@ -286,17 +294,19 @@ void qemu_event_set(QemuEvent *ev)
void qemu_event_reset(QemuEvent *ev)
{
unsigned value;
assert(ev->initialized);
value = qatomic_read(&ev->value);
smp_mb_acquire();
if (value == EV_SET) {
/* If there was a concurrent reset (or even reset+wait),
* do nothing. Otherwise change EV_SET->EV_FREE.
*/
qatomic_or(&ev->value, EV_FREE);
}
/*
* If there was a concurrent reset (or even reset+wait),
* do nothing. Otherwise change EV_SET->EV_FREE.
*/
qatomic_or(&ev->value, EV_FREE);
/*
* Order reset before checking the condition in the caller.
* Pairs with the first memory barrier in qemu_event_set().
*/
smp_mb__after_rmw();
}
void qemu_event_wait(QemuEvent *ev)
@ -304,29 +314,49 @@ void qemu_event_wait(QemuEvent *ev)
unsigned value;
assert(ev->initialized);
value = qatomic_read(&ev->value);
smp_mb_acquire();
/*
* qemu_event_wait must synchronize with qemu_event_set even if it does
* not go down the slow path, so this load-acquire is needed that
* synchronizes with the first memory barrier in qemu_event_set().
*
* If we do go down the slow path, there is no requirement at all: we
* might miss a qemu_event_set() here but ultimately the memory barrier in
* qemu_futex_wait() will ensure the check is done correctly.
*/
value = qatomic_load_acquire(&ev->value);
if (value != EV_SET) {
if (value == EV_FREE) {
/* qemu_event_set is not yet going to call SetEvent, but we are
* going to do another check for EV_SET below when setting EV_BUSY.
* At that point it is safe to call WaitForSingleObject.
/*
* Here the underlying kernel event is reset, but qemu_event_set is
* not yet going to call SetEvent. However, there will be another
* check for EV_SET below when setting EV_BUSY. At that point it
* is safe to call WaitForSingleObject.
*/
ResetEvent(ev->event);
/* Tell qemu_event_set that there are waiters. No need to retry
* because there cannot be a concurrent busy->free transition.
* After the CAS, the event will be either set or busy.
/*
* It is not clear whether ResetEvent provides this barrier; kernel
* APIs (KeResetEvent/KeClearEvent) do not. Better safe than sorry!
*/
smp_mb();
/*
* Leave the event reset and tell qemu_event_set that there are
* waiters. No need to retry, because there cannot be a concurrent
* busy->free transition. After the CAS, the event will be either
* set or busy.
*/
if (qatomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) {
value = EV_SET;
} else {
value = EV_BUSY;
return;
}
}
if (value == EV_BUSY) {
WaitForSingleObject(ev->event, INFINITE);
}
/*
* ev->value is now EV_BUSY. Since we didn't observe EV_SET,
* qemu_event_set() must observe EV_BUSY and call SetEvent().
*/
WaitForSingleObject(ev->event, INFINITE);
}
}