Skip to content
Snippets Groups Projects
access.py 1.79 KiB
Newer Older
Pierre Chanial's avatar
Pierre Chanial committed
"""This module provides factories to access the admin and project databases.

An instance of this class can be used to connect the admin or a project database.
"""
import collections
from typing import Iterator

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker

from ..config import settings
from .models import DBProject

assert settings.SQLALCHEMY_DBADMIN_URI is not None

engine = create_engine(settings.SQLALCHEMY_DBADMIN_URI, pool_pre_ping=True, future=True)
AdminSession = sessionmaker(autocommit=False, autoflush=False, bind=engine, future=True)


class ProjectEngines(collections.abc.Mapping):
    """Cache for the project engines."""

    def __init__(self) -> None:
        """Constructs a new cache for the project engines."""
        self._dict: dict[str, Engine] = {}

    def __getitem__(self, project_name: str) -> Engine:
        """Returns the engine associated to the project."""
        engine = self._dict.get(project_name)
        if engine is None:
            engine = self._create_project_engine(project_name)
            self._dict[project_name] = engine
        return engine

    def __len__(self) -> int:
        """Returns the number of cached engines."""
        return len(self._dict)

    def __iter__(self) -> Iterator[str]:
        """Iterates through the cached engines."""
        return iter(self._dict)

    @staticmethod
    def _create_project_engine(project_name: str) -> Engine:
        """Factory for the project engines."""
        stmt = select(DBProject).where(DBProject.name == project_name)
        project = AdminSession().execute(stmt).scalar_one()
        uri = project.uri
        return create_engine(uri, pool_pre_ping=True, future=True)


project_engines = ProjectEngines()