TL;DR:

- Calling `.notify_one()` / `.notify_all()` on a destructed `std::atomic` is UB.
- `std::atomic_ref` doesn't help either, because the object it points to must outlive the `std::atomic_ref` object.
- Potential workarounds: `std::binary_semaphore`, or moving the `.wait()` and `.notify_xxx()` calls to a separate atomic variable that has a longer lifetime.

> Before I get to my main question, I'm assuming that `std::mutex::unlock()` stops touching `this` as soon as it puts the mutex in a state where another thread might lock it [...]
That's a valid assumption - the standard even mandates that this type of usage must be supported:
> **35.5.3.2.1 Class `mutex` [thread.mutex.class]**
>
> (2) [ Note: After a thread A has called `unlock()`, releasing a mutex, it is possible for another thread B to lock the same mutex, observe that it is no longer in use, unlock it, and destroy it, before thread A appears to have returned from its unlock call. Implementations are required to handle such scenarios correctly, as long as thread A doesn't access the mutex after the unlock call returns. These cases typically occur when a reference-counted object contains a mutex that is used to protect the reference count. — end note ]
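To make the note's scenario concrete, here is a minimal sketch (names and structure hypothetical, not from the standard) of a reference-counted object whose embedded mutex protects its own reference count:

```cpp
#include <cassert>
#include <mutex>

// Hypothetical sketch of the scenario from the note: a reference-counted
// object whose embedded mutex protects the reference count. Whichever
// thread drops the last reference destroys the object - so thread A's
// unlock() must not touch the mutex after making it lockable again, or
// thread B could lock, unlock, and delete it underneath thread A.
struct refcounted {
    std::mutex m;
    int refcount = 1;

    void add_ref() {
        std::lock_guard<std::mutex> g(m);
        ++refcount;
    }

    // Returns true if this call dropped the last reference.
    bool release() {
        m.lock();
        int remaining = --refcount;
        m.unlock(); // another thread may now lock/unlock/destroy *this
        if (remaining == 0) {
            // safe: unlock() has fully returned on this thread
            delete this;
            return true;
        }
        return false;
    }
};
```

The guarantee quoted above is exactly what makes the `delete this` here safe even when two threads race inside `release()`.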
> Is it possible to build a mutex from C++20 `std::atomic` without spinning to avoid use-after-free when deleting?
Yes, it is possible.
But not as straight-forward as one might expect, due to the lifetime issues you already mentioned.
See 3. Solutions for a list of potential ways you could work around this limitation.
> Is there any hope that a future version of C++ might provide a safe way to notify on destroyed atomics?
There is a paper addressing this specific issue that could have become part of C++26:
P2616 - Making std::atomic notification/wait operations usable in more situations
Revisions:
| Paper | Date | Target C++ Version |
|---|---|---|
| P2616R4 | 2023-02-15 | C++26 |
| P2616R3 | 2022-11-29 | C++26 |
| P2616R2 | 2022-11-16 | C++26 |
| P2616R1 | 2022-11-09 | C++26 |
| P2616R0 | 2022-07-05 | C++26 |
The current state of this paper can be seen at cplusplus/papers issue #1279 on GitHub
(currently the repo is private due to the current committee meeting - archive.org version)
It has been stuck in the needs-revision state since May 23, 2023, so it's unclear if (and when) it will ever become part of the standard.
> So, my next question is, can I do anything about this?
> The best I've come up with is to spin during destruction, but it's offensive to have to spin when we have futexes that are specifically designed to avoid spinning.
There's unfortunately no one-size-fits-all solution for this.
`std::binary_semaphore` might be an option - its `acquire()` and `release()` member functions do the wait / notify atomically, so there should be no lifetime problems. (see the `std::counting_semaphore` / `std::binary_semaphore` solution below)

> Am I missing some other trick I could use to do this without spinning - maybe with `std::atomic_ref`?
`std::atomic_ref` unfortunately doesn't help in this case either.
One of its rules is that the object it points to must outlive the `atomic_ref` object (which would not be the case if you destroy the object):
> **31.7 Class template `atomic_ref` [atomics.ref.generic]**
>
> (3) The lifetime ([basic.life]) of an object referenced by `*ptr` shall exceed the lifetime of all `atomic_ref`s that reference the object. While any `atomic_ref` instances exist that reference the `*ptr` object, all accesses to that object shall exclusively occur through those `atomic_ref` instances. [...]
> Is there any benefit to the language making this UB, or is this basically a flaw in the language spec that atomics don't expose the full expressiveness of the underlying futexes on which they are implemented?
The existence of P2616 suggests this is a limitation of the current wording (the atomic wait / notify API doesn't expose the full expressiveness of the underlying futexes) rather than a deliberate design choice.
## 3. Solutions

### 3.1 `std::counting_semaphore` / `std::binary_semaphore`

It is relatively straightforward to wrap `std::binary_semaphore` into a custom mutex implementation that supports unlocking the mutex from a different thread than the one that locked it.
e.g.: godbolt
```cpp
#include <chrono>
#include <semaphore>

class my_mutex {
private:
    std::binary_semaphore sem;
public:
    my_mutex() : sem(1) {}
    void lock() {
        sem.acquire();
    }
    void unlock() {
        sem.release();
    }
    bool try_lock() {
        return sem.try_acquire();
    }
    template<class Rep, class Period>
    bool try_lock_for(std::chrono::duration<Rep, Period> const& timeout) {
        return sem.try_acquire_for(timeout);
    }
    template<class Clock, class Duration>
    bool try_lock_until(std::chrono::time_point<Clock, Duration> const& timeout) {
        return sem.try_acquire_until(timeout);
    }
};
```
Upsides:
Downsides:
- [...] (libc++ / libstdc++) versions.
### 3.2 `std::atomic_ref` + raw futex wait / wake syscalls

Another option would be to use `std::atomic_ref` for the atomic read & write operations, and manually handle the waking / sleeping part (by calling the syscalls directly).
e.g.: godbolt
```cpp
#include <atomic>
#include <cstdint>

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

class my_mutex {
private:
    using state_t = std::uint32_t;
    static constexpr state_t StateUnlocked = 0;
    static constexpr state_t StateLocked = 1;
    static constexpr state_t StateLockedWithWaiters = 2;

    static_assert(std::atomic_ref<state_t>::is_always_lock_free);

    state_t state = StateUnlocked;

    void wait() {
        // TODO use WaitOnAddress for windows, ... other oses ...
        syscall(
            SYS_futex,
            &state,
            FUTEX_WAIT_PRIVATE,
            StateLockedWithWaiters,
            NULL
        );
    }
    void wake() {
        // TODO use WakeOnAddress for windows, ... other oses ...
        syscall(
            SYS_futex,
            &state,
            FUTEX_WAKE_PRIVATE,
            1
        );
    }
public:
    void lock() {
        state_t expected = StateUnlocked;
        if(std::atomic_ref(state).compare_exchange_strong(
            expected,
            StateLocked,
            std::memory_order::acquire,
            std::memory_order::relaxed
        )) [[likely]] {
            return;
        }
        while(true) {
            if(expected != StateLockedWithWaiters) {
                expected = std::atomic_ref(state).exchange(
                    StateLockedWithWaiters,
                    std::memory_order::acquire
                );
                if(expected == StateUnlocked) {
                    return;
                }
            }
            // TODO: maybe spin a little before waiting
            wait();
            expected = std::atomic_ref(state).load(
                std::memory_order::relaxed
            );
        }
    }
    bool try_lock() {
        state_t expected = StateUnlocked;
        return std::atomic_ref(state).compare_exchange_strong(
            expected,
            StateLocked,
            std::memory_order::acquire,
            std::memory_order::relaxed
        );
    }
    void unlock() {
        state_t prev = std::atomic_ref(state).exchange(
            StateUnlocked,
            std::memory_order::release
        );
        if(prev == StateLockedWithWaiters) [[unlikely]] {
            wake();
        }
    }
};
```
Upsides:
Downsides:
- You need to implement `wait()` and `wake()` manually, by directly calling the syscalls.

### 3.3 Use a separate atomic variable for the wait / notify (no `std::atomic_ref::wait` on the object that gets destroyed)

Another option would be to avoid waiting on the atomic variable directly by using another atomic variable (with a longer lifetime) solely for the wait / notify.
This is also how most standard libraries implement std::atomic::wait() for types that are not futex-sized.
(libstdc++ for example has a pool of 16 futexes it uses for waits on atomic variables that are not futex-sized. (`__wait_flags::__proxy_wait` is the flag used to handle whether the wait will be on the atomic value itself or one of the 16 proxy futexes))
e.g.: godbolt
```cpp
#include <atomic>
#include <bit>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <new>
#include <type_traits>

class waiter {
private:
    alignas(std::hardware_destructive_interference_size)
    std::atomic<std::uint32_t> counter = 0;
public:
    void notify_all() {
        counter.fetch_add(1, std::memory_order::release);
        counter.notify_all();
    }
    template <class T>
    void wait(
        std::atomic<T> const& var,
        std::type_identity_t<T> const& oldval
    ) {
        while (true) {
            auto counterval = counter.load(std::memory_order::acquire);
            if (var.load(std::memory_order::relaxed) != oldval) {
                return;
            }
            counter.wait(counterval);
        }
    }
};

template <std::size_t N = 256>
class waiter_pool {
private:
    static_assert(std::has_single_bit(N), "N should be a power of 2");
    waiter waiters[N];
    waiter& waiter_for_address(const void *ptr) {
        std::uintptr_t addr = reinterpret_cast<std::uintptr_t>(ptr);
        addr = std::hash<std::uintptr_t>{}(addr);
        return waiters[addr % N];
    }
public:
    template <class T>
    void notify_all(std::atomic<T> const& var) {
        waiter& w = waiter_for_address(static_cast<const void*>(&var));
        w.notify_all();
    }
    template <class T>
    void wait(
        std::atomic<T> const& var,
        std::type_identity_t<T> const& oldval
    ) {
        waiter& w = waiter_for_address(static_cast<const void*>(&var));
        w.wait(var, oldval);
    }
};

waiter_pool pool;

class my_mutex {
private:
    using state_t = std::uint8_t;
    static constexpr state_t StateUnlocked = 0;
    static constexpr state_t StateLocked = 1;
    static constexpr state_t StateLockedWithWaiters = 2;

    static_assert(std::atomic<state_t>::is_always_lock_free);

    std::atomic<state_t> state = StateUnlocked;
public:
    void lock() {
        state_t expected = StateUnlocked;
        if (state.compare_exchange_strong(
            expected,
            StateLocked,
            std::memory_order::acquire,
            std::memory_order::relaxed
        )) [[likely]] {
            return;
        }
        while (true) {
            if (expected != StateLockedWithWaiters) {
                expected = state.exchange(
                    StateLockedWithWaiters,
                    std::memory_order::acquire
                );
                if (expected == StateUnlocked) {
                    return;
                }
            }
            // TODO: maybe spin a little before waiting
            pool.wait(state, StateLockedWithWaiters);
            expected = state.load(std::memory_order::relaxed);
        }
    }
    bool try_lock() {
        state_t expected = StateUnlocked;
        return state.compare_exchange_strong(
            expected,
            StateLocked,
            std::memory_order::acquire,
            std::memory_order::relaxed
        );
    }
    void unlock() {
        state_t prev = state.exchange(
            StateUnlocked,
            std::memory_order::release
        );
        if (prev == StateLockedWithWaiters) [[unlikely]] {
            pool.notify_all(state);
        }
    }
};
```
Upsides:

- `my_mutex::state` does not need to be 32 bits, because the futex waiting is delegated to the pool. So instances of `my_mutex` can be much smaller.

Downsides: