79723708

Date: 2025-08-02 23:29:52
Score: 3
Natty:
Report link

Mira, no hablo inglés, pero te comparto mi código. Así puedo gestionar múltiples conexiones de base de datos en FastAPI.
" Look, I don't speak English, but I'm sharing my code with you. This way I can handle multiple connections of db in FastAPI. "

Puede utilizar la sesión de forma aislada con session_control, como una dependencia utilizando get_session y globalmente utilizando get_ctx_session.
" You can use the session in isolation with session_control, as a dependency using get_session, and globally using get_ctx_session. "

Soy programador junior, así que estoy abierto a correcciones.
" I'm a junior programmer, so I'm open to corrections. "

# core/database/connection.py
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from contextvars import ContextVar
from typing import Annotated

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from core.configs.settings import settings


class DatabaseManager:
    _instances: dict[str, 'DatabaseManager'] = {}

    def __new__(cls, url: str):
        if url not in cls._instances:
            inst = super().__new__(cls)
            inst._init(url)
            cls._instances[url] = inst
        return cls._instances[url]

    def _init(self, url: str):
        self._engine = create_async_engine(url, future=True)
        self._maker = async_sessionmaker(self._engine, expire_on_commit=False)
        self._context_session: ContextVar[AsyncSession | None] = ContextVar('session', default=None)

    # * Creates a session with greater control, with automatic commit and rollback.
    # * Usage: async with DB.session_control() as session:
    @asynccontextmanager
    async def session_control(self) -> AsyncGenerator[AsyncSession]:
        async with self._maker() as session:
            token = self._context_session.set(session)
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise
            finally:
                self._context_session.reset(token)

    # * Creates a session based on session control, can be used as a FastAPI dependency
    async def get_session(self) -> AsyncGenerator[AsyncSession]:
        async with self.session_control() as session:
            yield session

    # * Creates a session based on session control, with a middleware and can be used globally 
    # # * Usage: session = db.get_ctx_session()
    def get_ctx_session(self) -> AsyncSession:
        session = self._context_session.get()
        if session is None:
            raise RuntimeError('No session found in context')
        return session

    async def connect(self):
        async with self._engine.begin() as conn:
            await conn.run_sync(lambda _: None)

    async def disconnect(self):
        await self._engine.dispose()


# ?: Instancia tus bases de datos con DatabaseManager
# * db = DatabaseManager(url_connection)

# todo: Instancias del Database Manager
DB_CORE = DatabaseManager(settings.DB_CORE)
SS_CORE = Annotated[AsyncSession, Depends(DB_CORE.get_session)]

# *: DB_OTHER = DatabaseManager(settings.DB_OTHER)
# *: SS_OTHER = Annotated[AsyncSession, Depends(DB_OTHER.get_session)]

# core/database/middlewares/ctx_session.py
from contextlib import AsyncExitStack
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.types import ASGIApp
from core.database.connection import DatabaseManager

# Lightweight middleware that opens and closes sessions for the lifecycle of a request
class DBSessionMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: ASGIApp, db: DatabaseManager):
        super().__init__(app)
        self.db = db

    async def dispatch(self, request: Request, call_next):
        # Open session for this DB
        async with self.db.session_control():
            response = await call_next(request)
            return response

# Register middleware for db
app.add_middleware(DBSessionMiddleware, db=DB_CORE)
# app.add_middleware(DBSessionMiddleware, db=DB_OTHER)

--------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------

#?: Done, you can now use the databases in your services without any problems.

async def obtener_estado_usuario(id: UUID) -> EstadoUsuarioDB:  # noqa: B008
    session = DB_CORE.get_ctx_session()
    estado = (await session.execute(select(EstadoUsuarioDB).where(EstadoUsuarioDB.id == id))).scalar_one_or_none()
    return estado
Reasons:
  • Blacklisted phrase (2): código
  • Blacklisted phrase (2): estoy
  • Whitelisted phrase (-1.5): You can use
  • Long answer (-1):
  • Has code block (-0.5):
  • Contains question mark (0.5):
  • Filler text (0.5): --------------------------------------------------------------------------------------------------------
  • Filler text (0): --------------------------------------------------------------------------------------------------------
  • Low reputation (1):
Posted by: Samuel Aguilar