Mira, no hablo inglés, pero te comparto mi código. Así puedo gestionar múltiples conexiones de base de datos en FastAPI.
" Look, I don't speak English, but I'm sharing my code with you. This is how I manage multiple database connections in FastAPI. "
Puede utilizar la sesión de forma aislada con session_control, como una dependencia utilizando get_session y globalmente utilizando get_ctx_session.
" You can use the session in isolation with session_control, as a dependency using get_session, and globally using get_ctx_session. "
Soy programador junior, así que estoy abierto a correcciones.
" I'm a junior programmer, so I'm open to corrections. "
# core/database/connection.py
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from contextvars import ContextVar
from typing import Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from core.configs.settings import settings
class DatabaseManager:
    """Per-URL holder of an async engine, a session factory and a context session.

    ``DatabaseManager(url)`` returns the same instance every time it is called
    with the same ``url``, so each database gets one engine, one session
    factory and one ``ContextVar`` tracking the currently active session.

    A session can be obtained in three ways:

    * ``session_control()`` -- explicit ``async with`` block with automatic
      commit / rollback.
    * ``get_session()`` -- async-generator wrapper around ``session_control``,
      suitable as a FastAPI dependency.
    * ``get_ctx_session()`` -- the session currently stored in the context
      variable (for example one opened by a middleware).
    """

    # Already-built managers, keyed by connection URL.
    _instances: dict[str, 'DatabaseManager'] = {}

    def __new__(cls, url: str) -> 'DatabaseManager':
        """Return the manager registered for ``url``, building it on first use."""
        if url not in cls._instances:
            inst = super().__new__(cls)
            # Initialise before caching so a failing ``_init`` leaves no
            # half-built instance in the registry.
            inst._init(url)
            cls._instances[url] = inst
        return cls._instances[url]

    def _init(self, url: str) -> None:
        """Create the engine, the session factory and the context variable."""
        # ``future=True`` is intentionally not passed: ``async_sessionmaker``
        # only exists in SQLAlchemy 2.x, where 2.0-style is already the default.
        self._engine = create_async_engine(url)
        self._maker = async_sessionmaker(self._engine, expire_on_commit=False)
        self._context_session: ContextVar[AsyncSession | None] = ContextVar('session', default=None)

    @asynccontextmanager
    async def session_control(self) -> AsyncGenerator[AsyncSession, None]:
        """Open a session with automatic commit and rollback.

        Usage::

            async with DB.session_control() as session:
                ...

        While the block is active the session is also published through the
        context variable, so ``get_ctx_session()`` returns it.

        Yields:
            The newly opened ``AsyncSession``.

        Raises:
            Exception: whatever the body (or the commit) raised, re-raised
                after the session has been rolled back.
        """
        async with self._maker() as session:
            token = self._context_session.set(session)
            try:
                yield session
                # Only reached when the body finished without raising.
                await session.commit()
            except Exception:
                await session.rollback()
                raise
            finally:
                # Restore whatever was stored before this block (None or an
                # outer session), regardless of how the block ended.
                self._context_session.reset(token)

    async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield a session from ``session_control``; usable as a FastAPI dependency."""
        async with self.session_control() as session:
            yield session

    def get_ctx_session(self) -> AsyncSession:
        """Return the session currently stored in the context variable.

        Usage::

            session = db.get_ctx_session()

        Raises:
            RuntimeError: if no session is active in the current context.
        """
        session = self._context_session.get()
        if session is None:
            raise RuntimeError('No session found in context')
        return session

    async def connect(self) -> None:
        """Open a connection through ``engine.begin()`` and run a no-op on it.

        NOTE(review): presumably intended as a startup connectivity check --
        confirm with the caller.
        """
        async with self._engine.begin() as conn:
            await conn.run_sync(lambda _: None)

    async def disconnect(self) -> None:
        """Dispose of the engine."""
        await self._engine.dispose()
# Instantiate one DatabaseManager per database:
#   db = DatabaseManager(url_connection)
# DatabaseManager instances. SS_CORE is an Annotated alias that injects an
# AsyncSession through Depends(DB_CORE.get_session).
DB_CORE = DatabaseManager(settings.DB_CORE)
SS_CORE = Annotated[AsyncSession, Depends(DB_CORE.get_session)]
# Example for an additional database:
#   DB_OTHER = DatabaseManager(settings.DB_OTHER)
#   SS_OTHER = Annotated[AsyncSession, Depends(DB_OTHER.get_session)]
# core/database/middlewares/ctx_session.py
from contextlib import AsyncExitStack
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.types import ASGIApp
from core.database.connection import DatabaseManager
class DBSessionMiddleware(BaseHTTPMiddleware):
    """Middleware that wraps each request in ``db.session_control()``.

    The session is opened before the request is passed down the stack and is
    closed when ``call_next`` returns or raises, so code running inside the
    request can reach it through ``db.get_ctx_session()``.

    NOTE(review): ``session_control`` only rolls back when an exception
    propagates out of the block. Errors that an inner layer turns into a
    normal response would presumably still be committed -- confirm this is
    the intended behaviour.
    """

    def __init__(self, app: ASGIApp, db: DatabaseManager) -> None:
        """Store the ``DatabaseManager`` whose session is opened per request."""
        super().__init__(app)
        self.db = db

    async def dispatch(self, request: Request, call_next):
        """Run the rest of the stack inside a session scope and return its response."""
        session_scope = self.db.session_control()
        async with session_scope:
            # Returning from inside the block still runs the scope's exit
            # (commit / context reset) before the response is handed back.
            return await call_next(request)
# Register one DBSessionMiddleware per database whose session should be
# available through get_ctx_session() during a request.
# NOTE(review): assumes `app` and `DB_CORE` are in scope where this runs --
# neither is imported in this snippet.
app.add_middleware(DBSessionMiddleware, db=DB_CORE)
# app.add_middleware(DBSessionMiddleware, db=DB_OTHER)
--------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------
#?: Done, you can now use the databases in your services without any problems.
async def obtener_estado_usuario(id: UUID) -> EstadoUsuarioDB | None:
    """Fetch the ``EstadoUsuarioDB`` row whose ``id`` equals the given value.

    The query runs on the session currently stored in ``DB_CORE``'s context,
    so a session must already be open for the calling context.

    Args:
        id: Value compared against ``EstadoUsuarioDB.id``. The parameter name
            shadows the builtin but is kept so existing keyword callers
            continue to work.

    Returns:
        The matching row, or ``None`` when no row matches.

    Raises:
        RuntimeError: if ``DB_CORE`` has no session in the current context.
    """
    session = DB_CORE.get_ctx_session()
    query = select(EstadoUsuarioDB).where(EstadoUsuarioDB.id == id)
    result = await session.execute(query)
    # scalar_one_or_none() yields None for zero rows, hence the optional
    # return type.
    return result.scalar_one_or_none()