Select Git revision
ddrctrl.vhd
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
datasets.py 4.01 KiB
"""This module defines the `Dataset` resource, as used by the client."""
from __future__ import annotations
import base64
import pickle
from typing import Any, Optional, Union
import pandas as pd
from ..helpers import raise_for_status
from ..sessions import BasedSession
from .collections import ResourceCollection
from .tables import Table
class Dataset:
    """A collection of tables, identified by a ``<project>.<dataset>`` name.

    Instances carry three attributes: ``name`` (the dotted identifier),
    ``description`` (free text) and ``session`` (a `BasedSession` created
    for the path ``/projects/<project>/datasets/<dataset>``).
    """

    def __init__(self, name: str, description: str = ''):
        """The `Dataset` constructor.

        Args:
            name: Dotted identifier of the form ``<project>.<dataset>``.
            description: Optional free-text description.

        Raises:
            ValueError: If `name` is not made of exactly two dot-separated
                parts.
        """
        # Split first so that a malformed name fails with a clear message
        # before any session is created.
        project, dataset = self._split_name(name)
        self.name = name
        self.description = description
        self.session = BasedSession(f'/projects/{project}/datasets/{dataset}')

    @staticmethod
    def _split_name(name: str) -> tuple[str, str]:
        """Splits a ``<project>.<dataset>`` name into its two parts.

        Raises:
            ValueError: If `name` does not contain exactly one dot.
        """
        parts = name.split('.')
        if len(parts) != 2:
            raise ValueError(
                f'Invalid dataset name {name!r}: '
                "expected the form '<project>.<dataset>'."
            )
        return parts[0], parts[1]

    def __eq__(self, other: Any) -> bool:
        """Tests equality between two Datasets.

        Two datasets are equal when both their names and descriptions match.
        Comparison with any other type is delegated back to Python by
        returning `NotImplemented`.

        Because `__eq__` is defined without `__hash__`, instances are
        unhashable.
        """
        if not isinstance(other, Dataset):
            return NotImplemented
        return (self.name, self.description) == (other.name, other.description)

    def __repr__(self) -> str:
        """The string representation of a `Dataset`."""
        return f'<Dataset {self.name}>'

    def delete(self) -> None:
        """Deletes this dataset, if empty.

        Issues a DELETE on the dataset's own session path and hands the
        response to `raise_for_status`.
        """
        response = self.session.delete('')
        raise_for_status(response)

    @classmethod
    def deserialize(cls, dataset: dict) -> Dataset:
        """Deserializes a dict-like dataset.

        Args:
            dataset: Mapping holding the ``'name'`` and ``'description'``
                keys.

        Returns:
            An instance of `cls`, so that subclasses deserialize to their
            own type rather than to the base `Dataset`.

        Raises:
            KeyError: If either key is missing from `dataset`.
        """
        return cls(dataset['name'], dataset['description'])

    @property
    def tables(self) -> ResourceCollection[Table]:
        """The member tables of this dataset."""
        project, dataset = self._split_name(self.name)
        return ResourceCollection[Table](
            f'/projects/{project}/datasets/{dataset}/tables', Table
        )

    def create_table_from(
        self,
        source: Union[str, pd.DataFrame],
        name: Optional[str] = None,
        description: str = '',
        **keywords,
    ) -> Table:
        """Creates a table from a URL or from a Pandas DataFrame.

        Args:
            source: Either a string, sent as the ``'path'`` field, or a
                DataFrame, sent inline as the ``'content'`` field.
            name: Optional table name, forwarded as is.
            description: Optional free-text description.
            **keywords: Extra options, forwarded under the ``'params'`` key.

        Returns:
            The `Table` deserialized from the JSON response.

        Raises:
            TypeError: If `source` is neither a string nor a DataFrame.
        """
        if not isinstance(source, (str, pd.DataFrame)):
            raise TypeError('The input is not a URL, nor a Pandas DataFrame.')

        payload = {
            'name': name,
            'description': description,
            'params': keywords,
        }
        if isinstance(source, pd.DataFrame):
            # The frame travels as a base64-encoded pickle so that it fits in
            # a text field.
            # NOTE(review): the receiving side presumably has to unpickle
            # this content; confirm it only does so for trusted clients,
            # since unpickling untrusted data can execute arbitrary code.
            pickled = pickle.dumps(source)
            payload['content'] = base64.b64encode(pickled).decode('ascii')
        else:
            payload['path'] = source

        response = self.session.post('/tables', payload)
        raise_for_status(response)
        return Table.deserialize(response.json())

    def create_table_as(
        self, query: str, name: Optional[str] = None, description: str = ''
    ) -> Table:
        """Creates a table from a SQL query.

        Args:
            query: The query text, sent as the ``'query'`` field.
            name: Optional table name, forwarded as is.
            description: Optional free-text description.

        Returns:
            The `Table` deserialized from the JSON response.

        Raises:
            TypeError: If `query` is not a string.
        """
        if not isinstance(query, str):
            raise TypeError('The query is not a string.')

        payload = {
            'name': name,
            'description': description,
            'query': query,
        }
        response = self.session.post('/tables', payload)
        raise_for_status(response)
        return Table.deserialize(response.json())

    def create_table_from_esap_gateway_query(
        self, query: dict, name: str, description: str = ''
    ) -> Table:
        """Sends a query request to ESAP-API-GATEWAY.

        A `name` without a dot is prefixed with this dataset's name; a name
        that already contains a dot is sent unchanged. The request is posted
        on a project-level session rather than on the dataset's own session.

        Args:
            query: The gateway query, sent as the ``'query'`` field.
            name: Name of the table, either bare or already dotted.
            description: Optional free-text description.

        Returns:
            The `Table` deserialized from the JSON response.

        Example:
            >>> query = {
            ...     "level": "raw",
            ...     "collection": "imaging",
            ...     "ra": 342.16,
            ...     "dec": 33.94,
            ...     "fov": 10,
            ...     "archive_uri": "apertif"
            ... }
            >>> table = dataset.create_table_from_esap_gateway_query(
            ...     query, 'apertif', 'Cone search'
            ... )
        """
        full_name = name if '.' in name else f'{self.name}.{name}'
        payload = {
            'name': full_name,
            'description': description,
            'query': query,
        }
        project, _ = self._split_name(self.name)
        session = BasedSession(f'/projects/{project}')
        response = session.post('/esap-gateway-operations', payload)
        raise_for_status(response)
        return Table.deserialize(response.json())