79824799

Date: 2025-11-19 18:12:47
Score: 1.5
Natty:
Report link

If we simplify the question to remove all the unnecessary details, the desired usage may look like this:

  1. For the pool
    let pool = SqlitePool::connect("sqlite://db/kip.db").await?;

    {
        let participant_repo = SqliteParticipantRepository::new(&pool);
        participant_repo.save().await?;
        participant_repo.save().await?;
    }
  2. For transactions
    let pool = SqlitePool::connect("sqlite://db/kip.db").await?;

    {
        let mut tx = pool.begin().await?;
        let participant_repo = SqliteParticipantRepository::new(&mut tx);
        participant_repo.save().await?;
        participant_repo.save().await?;

        let participant_repo_2 = SqliteParticipantRepository::new(&mut tx);
        participant_repo_2.save().await?;
        participant_repo_2.save().await?;
        tx.commit().await?;
    }

Notice that the repository methods can be called multiple times. That's no surprise: they take &self rather than &mut self, so the repository does not need to be mutable or exclusively borrowed.

The second usage example also shows that the same transaction can be shared across multiple repositories (in my case both are SqliteParticipantRepository, but the point still holds).


In order to achieve the consuming code shown above, the repository may look like this:

use crate::mut_acquire::MutAcquire;
use crate::participant_repository::ParticipantRepository;
use crate::sqlite_repository_error::SqliteRepositoryError;
use std::ops::DerefMut;
use tokio::sync::Mutex;
use uuid::Uuid;

pub struct SqliteParticipantRepository<A>
where
    A: MutAcquire + Sync + Send,
{
    acquirer: Mutex<A>,
}

impl<A> SqliteParticipantRepository<A>
where
    A: MutAcquire + Sync + Send,
{
    pub fn new(acquiree: A) -> Self {
        Self {
            acquirer: Mutex::new(acquiree),
        }
    }
}

impl<A> ParticipantRepository for SqliteParticipantRepository<A>
where
    A: MutAcquire + Sync + Send,
{
    type Error = SqliteRepositoryError;

    async fn save(&self) -> Result<(), Self::Error> {
        let user_id = format!("u_{}", Uuid::new_v4());
        let participant_id = format!("p_{}", Uuid::new_v4());
        let name = "Alice".to_string();

        // Lock the acquirer for this query only; the executor borrows from the guard.
        let mut acquirer = self.acquirer.lock().await;
        let conn = acquirer.deref_mut().acquire_executor();

        sqlx::query!(
            "INSERT INTO users (user_id, name) VALUES (?, ?)",
            user_id,
            name,
        )
        .execute(conn)
        .await?;

        // Release the lock before acquiring it again for the next query.
        drop(acquirer);

        let mut acquirer = self.acquirer.lock().await;
        let conn = acquirer.deref_mut().acquire_executor();

        sqlx::query!(
            "INSERT INTO participants (participant_id, name) VALUES (?, ?)",
            participant_id,
            name
        )
        .execute(conn)
        .await?;

        drop(acquirer);

        Ok(())
    }
}

For reference, here's the ParticipantRepository trait definition:

pub trait ParticipantRepository: Send + Sync {
    type Error: RepositoryError + Send + Sync;

    async fn save(&self) -> Result<(), Self::Error>;
}

The most interesting part about it is the custom MutAcquire:

use sqlx::{SqliteConnection, SqliteExecutor, SqlitePool, SqliteTransaction};

pub trait MutAcquire {
    type Executor<'a>: SqliteExecutor<'a>
    where
        Self: 'a;

    fn acquire_executor<'a>(&'a mut self) -> Self::Executor<'a>;
}

impl MutAcquire for &SqlitePool {
    type Executor<'a>
        = Self
    where
        Self: 'a;

    fn acquire_executor<'a>(&'a mut self) -> Self::Executor<'a> {
        *self
    }
}

impl<'c> MutAcquire for &mut SqliteTransaction<'c> {
    type Executor<'a>
        = &'a mut SqliteConnection
    where
        Self: 'a;

    fn acquire_executor<'a>(&'a mut self) -> Self::Executor<'a> {
        &mut ***self
    }
}

Basically, this is the Acquire trait from sqlx, but implemented to take &mut self instead of consuming self. If I used the original Acquire trait, I couldn't call .acquire() inside the repo's save method, because save would then have to take ownership of self as well, which would make the repo single-use. And this is not very useful.
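To make the difference concrete, here is a minimal, dependency-free sketch. FakeConn, Exec, ConsumingAcquire and BorrowingAcquire are hypothetical stand-ins that I made up for illustration; they are not sqlx types and only mirror the receiver shapes (self versus &mut self).

```rust
// Hypothetical stand-in for a database connection; NOT an sqlx type.
#[derive(Default)]
pub struct FakeConn {
    pub executed: Vec<String>,
}

// Hypothetical stand-in for an executor: something that can run one statement.
pub trait Exec {
    fn run(self, sql: &str);
}

impl Exec for &mut FakeConn {
    fn run(self, sql: &str) {
        self.executed.push(sql.to_string());
    }
}

// Mirrors the by-value receiver of a consuming acquire(self).
pub trait ConsumingAcquire<'c> {
    fn acquire(self) -> &'c mut FakeConn;
}

impl<'c> ConsumingAcquire<'c> for &'c mut FakeConn {
    fn acquire(self) -> &'c mut FakeConn {
        self
    }
}

// Mirrors MutAcquire: the source is only borrowed for the duration of one use.
pub trait BorrowingAcquire {
    type Executor<'a>: Exec
    where
        Self: 'a;

    fn acquire_executor<'a>(&'a mut self) -> Self::Executor<'a>;
}

impl BorrowingAcquire for &mut FakeConn {
    type Executor<'a>
        = &'a mut FakeConn
    where
        Self: 'a;

    fn acquire_executor<'a>(&'a mut self) -> Self::Executor<'a> {
        &mut **self
    }
}

// Generic code can acquire only once: `source` is moved by `acquire`.
pub fn run_once<'c, A: ConsumingAcquire<'c>>(source: A) {
    source.acquire().run("consuming: first");
    // A second `source.acquire()` here would not compile (use of moved value).
}

// Generic code can acquire repeatedly: each call reborrows `source`.
pub fn run_twice<A: BorrowingAcquire>(source: &mut A) {
    source.acquire_executor().run("borrowing: first");
    source.acquire_executor().run("borrowing: second");
}

pub fn demo() -> Vec<String> {
    let mut conn = FakeConn::default();
    run_once(&mut conn);
    let mut source = &mut conn;
    run_twice(&mut source);
    conn.executed
}
```

In generic code the consuming variant can hand out an executor only once per value, while the borrowing variant can be called as often as needed. That is the property the repository relies on.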

I genuinely don't understand what stopped the sqlx devs from providing such a trait out of the box. The only thing I can assume is that it's impossible to implement it for all the same targets that Acquire is implemented for. Still, I think it would be more than enough to provide it pre-implemented at least for &SqlitePool and &mut SqliteTransaction.

Nevertheless, the code shown above compiles and works.

I have my doubts regarding the Mutex usage in the repo methods, but at the same time I can't really imagine a scenario where it would become a performance bottleneck. With a pool, each query simply takes a connection from the pool. With a tx-enabled repo, we actually want the same transaction (connection) to be used for all the internal queries, so even if the mutex is locked when a query is about to run, waiting for it is the desired behavior. At least, if I understand it correctly.
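The role of the Mutex can be sketched in isolation: it is what lets a method taking &self obtain the mutable access that the executor needs. This is only an illustration under stated assumptions. FakeConn and Repo are hypothetical names, and std::sync::Mutex is used instead of tokio::sync::Mutex purely to keep the sketch free of dependencies.

```rust
use std::sync::Mutex;

// Hypothetical stand-in for a transaction's connection; NOT an sqlx type.
#[derive(Default)]
pub struct FakeConn {
    pub log: Vec<String>,
}

pub struct Repo<'t> {
    // Assumption: the real repository uses tokio::sync::Mutex here.
    conn: Mutex<&'t mut FakeConn>,
}

impl<'t> Repo<'t> {
    pub fn new(conn: &'t mut FakeConn) -> Self {
        Self {
            conn: Mutex::new(conn),
        }
    }

    // Takes &self, yet mutates the connection through the mutex guard.
    pub fn save(&self, name: &str) {
        {
            let mut guard = self.conn.lock().unwrap();
            guard.log.push(format!("insert user {}", name));
        } // the guard is dropped here, releasing the lock

        let mut guard = self.conn.lock().unwrap();
        guard.log.push(format!("insert participant {}", name));
    }
}

pub fn demo() -> Vec<String> {
    let mut conn = FakeConn::default();
    {
        let repo = Repo::new(&mut conn);
        // Multiple calls on the same repo, none of which need `&mut repo`.
        repo.save("Alice");
        repo.save("Bob");
    }
    conn.log
}
```

Both statements inside save go through the same connection, one after the other, which matches what a transaction-backed repository wants.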


Another important detail is that the code above lets us run multiple queries within a single repo method, which is sometimes quite useful.


P.S. In my OP I was mistaken in saying that Executor is implemented for &mut Transaction. In fact, it's implemented for &mut *Transaction (notice the asterisk), which is basically the same as &mut Connection. It was one of the many reasons why my code refused to compile. Besides that, it seems to be outright impossible to make a repo generic over any Executor. This is partly why the sqlx devs introduced the Acquire trait.

As for my concern that Acquire requires the consumer to know which method to call (begin or acquire), it makes no difference in my case. I decide upfront whether save will run against a pool or a transaction, and the calling side supplies one or the other when it creates the repository. With a transaction, acquiring takes its underlying connection, much as it would take a connection from a pool. If several methods need to run as part of a single transaction, the calling side is responsible for beginning it, while the repo (as a consumer) just has to acquire the underlying connection.
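The asterisk matters because a generic `E: Executor` parameter gets no automatic deref coercion. Here is a small sketch with hypothetical stand-ins (FakeTransaction, FakeConnection and FakeExecutor are not sqlx types). It assumes, as described above, that the transaction dereferences to its connection and that only the connection reference implements the executor trait.

```rust
use std::ops::{Deref, DerefMut};

// Hypothetical stand-ins; NOT sqlx types.
#[derive(Default)]
pub struct FakeConnection {
    pub statements: Vec<String>,
}

pub struct FakeTransaction {
    conn: FakeConnection,
}

// Assumption: the transaction dereferences to its underlying connection.
impl Deref for FakeTransaction {
    type Target = FakeConnection;
    fn deref(&self) -> &FakeConnection {
        &self.conn
    }
}

impl DerefMut for FakeTransaction {
    fn deref_mut(&mut self) -> &mut FakeConnection {
        &mut self.conn
    }
}

// Implemented only for the connection reference, not for the transaction.
pub trait FakeExecutor {
    fn execute(self, sql: &str);
}

impl FakeExecutor for &mut FakeConnection {
    fn execute(self, sql: &str) {
        self.statements.push(sql.to_string());
    }
}

// Generic over the executor, like a query's `.execute(conn)` call.
pub fn run<E: FakeExecutor>(executor: E, sql: &str) {
    executor.execute(sql);
}

pub fn demo() -> Vec<String> {
    let mut tx = FakeTransaction {
        conn: FakeConnection::default(),
    };

    // `run(&mut tx, ...)` would not compile: `&mut FakeTransaction`
    // does not implement `FakeExecutor`.
    run(&mut *tx, "INSERT 1"); // `&mut *tx` is `&mut FakeConnection`

    // Starting from `&mut &mut FakeTransaction` (the `self` inside the
    // MutAcquire impl for a transaction), three derefs reach the connection.
    let mut tx_ref = &mut tx;
    let outer: &mut &mut FakeTransaction = &mut tx_ref;
    run(&mut ***outer, "INSERT 2");

    tx.conn.statements
}
```

This is also why the transaction impl of acquire_executor returns `&mut ***self`.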


Important: this solution does not work if I mark the ParticipantRepository trait with #[async_trait::async_trait] or if I convert its futures to Send with #[trait_variant::make(Send)]. This effectively means that it's impossible to use my solution in Axum applications (which was the initial intent for me). So after all, it's kinda useless, but I still decided to share it in case someone doesn't really care about Axum-and-alike contexts.

Reasons:
  • RegEx Blacklisted phrase (2): I have my doubt
  • Long answer (-1):
  • Has code block (-0.5):
  • Self-answer (0.5):
  • Low reputation (0.5):
Posted by: smellyshovel