Skip to content
Snippets Groups Projects
Commit 3551dcaa authored by Pierre Chanial's avatar Pierre Chanial
Browse files

Initial commit.

parents
No related branches found
No related tags found
No related merge requests found
# Python / type-checker caches
__pycache__/
.mypy_cache/
# Coverage artifacts
.coverage
htmlcov
# IDE settings
.idea/
.vscode/
# Locally generated docker stack file
docker-stack.yml
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
# NOTE: indentation reconstructed to valid YAML (it was lost in transit).
repos:
  - repo: https://github.com/asottile/seed-isort-config
    rev: 'v2.2.0'
    hooks:
      - id: seed-isort-config
  - repo: https://github.com/pre-commit/mirrors-isort
    rev: 'v5.9.1'
    hooks:
      - id: isort
  - repo: https://github.com/psf/black
    rev: '21.6b0'
    hooks:
      - id: black
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: 'v4.0.1'
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
      - id: double-quote-string-fixer
  # flake8 with docstring checks for the application code (not migrations/tests)
  - repo: https://github.com/pycqa/flake8
    rev: '3.9.2'
    hooks:
      - id: flake8
        args:
          - --max-line-length=88
          # D106 Missing docstring in public nested class
          # D401 First line should be in imperative mood
          - --extend-ignore=D106,D401
        additional_dependencies:
          - flake8-docstrings
        exclude: ^(migrations/|tests)
  # flake8 without docstring checks for the tests (D ignored entirely)
  - repo: https://github.com/pycqa/flake8
    rev: '3.9.2'
    hooks:
      - id: flake8
        args:
          - --max-line-length=88
          - --extend-ignore=D
        additional_dependencies:
          - flake8-docstrings
        exclude: ^(migrations/|app/)
  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: 'v0.910'
    hooks:
      - id: mypy
        additional_dependencies:
          - fastapi==0.65.3
          - fastapi-sqlalchemy==0.2.1
          - pandas==1.3.0
          - psycopg2==2.9.1
          - sqlalchemy2-stubs==0.0.2a4
          - types-requests==2.25.0
        exclude: ^migrations/
  #- repo: https://github.com/pre-commit/mirrors-pylint
  #  rev: 'v3.0.0a3'
  #  hooks:
  #    - id: pylint
  #      args:
  #        - --extension-pkg-whitelist=pydantic
  #        - --disable=line-too-long, missing-class-docstring, too-few-public-methods
  #      additional_dependencies: [alembic, python-dotenv, fastapi, fastapi-sqlalchemy, pandas, psycopg2, requests, toml]
  #      exclude: ^migrations/
# Pull base image
FROM python:3.9
# Set environment variables:
# don't write .pyc files, and flush stdout/stderr immediately.
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
WORKDIR /code/
# Install Poetry
# NOTE(review): get-poetry.py is the legacy installer; consider
# https://install.python-poetry.org when upgrading Poetry.
RUN curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | POETRY_HOME=/opt/poetry python && \
    cd /usr/local/bin && \
    ln -s /opt/poetry/bin/poetry && \
    poetry config virtualenvs.create false
# Copy poetry.lock* in case it doesn't exist in the repo
COPY ./pyproject.toml ./poetry.lock* /code/
# Allow installing dev dependencies to run tests
ARG INSTALL_DEV=false
RUN bash -c "if [ $INSTALL_DEV == 'true' ] ; then poetry install --no-root ; else poetry install --no-root --no-dev ; fi"
# For development, Jupyter remote kernel, Hydrogen
# Using inside the container:
# jupyter lab --ip=0.0.0.0 --allow-root --NotebookApp.custom_display_url=http://127.0.0.1:8888
ARG INSTALL_JUPYTER=false
RUN bash -c "if [ $INSTALL_JUPYTER == 'true' ] ; then pip install jupyterlab ; fi"
# NOTE(review): COPY of a directory copies its *contents*, so ./app is
# flattened into /code while PYTHONPATH points at /code — confirm the
# intended layout (perhaps `COPY ./app /code/app` was meant).
COPY ./app /code
ENV PYTHONPATH=/code
# ESAP DB
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat alembic/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# NOTE(review): left blank on purpose — presumably migrations/env.py fills
# in the URL from the application settings at runtime; confirm.
sqlalchemy.url =
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
"""The ESAP-DB backend."""
"""Definitions of the endpoints."""
"""Dependencies that are injected in the routes."""
from collections.abc import Generator
from sqlalchemy.orm import Session
from ..db import AdminSession
def get_session() -> Generator[Session, None, None]:
    """Yields an admin database session.

    The surrounding transaction is committed automatically when the
    dependency is torn down (session in 'begin once' style).
    """
    with AdminSession.begin() as admin_session:
        yield admin_session
def get_session_as_you_go() -> Generator[Session, None, None]:
    """Yields an admin database session without automatic commit.

    Callers must issue commit() themselves; SQLAlchemy refers to this
    style as 'commit as you go'.
    """
    with AdminSession() as admin_session:
        yield admin_session
"""Definitions of the endpoints /api/v0."""
from fastapi import APIRouter
from . import datasets, projects, tables, users
# Aggregate router for the version-0 API; the app mounts it under the
# API prefix (see the FastAPI application setup).
api_v0_router = APIRouter()
api_v0_router.include_router(users.router, prefix='/users', tags=['users'])
api_v0_router.include_router(projects.router, prefix='/projects', tags=['projects'])
# The datasets and tables routers declare full paths themselves (no prefix).
api_v0_router.include_router(datasets.router, tags=['datasets'])
api_v0_router.include_router(tables.router, tags=['tables'])
"""Definitions of the endpoints related the datasets."""
import logging
from fastapi import APIRouter
from sqlalchemy import text
from ...helpers import begin_transaction
from ...schemas import Dataset
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get('/projects/{project}/datasets', summary='Lists the datasets of a project.')
def list_datasets(project: str) -> list[str]:
    """Lists the datasets belonging to a project."""
    query = text('SELECT schema_name FROM information_schema.schemata')
    with begin_transaction(project) as conn:
        schema_names = conn.execute(query).scalars().all()
    # Datasets are the schemas that are not PostgreSQL system schemas.
    return [name for name in schema_names if _filter_schema(name)]
def _filter_schema(schema: str) -> bool:
if schema in {'information_schema', 'public'}:
return False
if schema.startswith('pg_'):
return False
return True
@router.post('/projects/{project}/datasets', summary='Creates a dataset in a project.')
def post_dataset(project: str, dataset: Dataset) -> None:
    """Creates a dataset in a project.

    The dataset is materialized as a PostgreSQL schema named after the part
    of the dataset name following the first dot ('<project>.<schema>').
    """
    _, schema_id = dataset.name.split('.', 1)
    # A schema name cannot be passed as a bound parameter: quote the
    # identifier (doubling embedded double quotes) to prevent SQL injection
    # through the client-supplied dataset name.
    quoted_schema = '"' + schema_id.replace('"', '""') + '"'
    with begin_transaction(project) as conn:
        conn.execute(text(f'CREATE SCHEMA {quoted_schema}'))
"""Definitions of the endpoints related the projects."""
import logging
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import create_engine, text, update
from sqlalchemy.exc import DBAPIError
from sqlalchemy.future import select
from sqlalchemy.orm import Session
from ...db import DBProject, DBProjectServer
from ...helpers import fix_sqlalchemy2_stubs_non_nullable_column
from ...schemas import Project
from ..depends import get_session, get_session_as_you_go
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get('', summary='Lists the projects.', response_model=list[Project])
def list_projects(*, session: Session = Depends(get_session)) -> list[DBProject]:
    """Lists the projects visible to a user."""
    query = select(DBProject)
    result = session.execute(query)
    return result.scalars().all()
@router.post('', response_model=Project)
def create_project(
    *, session: Session = Depends(get_session_as_you_go), project: Project
) -> DBProject:
    """Creates a new project.

    Raises 409 if a project with the same name already exists and 507 if no
    project server has enough available storage. The required storage is
    preempted and committed *before* the slow physical CREATE DATABASE, so
    concurrent requests cannot oversubscribe a server; the preemption is
    released again if the creation fails (502). This is why the dependency
    is the 'commit as you go' session: commits are issued explicitly.
    """
    stmt: Any = select(DBProject).filter_by(name=project.name)
    db_project = session.execute(stmt).scalars().first()
    if db_project is not None:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            # Fixed: the closing quote after the project name was missing.
            detail=f"The project '{project.name}' already exists.",
        )
    server = _find_best_project_server(session, project)
    if server is None:
        raise HTTPException(
            status_code=status.HTTP_507_INSUFFICIENT_STORAGE,
            detail='No database has enough storage for this project.',
        )
    # preempt the required storage
    _update_server_available_size(session, server, -project.max_size)
    session.commit()
    try:
        # CREATE DATABASE cannot run inside a transaction: use AUTOCOMMIT.
        # NOTE(review): project.name is interpolated inside the quoted
        # identifier without escaping embedded double quotes — consider
        # validating the name at the schema level.
        with create_engine(
            fix_sqlalchemy2_stubs_non_nullable_column(server.uri),
            pool_pre_ping=True,
            future=True,
        ).connect() as conn:
            stmt = text(f'CREATE DATABASE "{project.name}"')
            conn.execution_options(isolation_level='AUTOCOMMIT').execute(stmt)
    except DBAPIError as exc:
        logger.error(str(exc))
        # release the preempted storage
        _update_server_available_size(session, server, +project.max_size)
        session.commit()
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail='The project database could not be created: '
            f'{type(exc).__name__}: {exc}',
        )
    uri = f'{server.uri}/{project.name}'
    db_project = DBProject(
        project_server_id=server.id,
        name=project.name,
        description=project.description,
        uri=uri,
        max_size=project.max_size,
    )
    session.add(db_project)
    session.commit()
    return db_project
def _find_best_project_server(
    session: Session, project: Project
) -> Optional[DBProjectServer]:
    """Returns a project server with enough available storage, or None."""
    candidates = select(DBProjectServer).where(
        DBProjectServer.available_size >= project.max_size
    )
    result = session.execute(candidates)
    return result.scalars().first()
def _update_server_available_size(
    session: Session, server: DBProjectServer, size: int
) -> None:
    """Adds `size` (possibly negative) to the server's available storage.

    The change is executed but not committed: callers decide when to commit.
    """
    new_size = fix_sqlalchemy2_stubs_non_nullable_column(server.available_size) + size
    session.execute(
        update(DBProjectServer)
        .where(DBProjectServer.id == server.id)
        .values(available_size=new_size)
    )
"""Definitions of the endpoints related the tables."""
import logging
import requests
from fastapi import APIRouter
from pandas import DataFrame
from sqlalchemy import text
from ...db import project_engines
from ...helpers import begin_transaction, table_column_names
from ...schemas import ESAPGatewayQuery
logger = logging.getLogger(__name__)
router = APIRouter()
# Ask the ESAP gateway for JSON responses.
HEADERS_JSON = {'Accept': 'application/json'}
@router.get(
    '/projects/{project}/datasets/{dataset}/tables',
    summary='Lists the tables of a dataset.',
)
def list_tables(project: str, dataset: str) -> list[str]:
    """Lists the tables in a dataset."""
    with begin_transaction(project) as conn:
        # Bind the schema name instead of interpolating it into the SQL
        # string, to prevent SQL injection through the URL path parameter.
        stmt = text(
            'SELECT table_name FROM information_schema.tables'
            ' WHERE table_schema = :table_schema'
        )
        result = conn.execute(stmt, {'table_schema': dataset})
        return result.scalars().all()
@router.get(
    '/projects/{project}/datasets/{dataset}/tables/{table}/content',
    summary='Gets the content of a table.',
)
def get_table_content(project: str, dataset: str, table: str) -> list[dict]:
    """Returns the whole table as json."""
    # Fetched first, in its own transaction, to label the rows below.
    column_names = get_table_column_names(project, dataset, table)
    with begin_transaction(project) as conn:
        # NOTE(review): `dataset` and `table` come straight from the URL and
        # are interpolated into the SQL text. Identifiers cannot be bound as
        # parameters, so they should be validated or quoted — SQL injection.
        stmt = text(f'SELECT * FROM {dataset}.{table}')
        result = conn.execute(stmt)
        content = result.all()
    # Re-assemble each row as a {column_name: value} mapping.
    return [dict(zip(column_names, row)) for row in content]
@router.get(
    '/projects/{project}/datasets/{dataset}/tables/{table}/column-names',
    summary='Gets the column names of a table.',
)
def get_table_column_names(project: str, dataset: str, table: str) -> list[str]:
    """Returns the names of the columns of a table."""
    # Thin endpoint wrapper around the shared helper.
    names = table_column_names(project, dataset, table)
    return names
@router.post(
    '/projects/{project}/esap-gateway-operations',
    summary='Performs an ESAP Gateway query and stores it in a table.',
)
def post_esap_gateway_operation(
    project: str, esap_gateway_query: ESAPGatewayQuery
) -> None:
    """Creates an operation that queries the ESAP API Gateway."""
    # The target table is qualified as '<dataset>.<table>'.
    dataset, table = esap_gateway_query.table.split('.')
    session = requests.Session()
    session.headers.update(HEADERS_JSON)
    # Ignore proxy-related environment variables inside the cluster.
    session.trust_env = False
    query = esap_gateway_query.query
    # Fetch and store the gateway results page by page; the paginated helper
    # signals the last page by raising StopIteration.
    # NOTE(review): the helper only raises StopIteration for the 'apertif'
    # archive — for any other archive_uri this loop appears to never
    # terminate; confirm and fix.
    page = 1
    while True:
        try:
            _post_esap_gateway_operation_paginated(
                project, dataset, table, session, query, page
            )
        except StopIteration:
            break
        page += 1
def _post_esap_gateway_operation_paginated(
    project: str,
    dataset: str,
    table: str,
    session: requests.Session,
    query: dict,
    page: int,
) -> None:
    """Fetches one page of gateway results and writes it to the table.

    The first page replaces the table, subsequent pages are appended.
    Raises StopIteration when no page remains, which is how the caller's
    fetch loop terminates.
    """
    # Only the apertif archive is paginated.
    paginated = query['archive_uri'] == 'apertif'
    if paginated:
        query['page'] = page
        query['page_size'] = 500
    response = session.get('http://esap-api:8000/esap-api/query/query', params=query)
    results = response.json()
    # apertif wraps the records in a 'results' key next to the pagination
    # info; other archives are assumed to return the records directly.
    # Fixed: the raw Response object (not its JSON) used to be passed to
    # DataFrame.from_records for non-apertif archives.
    records = results.pop('results') if paginated else results
    if_exists = 'replace' if page == 1 else 'append'
    DataFrame.from_records(records).to_sql(
        table, project_engines[project], schema=dataset, if_exists=if_exists
    )
    # Fixed: without this, non-paginated archives never raised StopIteration
    # and the caller's loop never terminated.
    if not paginated or page == results['pages']:
        raise StopIteration
"""Definitions of the endpoints related the users."""
import logging
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from ...db import DBUser
from ...schemas import User
from ..depends import get_session
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post('/', response_model=User)
def create_user(*, session: Session = Depends(get_session), user: User) -> DBUser:
    """Creates a user."""
    # The auto-committing session dependency persists the row after return.
    new_user = DBUser(
        first_name=user.first_name,
        last_name=user.last_name,
        email=user.email,
        is_superuser=False,
    )
    session.add(new_user)
    return new_user
"""This module defines the ESAP-DB configuration instance."""
import secrets
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from pydantic import AnyHttpUrl, BaseSettings, EmailStr, HttpUrl, PostgresDsn, validator
class Settings(BaseSettings):
    """The class holding the configuration settings."""

    API_V0_STR: str = '/api/v0'
    # A fresh random key is generated at each start-up unless overridden
    # through the environment.
    SECRET_KEY: str = secrets.token_urlsafe(32)
    # 60 minutes * 24 hours * 8 days = 8 days
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8
    SERVER_NAME: str
    SERVER_HOST: AnyHttpUrl
    # BACKEND_CORS_ORIGINS is a JSON-formatted list of origins
    # e.g: ['http://localhost', 'http://localhost:4200', 'http://localhost:3000', \
    # 'http://localhost:8080']
    BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = []

    @validator('BACKEND_CORS_ORIGINS', pre=True)
    def assemble_cors_origins(cls, v: Union[str, List[str]]) -> Union[List[str], str]:
        """Validates the CORS origins."""
        # A plain comma-separated string (not a JSON list) is split manually;
        # JSON-looking strings and lists are left to pydantic.
        if isinstance(v, str) and not v.startswith('['):
            return [i.strip() for i in v.split(',')]
        elif isinstance(v, (list, str)):
            return v
        raise ValueError(v)

    PROJECT_NAME: str
    SENTRY_DSN: Optional[HttpUrl] = None

    @validator('SENTRY_DSN', pre=True)
    def sentry_dsn_can_be_blank(cls, v: str) -> Optional[str]:
        """Validates the sentry DSN."""
        # An empty string in the environment means 'no Sentry'.
        if len(v) == 0:
            return None
        return v

    DBADMIN_SERVER: str
    DBADMIN_USER: str
    DBADMIN_PASSWORD: str
    DBADMIN_DB: str
    SQLALCHEMY_DBADMIN_URI: Optional[PostgresDsn] = None

    @validator('SQLALCHEMY_DBADMIN_URI', pre=True, always=True)
    def assemble_dbadmin_connection(
        cls, v: Optional[str], values: Dict[str, Any]
    ) -> Any:
        """Validates the admin database URI."""
        # An explicitly configured URI wins over the assembled one.
        if isinstance(v, str):
            return v
        # Assembled from the DBADMIN_* fields; field order matters, since
        # `values` only contains previously validated fields.
        return PostgresDsn.build(
            scheme='postgresql',
            user=values.get('DBADMIN_USER'),
            password=values.get('DBADMIN_PASSWORD'),
            host=values.get('DBADMIN_SERVER'),
            path=f"/{values.get('DBADMIN_DB')}",
        )

    DBPROJECT_USER: str
    DBPROJECT_PASSWORD: str
    DBPROJECT_SERVERS: list[str]
    FIRST_SUPERUSER: EmailStr
    FIRST_SUPERUSER_PASSWORD: str

    class Config:
        # Environment variable names must match the field case exactly.
        case_sensitive = True


# The singleton settings instance, loaded from the repository's .env file.
settings = Settings(_env_file=Path(__file__).parents[1] / '.env')  # type: ignore
"""The ESAP-DB module handling the admin database."""
from .access import AdminSession, project_engines
from .models import DBProject, DBProjectServer, DBUser
# Public API of the db package.
__all__ = (
    'AdminSession',
    'DBProject',
    'DBProjectServer',
    'DBUser',
    'project_engines',
)
"""This module provides factories to access the admin and project databases.
An instance of this class can be used to connect the admin or a project database.
"""
import collections
from typing import Iterator
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker
from ..config import settings
from .models import DBProject
# The admin URI is assembled by a pydantic validator and can no longer be
# None at this point; the assert narrows the Optional type for mypy.
assert settings.SQLALCHEMY_DBADMIN_URI is not None
# Engine and session factory for the admin database (SQLAlchemy 2.0 style).
engine = create_engine(settings.SQLALCHEMY_DBADMIN_URI, pool_pre_ping=True, future=True)
AdminSession = sessionmaker(autocommit=False, autoflush=False, bind=engine, future=True)
class ProjectEngines(collections.abc.Mapping):
    """Lazy cache mapping project names to their database engines."""

    def __init__(self) -> None:
        """Constructs an empty engine cache."""
        self._dict: dict[str, Engine] = {}

    def __getitem__(self, project_name: str) -> Engine:
        """Returns the engine of a project, creating it on first access."""
        try:
            return self._dict[project_name]
        except KeyError:
            engine = self._create_project_engine(project_name)
            self._dict[project_name] = engine
            return engine

    def __len__(self) -> int:
        """Returns the number of cached engines."""
        return len(self._dict)

    def __iter__(self) -> Iterator[str]:
        """Iterates over the cached project names."""
        return iter(self._dict)

    @staticmethod
    def _create_project_engine(project_name: str) -> Engine:
        """Looks up the project URI in the admin database and builds an engine."""
        stmt = select(DBProject).where(DBProject.name == project_name)
        project = AdminSession().execute(stmt).scalar_one()
        uri = project.uri
        return create_engine(uri, pool_pre_ping=True, future=True)


project_engines = ProjectEngines()
"""The object-relational mapping for the ESAP-DB admin database."""
from sqlalchemy import BigInteger, Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base
# from sqlalchemy.dialects.postgresql import BIGINT as BigInteger
# Base class for all ORM models of the admin database.
Base = declarative_base()


class DBProjectServer(Base):
    """The model representing an ESAP-DB database server to store the projects."""

    __tablename__ = 'project_servers'

    id = Column(Integer, primary_key=True, index=True)
    # Connection URI of the server (without a database name appended).
    uri = Column(String, unique=True, nullable=False)
    # Total and remaining storage capacity — presumably in bytes, given the
    # 10 * 2**30 default of Project.max_size; confirm.
    max_size = Column(BigInteger, nullable=False)
    available_size = Column(BigInteger, nullable=False)
class DBProject(Base):
    """The model representing an ESAP-DB project."""

    __tablename__ = 'projects'

    id = Column(Integer, primary_key=True, index=True)
    # Server hosting the project's physical database.
    project_server_id = Column(Integer, ForeignKey('project_servers.id'))
    name = Column(String(256), index=True, nullable=False)
    description = Column(String, nullable=False)
    # Full connection URI of the project database ('<server.uri>/<name>').
    uri = Column(String, nullable=False)
    # Storage quota preempted on the server at creation time.
    max_size = Column(BigInteger, nullable=False)
class DBUser(Base):
    """The model representing an ESAP-DB user."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, index=True)
    first_name = Column(String, nullable=False)
    last_name = Column(String, nullable=False)
    # NOTE(review): no unique constraint on email — confirm duplicates are
    # acceptable.
    email = Column(String, nullable=False)
    is_superuser = Column(Boolean, nullable=False)
"""Some helper functions for the API controlers."""
from contextlib import contextmanager
from typing import Iterator, Optional, TypeVar
from sqlalchemy import text
from sqlalchemy.engine import Connection
from .db import project_engines
# Generic type variable used by fix_sqlalchemy2_stubs_non_nullable_column.
T = TypeVar('T')


@contextmanager
def begin_transaction(project: str) -> Iterator[Connection]:
    """Yields a connection to the project database, wrapped in a transaction."""
    with project_engines[project].begin() as connection:
        yield connection
def table_column_names(project: str, dataset: str, table: str) -> list[str]:
    """Returns the column names of a table."""
    with begin_transaction(project) as connection:
        # The schema and table names are compared as *values* here, so they
        # can be bound parameters — avoids the SQL injection the previous
        # f-string interpolation allowed.
        stmt = text(
            """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_schema = :table_schema AND table_name = :table_name
            """
        )
        result = connection.execute(
            stmt, {'table_schema': dataset, 'table_name': table}
        )
        return result.scalars().all()
def fix_sqlalchemy2_stubs_non_nullable_column(arg: Optional[T]) -> T:
    """Narrows a value loaded from a non-nullable column to a non-None type.

    sqlalchemy2-stubs==0.0.2a4 types non-nullable model columns as
    Optional[...] (sqlalchemy-stubs handled them correctly); this helper
    asserts the value is present so mypy accepts it.
    """
    assert arg is not None
    return arg
"""The CRUDL routes for the ESAP-DB projects, datasets and tables."""
import logging
from pathlib import Path
from dotenv import load_dotenv
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from .apis.v0 import api_v0_router
from .config import settings
logger = logging.getLogger(__name__)

# Load the environment variables from the repository's .env file.
BASE_PATH = Path(__file__).parents[1].absolute()
load_dotenv(BASE_PATH / '.env')

# The FastAPI application; the OpenAPI spec is served under the API prefix.
app = FastAPI(
    title=settings.PROJECT_NAME, openapi_url=f'{settings.API_V0_STR}/openapi.json'
)

# Set all CORS enabled origins
if settings.BACKEND_CORS_ORIGINS:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
        allow_credentials=True,
        allow_methods=['*'],
        allow_headers=['*'],
    )

# Mount all version-0 endpoints under the API prefix.
app.include_router(api_v0_router, prefix=settings.API_V0_STR)
"""The schemas of the data handled by ESAP-DB.
These schemas are used to deserialize data from or serialize data to an ESAP-DB
client.
"""
from typing import Optional
from pydantic import BaseModel
class User(BaseModel):
    """The User schema."""

    first_name: str
    last_name: str
    email: str

    class Config:
        # Allow construction from ORM instances (DBUser) in response_model.
        orm_mode = True
class Project(BaseModel):
    """The Project schema."""

    # Database id; None for a project that has not been stored yet.
    id: Optional[int] = None
    name: str
    description: str = ''
    # Storage quota; defaults to 10 GiB (presumably bytes — confirm).
    max_size: int = 10 * 2 ** 30

    class Config:
        # Allow construction from ORM instances (DBProject) in response_model.
        orm_mode = True
class Dataset(BaseModel):
    """The Dataset schema."""

    # Qualified name '<project>.<schema>' (see post_dataset).
    name: str
    description: str = ''
class Table(BaseModel):
    """The Table schema."""

    name: str
    description: str = ''
class ESAPGatewayQuery(BaseModel):
    """The ESAPGatewayQuery schema."""

    # Raw query parameters forwarded to the ESAP API Gateway.
    query: dict
    # Destination table, qualified as '<dataset>.<table>'.
    table: str

    class Config:
        # Example shown in the OpenAPI documentation.
        schema_extra = {
            'example': {
                'query': {
                    'level': 'raw',
                    'collection': 'imaging',
                    'ra': 342.16,
                    'dec': 33.94,
                    'fov': 10,
                    'archive_uri': 'apertif',
                },
                'table': 'my_dataset.my_table_apertif',
            }
        }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment