Skip to content
Snippets Groups Projects
Commit e7c2df15 authored by Nico Vermaas's avatar Nico Vermaas Committed by Hugh Dickinson
Browse files

basic connectors for alta, astron_vo and samp

parent d934c2f4
No related branches found
No related tags found
No related merge requests found
from .alta_connector import alta_connector from alta.alta_connector import alta_connector
from shopping_client import shopping_client from shopping_client import shopping_client
from alta import alta_connector from alta import alta_connector
from astron_vo import astron_vo_connector
from samp import samp_connector
esap_api_host = "https://sdc-dev.astron.nl:5555/" esap_api_host = "https://sdc-dev.astron.nl:5555/"
#access_token = ""
# Instantiate alta connector # Instantiate alta connector
ac = alta_connector() ac = alta_connector()
vo = astron_vo_connector()
# Instantiate ESAP User Profile shopping client, passing alta connector # Instantiate ESAP User Profile shopping client, passing alta connector
#sc = shopping_client(host=esap_api_host, token=access_token, connectors=[ac]) #sc = shopping_client(host=esap_api_host, token=access_token, connectors=[ac,vo])
sc = shopping_client(host=esap_api_host, connectors=[ac]) sc = shopping_client(host=esap_api_host, connectors=[ac])
# Retrieve basket (prompts to enter access token obtained from ESAP GUI) # 'apertif'and 'astron_vo' items converted to pandas dataframe
basket=sc.get_basket(filter_archives=True) basket_pandas=sc.get_basket(filter_archives=True, convert_to_pandas=True)
print(basket) print('------------------------------------')
for item in basket: print("'apertif'and 'astron_vo' items converted to pandas dataframe")
print(item) print(basket_pandas)
\ No newline at end of file
# 'apertif'and 'astron_vo' items as json
basket_json=sc.get_basket(filter_archives=True, convert_to_pandas=False)
print('-----------------------------------')
print("'apertif'and 'astron_vo' items as json")
print(basket_json)
samp_connector = samp_connector()
sc = shopping_client(host=esap_api_host, token=access_token, connectors=[samp_connector])
# "'SAMP' items converted to pandas dataframe:"
basket_pandas=sc.get_basket(convert_to_pandas=True, filter_archives=True)
print('------------------------------------')
print("'SAMP' items converted to pandas dataframe:")
print(basket_pandas)
# 'SAMP' items as json:
basket_json=sc.get_basket(convert_to_pandas=False, filter_archives=True)
print('------------------------------------')
print("'SAMP' items as json:")
print(basket_json)
\ No newline at end of file
from astron_vo.astron_vo_connector import astron_vo_connector
import json
import pandas as pd
from typing import Union, Optional
class astron_vo_connector:
name = "astron_vo"
archive = "astron_vo"
def basket_item_to_pandas(
self, basket_item: Union[dict, pd.Series], validate: bool = True
) -> Optional[pd.Series]:
"""Convert an item from the shopping basket into a `pd.Series` with
optional validation.
Parameters
----------
basket_item : Union[dict, pd.Series]
A single item from a retrieved shopping basket - either a raw `dict`
or a converted `pd.Series`.
validate : bool
If `True`, check that the data in the shopping item conforms with
the expected format before attempting the conversion.
Returns
-------
Optional[pd.Series]
`pd.Series` containing the data encoded in the shopping item or
`NoneType`.
"""
if validate:
item_data = self.validate_basket_item(basket_item, return_loaded=True)
else:
item_data = json.loads(basket_item["item_data"])
if item_data:
return pd.Series(item_data)
return None
def validate_basket_item(
self, basket_item: Union[dict, pd.Series], return_loaded: bool = False
) -> Union[dict, bool, None]:
"""Check that the data in the shopping item conforms with
the expected format
Parameters
----------
basket_item : Union[dict, pd.Series]
A single item from a retrieved shopping basket - either a raw `dict`
or a converted `pd.Series`.
return_loaded : bool
If `True`, and validation succeeds return the extracted shopping item
as `dict`, otherwise return `True` if validation succeeds and `None`
otherwise.
Returns
-------
Union[dict, bool, None]
If `return_loaded` is `True`, return a `dict` containing the data
encoded in the shopping item when validation succeeds.
Otherwise if `return_loaded` is `True` validation succeeds.
If validation fails return `None`.
"""
item_data = json.loads(basket_item["item_data"])
if "archive" in item_data and item_data["archive"] == self.archive:
if return_loaded:
return item_data
else:
return True
return None
\ No newline at end of file
from samp.samp_connector import samp_connector
\ No newline at end of file
import requests
import json
import io
import getpass
import pandas as pd
from typing import Union, Optional
class samp_connector:
name = "samp"
archive = "samp"
def basket_item_to_pandas(
self, basket_item: Union[dict, pd.Series], validate: bool = True
) -> Optional[pd.Series]:
"""Convert an item from the shopping basket into a `pd.Series` with
optional validation.
Parameters
----------
basket_item : Union[dict, pd.Series]
A single item from a retrieved shopping basket - either a raw `dict`
or a converted `pd.Series`.
validate : bool
If `True`, check that the data in the shopping item conforms with
the expected format before attempting the conversion.
Returns
-------
Optional[pd.Series]
`pd.Series` containing the data encoded in the shopping item or
`NoneType`.
"""
if validate:
item_data = self.validate_basket_item(basket_item, return_loaded=True)
else:
item_data = json.loads(basket_item["item_data"])
if item_data:
return pd.Series(item_data)
return None
def validate_basket_item(
self, basket_item: Union[dict, pd.Series], return_loaded: bool = False
) -> Union[dict, bool, None]:
"""Check that the data in the shopping item conforms with
the expected format
Parameters
----------
basket_item : Union[dict, pd.Series]
A single item from a retrieved shopping basket - either a raw `dict`
or a converted `pd.Series`.
return_loaded : bool
If `True`, and validation succeeds return the extracted shopping item
as `dict`, otherwise return `True` if validation succeeds and `None`
otherwise.
Returns
-------
Union[dict, bool, None]
If `return_loaded` is `True`, return a `dict` containing the data
encoded in the shopping item when validation succeeds.
Otherwise if `return_loaded` is `True` validation succeeds.
If validation fails return `None`.
"""
item_data = json.loads(basket_item["item_data"])
if "archive" in item_data and item_data["archive"] == self.archive:
if return_loaded:
return item_data
else:
return True
return None
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment