Here is a sketch of a solution with watch
channels. It seems working correctly and I will proceed for now. But I would be grateful for a more idiomatic solution with some sort of an analogue of condvar.
use rand::{thread_rng, Rng};
use tokio::sync::watch;
use tokio::time::{self, Duration};
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
const TEST_COUNT: usize = 10_000;
// We'll run the same "trigger an event once, let two tasks wait for it" flow 10k times.
for i in 0..TEST_COUNT {
let (tx, rx) = watch::channel(false);
let mut rx1 = rx.clone();
let mut rx2 = rx.clone();
// Task A
let handle1 = tokio::spawn(async move {
// Random short sleep to scramble scheduling.
let delay = thread_rng().gen_range(0..2);
time::sleep(Duration::from_millis(delay)).await;
// Check if event has already happened.
// the explicit “borrow” check is only an optimization that avoids an unnecessary async suspension
// in case the receiver is already up-to-date
if !*rx1.borrow() {
let delay = thread_rng().gen_range(0..2);
time::sleep(Duration::from_millis(delay)).await;
// If not, await first change to `true`.
// Under the hood, each watch::Receiver has an internal sequence number indicating the version
// of the channel’s state it has seen. Every time the sender calls tx.send(...),
// this version is incremented. When you call changed().await, if the receiver’s version is out of date
// (i.e., less than the channel’s current version), changed() returns immediately.
// This is how the watch channel prevents “missing updates” even if the change happens
// between your “check” and your “await.”
rx1.changed().await.expect("watch channel closed");
}
});
// Task B
let handle2 = tokio::spawn(async move {
let delay = thread_rng().gen_range(0..2);
time::sleep(Duration::from_millis(delay)).await;
if !*rx2.borrow() {
let delay = thread_rng().gen_range(0..2);
time::sleep(Duration::from_millis(delay)).await;
rx2.changed().await.expect("watch channel closed");
}
});
// Random short sleep before triggering event.
// This tries to ensure the tasks might already be waiting ...
let delay = thread_rng().gen_range(0..4);
time::sleep(Duration::from_millis(delay)).await;
// "Event has happened"
tx.send(true).expect("watch channel closed");
// Wait for both tasks to confirm receipt of the `true` state.
handle1.await.unwrap();
handle2.await.unwrap();
// Print progress occasionally
if (i + 1) % 1000 == 0 {
println!("Finished iteration {}", i + 1);
}
}
println!("All {} iterations completed successfully.", TEST_COUNT);
}