Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
"""Definitions of the endpoints related the datasets."""
import logging

from fastapi import APIRouter
from sqlalchemy import text
from sqlalchemy.schema import CreateSchema

from ...helpers import begin_transaction
from ...schemas import Dataset
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which the dataset endpoints defined in this module are registered.
router = APIRouter()
@router.get('/projects/{project}/datasets', summary='Lists the datasets of a project.')
def list_datasets(project: str) -> list[str]:
    """Return the names of the datasets that exist in a project.

    Each dataset is reported by the name of its database schema; schemas
    rejected by ``_filter_schema`` are left out of the result.
    """
    # Read every schema name through the connection yielded by
    # begin_transaction(project).
    with begin_transaction(project) as connection:
        query = text('SELECT schema_name FROM information_schema.schemata')
        schema_names = connection.execute(query).scalars().all()
    # Keep only the names accepted by the filter, in query order.
    return list(filter(_filter_schema, schema_names))
def _filter_schema(schema: str) -> bool:
    """Tell whether a schema name should be listed as a dataset.

    Returns ``False`` for ``information_schema``, ``public`` and any name
    starting with ``pg_`` (presumably the database's built-in schemas), and
    ``True`` for every other name.
    """
    # `or` short-circuits, so the prefix test only runs for names that are
    # not in the fixed exclusion set.
    return not (
        schema in {'information_schema', 'public'}
        or schema.startswith('pg_')
    )
@router.post('/projects/{project}/datasets', summary='Creates a dataset in a project.')
def post_dataset(project: str, dataset: Dataset) -> None:
    """Create a dataset in a project.

    The dataset is created as a database schema named after the part of
    ``dataset.name`` that follows the first ``.``.
    """
    with begin_transaction(project) as conn:
        # Only the part after the first dot names the schema; the prefix is
        # discarded. A name without a dot still raises ValueError here.
        _, schema_id = dataset.name.split('.', 1)
        # Security: schema_id comes from the request body, so it must never be
        # interpolated into raw SQL text. CreateSchema lets the dialect quote
        # and escape the identifier when it renders the DDL. Names that need
        # quoting (e.g. mixed case) are therefore created verbatim.
        conn.execute(CreateSchema(schema_id))