Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
"""This module provides factories to access the admin and project databases.
Use ``AdminSession`` to open sessions on the admin database, and
``project_engines`` to get the (cached) engine of a project database.
"""
import collections
import collections.abc
from typing import Iterator

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker

from ..config import settings
from .models import DBProject
# The admin database URI has to be configured before this module is imported.
assert settings.SQLALCHEMY_DBADMIN_URI is not None

# Engine of the admin database; pool_pre_ping makes the pool test a connection
# for liveness before handing it out.
engine = create_engine(
    settings.SQLALCHEMY_DBADMIN_URI,
    future=True,
    pool_pre_ping=True,
)

# Factory for sessions bound to the admin database engine.
AdminSession = sessionmaker(
    bind=engine,
    future=True,
    autoflush=False,
    autocommit=False,
)
class ProjectEngines(collections.abc.Mapping):
    """Lazily populated, read-only cache of the project database engines.

    Looking up a project name returns its engine, creating and caching it on
    first access. ``len()`` and iteration only cover the engines that have
    already been created, not every project stored in the admin database.

    Note:
        An unknown project name raises ``sqlalchemy.exc.NoResultFound`` rather
        than ``KeyError``. The ``Mapping`` mixins (``in``, ``.get()``) only
        catch ``KeyError``, so they propagate that error instead of returning
        ``False`` or the default.
    """

    def __init__(self) -> None:
        """Construct an empty cache for the project engines."""
        self._dict: dict[str, Engine] = {}

    def __getitem__(self, project_name: str) -> Engine:
        """Return the engine associated with the project.

        The engine is created on the first lookup of ``project_name`` and
        served from the cache afterwards.

        Raises:
            sqlalchemy.exc.NoResultFound: No project has that name.
            sqlalchemy.exc.MultipleResultsFound: Several projects share that
                name.
        """
        # Named ``project_engine`` to avoid shadowing the module-level admin
        # ``engine``.
        project_engine = self._dict.get(project_name)
        if project_engine is None:
            project_engine = self._create_project_engine(project_name)
            self._dict[project_name] = project_engine
        return project_engine

    def __len__(self) -> int:
        """Return the number of cached engines."""
        return len(self._dict)

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the projects whose engine is cached."""
        return iter(self._dict)

    @staticmethod
    def _create_project_engine(project_name: str) -> Engine:
        """Create the engine of a project from the URI stored in the admin database.

        Raises:
            sqlalchemy.exc.NoResultFound: No project has that name.
            sqlalchemy.exc.MultipleResultsFound: Several projects share that
                name.
        """
        stmt = select(DBProject).where(DBProject.name == project_name)
        # Use the session as a context manager so it is closed (and its
        # connection returned to the pool) as soon as the lookup is done, even
        # when scalar_one() raises. Read the URI while the session is open.
        with AdminSession() as session:
            uri = session.execute(stmt).scalar_one().uri
        return create_engine(uri, pool_pre_ping=True, future=True)
# Module-level cache instance; it starts empty and creates each project's
# engine on the first lookup of that project's name.
project_engines = ProjectEngines()