79307116

Date: 2024-12-25 08:07:40
Score: 1
Natty:
Report link

Here is a sketch of a solution with watch channels. It seems working correctly and I will proceed for now. But I would be grateful for a more idiomatic solution with some sort of an analogue of condvar.

use rand::{thread_rng, Rng};
use tokio::sync::watch;
use tokio::time::{self, Duration};

#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    const TEST_COUNT: usize = 10_000;

    // We'll run the same "trigger an event once, let two tasks wait for it" flow 10k times.
    for i in 0..TEST_COUNT {
        let (tx, rx) = watch::channel(false);
        let mut rx1 = rx.clone();
        let mut rx2 = rx.clone();

        // Task A
        let handle1 = tokio::spawn(async move {
            // Random short sleep to scramble scheduling.
            let delay = thread_rng().gen_range(0..2);
            time::sleep(Duration::from_millis(delay)).await;

            // Check if event has already happened.
            // the explicit “borrow” check is only an optimization that avoids an unnecessary async suspension
            // in case the receiver is already up-to-date
            if !*rx1.borrow() {
                let delay = thread_rng().gen_range(0..2);
                time::sleep(Duration::from_millis(delay)).await;
                // If not, await first change to `true`.
                // Under the hood, each watch::Receiver has an internal sequence number indicating the version
                // of the channel’s state it has seen. Every time the sender calls tx.send(...),
                // this version is incremented. When you call changed().await, if the receiver’s version is out of date
                // (i.e., less than the channel’s current version), changed() returns immediately.
                // This is how the watch channel prevents “missing updates” even if the change happens
                // between your “check” and your “await.”
                rx1.changed().await.expect("watch channel closed");
            }
        });

        // Task B
        let handle2 = tokio::spawn(async move {
            let delay = thread_rng().gen_range(0..2);
            time::sleep(Duration::from_millis(delay)).await;

            if !*rx2.borrow() {
                let delay = thread_rng().gen_range(0..2);
                time::sleep(Duration::from_millis(delay)).await;
                rx2.changed().await.expect("watch channel closed");
            }
        });

        // Random short sleep before triggering event.
        // This tries to ensure the tasks might already be waiting ...
        let delay = thread_rng().gen_range(0..4);
        time::sleep(Duration::from_millis(delay)).await;

        // "Event has happened"
        tx.send(true).expect("watch channel closed");

        // Wait for both tasks to confirm receipt of the `true` state.
        handle1.await.unwrap();
        handle2.await.unwrap();

        // Print progress occasionally
        if (i + 1) % 1000 == 0 {
            println!("Finished iteration {}", i + 1);
        }
    }

    println!("All {} iterations completed successfully.", TEST_COUNT);
}
Reasons:
  • RegEx Blacklisted phrase (2): I would be grateful
  • Long answer (-1):
  • Has code block (-0.5):
  • Self-answer (0.5):
Posted by: Nikolay Zakirov