Skip to content
Snippets Groups Projects
Commit ed3f4aa3 authored by Roy de Goei's avatar Roy de Goei
Browse files

SDC-685: Change REST API data/insert into insert_dataproduct. Logical names...

SDC-685: Change REST API data/insert into insert_dataproduct. Logical names for datalocation. Move migration script to folder. Implement migration script
parent 1aa56329
No related branches found
No related tags found
1 merge request!4SDC-685: Script which migrates ldvadmin data to ldv-spec-db
Pipeline #33823 failed
#!/usr/bin/env python3
import logging
import datetime
import pymysql
import argparse
import os
import django
import sys
import re
LAST_UPDATE = "25 July 2022"
logger = logging.getLogger(__file__)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
def main():
"""
Migrates data from the ldvadmin database to a ldvspec database.
"""
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)
# Check the invocation arguments
parser = argparse.ArgumentParser()
parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true")
parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true")
args = parser.parse_args()
if args.version:
print("--- migrate_ldvadmin_to_ldvspec.py (last updated {}) ---".format(LAST_UPDATE))
return
if __name__ == "__main__":
main()
\ No newline at end of file
......@@ -10,15 +10,23 @@ class DataLocation(models.Model):
@staticmethod
def insert_location_from_string(location_string):
"""
Insert a datalocation from a srm string (e.g. srm://surm:4321/path.tar)
Insert a datalocation from a srm string (e.g. srm://srm-url:4321)
Data Location names are: Sara, Juelich and Poznan
:param str location_string: SRM url
:return: DataLocation object
rtype: DataLocations
"""
# netloc will become srm-url:4321
_, netloc, *_ = urlsplit(location_string)
if "sara" in netloc.lower():
loc_name = "Sara"
elif "fz-juelich" in netloc.lower():
loc_name = "Juelich"
elif "psnc" in netloc.lower():
loc_name = "Poznan"
else:
loc_name = "Unknown"
dataloc = DataLocation(name=netloc, uri=location_string.rstrip('/'))
dataloc = DataLocation(name=loc_name, uri=netloc)
dataloc.save()
return dataloc
......
"""
This module provides a REST interface to the LDV specifications
Could be that this should be replaced with ldv_specification_interface module/python package
Its kinda prototype module for now required for the migration script
"""
import requests
import logging
from json.decoder import JSONDecodeError
try:
from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError
except ModuleNotFoundError:
from json.decoder import JSONDecodeError as SimpleJSONDecodeError
import argparse
import datetime
from urllib.parse import urlparse, urlunparse
# ==============================================================
# The request header
ATDB_HEADER = {
'content-type': "application/json",
'cache-control': "no-cache"
}
LDV_HOST_DEV = "http://localhost:8000/ldvspec/api/v1" # your local development environment with Django webserver
LDV_HOST_TEST = "https://sdc-dev.astron.nl:5554/ldvspec/api/v1" # the ldv sdc test environment.
LDV_HOST_PROD = "https://sdc.astron.nl:5554/ldvspec/api/v1" # the ldv sdc production environment.
class APIException(Exception):
pass
class APIMissing(Exception):
pass
def is_http_exception(response: requests.Response):
if response.status_code in range(400, 499):
return True
return False
def is_success(response: requests.Response):
if response.status_code in range(200, 300):
return True
return False
def is_missing(response: requests.Response):
if response.status_code == 404:
return True
return False
def can_retry(response: requests.Response):
if response.status_code in [408, 429]:
return True
return False
class LDVSpecInterface():
"""
This class is used to connect via REST interface
"""
def __init__(self, host, token):
"""
Constructor.
:param host: the host name of the backend.
:param token: The token to login
"""
# accept some presets to set host to dev, test, acc or prod
self.host = host
if self.host == 'dev':
self.host = LDV_HOST_DEV
elif self.host == 'test':
self.host = LDV_HOST_TEST
elif self.host == 'prod':
self.host = LDV_HOST_PROD
if not self.host.endswith('/'):
self.host += '/'
self.header = ATDB_HEADER
self.header['Authorization'] = f'Token {token}'
self._session = None
def session(self):
if self._session is None:
self._session = requests.Session()
self._session.headers.update(self.header)
return self._session
def _request(self, url, type, query_parameters=None, payload=None):
parsed_url = urlparse(url)
if not parsed_url.path.endswith('/'):
parsed_url = parsed_url._replace(path=parsed_url.path + '/')
url = urlunparse(parsed_url)
if isinstance(payload, str):
response = self.session().request(type, url, data=payload, headers=self.header, params=query_parameters)
elif isinstance(payload, dict):
response = self.session().request(type, url, json=payload, headers=self.header, params=query_parameters)
else:
response = self.session().request(type, url, json=payload, headers=self.header, params=query_parameters)
logging.debug(f"[{type} {response.url} ]")
logging.debug("Response: " + str(response.status_code) + ", " + str(response.reason))
if is_missing(response):
raise APIMissing(url)
elif is_http_exception(response) and can_retry(response):
return self._request(url, type, query_parameters, payload)
elif is_http_exception(response):
raise APIException(f'{response.status_code}: {response.reason} {response.content}')
elif is_success(response):
try:
json_response = response.json()
return json_response
except (SimpleJSONDecodeError, JSONDecodeError):
return response.content
raise APIException(f'Unrecognized response code {response.status_code}: {response.reason} {response.content}')
# raise (Exception("ERROR: " + response.url + " not found."))
# === Backend requests ================================================================================
def do_POST_json(self, resource, payload):
"""
POST a payload to a resource (table). This creates a new object (observation or dataproduct)
This function replaces the old do_POST function that still needed to convert the json content in a very ugly
:param resource: contains the resource, for example 'observations', 'dataproducts'
:param payload: the contents of the object to create in json format
"""
url = self.host + resource
if not resource.endswith('/'):
resource += '/'
logging.debug(f'do_POST_json using url={url} and with payload: {payload}')
try:
json_response = self._request(url, 'POST', payload=payload)
if hasattr(json_response, 'id'):
return json_response['id']
else:
return -1
except Exception as err:
raise err
def insert_multiple_dataproduct(self, payload):
"""
Insert multiple dataproducts. Implicit also dataproduct-location is added
:param: List of payload string
:return: List id of added dataproducts
"""
url = self.host + "insert_dataproduct/"
logging.debug(f'insert_multiple_dataproduct using url={url} and with payload: {payload}')
try:
json_response = self._request(url, 'POST', payload=payload)
response_lst_ids = []
for resp in json_response:
response_lst_ids.append(resp['id'])
return response_lst_ids
except Exception as err:
raise err
\ No newline at end of file
#!/usr/bin/env python3
import json
import os
import time
import logging
import argparse
import sys
import psycopg2
from ldv_specification_interface import LDVSpecInterface
logger = logging.getLogger(__file__)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
def change_logging_level(level):
logger = logging.getLogger()
logger.setLevel(level)
for handler in logger.handlers:
handler.setLevel(level)
def open_connection_to_postgres_database():
"""
Create a database connection to a Postgres database
The 'connector' will be implemented differently, for now some hard coding with the assumption
that tunnel to ldvadmin is already created
:return connection to the database
"""
host = "localhost"
user = "ldvrbow"
password = "Wehn5CTYj1RcbstMSGls"
database = "ldvadmin"
port = 4321
connection = None
try:
logging.info("Connection to Database")
# establishing the connection
connection = psycopg2.connect(
database=database,
user=user,
password=password,
host=host,
port=port
)
# create a cursor
cur = connection.cursor()
# display the PostgreSQL database server version
cur.execute('SELECT version()')
logging.info("PostgreSQL database version: {}".format(cur.fetchone()))
except psycopg2.Error as exp:
logger.error("Could not connect to DataBase {}, results in {}".format(database, exp))
connection.close()
logging.info("Database Connection Closed")
return connection
def execute_query(connection, sql_query, data=None):
"""
Execute query of the ldvadmin Database
There is not commit required because we are doing only read queries
The 'connector' will be implemented differently, for now some hard coding with the assumption
that tunnel to ldvadmin is already created
:param: sql_query
:param: data (optional) the data arguments of the sql_query
:return result of the query
"""
try:
cursor = connection.cursor()
cursor.execute(sql_query, data)
connection.commit()
return cursor.fetchall()
except Exception as exp:
logger.error("Could not execute query! '{}' results in {}".format(sql_query, exp))
def main():
"""
Migrates data from the ldvadmin database to a ldv-spec-db database.
"""
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Check the invocation arguments
parser = argparse.ArgumentParser()
parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true")
parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true")
parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)", action="store")
parser.add_argument("-t", "--token", default="ad9b37a24380948601257f9c1f889b07a00ac81e",
help="Token to access the REST API of ldvspec", action="store")
parser.add_argument("--host", nargs="?", default='dev',
help="Presets are 'dev', 'test', 'prod'. Otherwise give a full url like https://atdb.astron.nl/atdb")
args = parser.parse_args()
if args.version:
# Get file's Last modification time stamp only in terms of seconds since epoch and convert in timestamp
modification_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(os.path.getmtime(__file__)))
print("--- {} (last updated {}) ---".format(os.path.basename(__file__), modification_time))
return
if args.verbose:
change_logging_level(logging.DEBUG)
if args.limit == 0:
limit_str = ""
else:
limit_str = "limit {}".format(args.limit)
logging.debug("Limit on number of dataproducts to query is set to {}".format(args.limit))
with open_connection_to_postgres_database() as conn:
result_query_all_dps = execute_query(conn,
"select obsid, obsid_source, dp_type, project, activity, uri, size, dysco "
"from astrowise.raw_dataproducts {}".format(limit_str))
# Are there still dataproducts left to query?
nbr_dps_left = len(result_query_all_dps) - args.limit
if nbr_dps_left > 0 and args.limit > 0:
logging.debug("Limit on number of leftover dataproducts to query is set to {}".format(nbr_dps_left))
limit_str = "limit {}".format(nbr_dps_left)
result_query_all_dps.extend(execute_query(conn,
"select obsid, obsid_source, dp_type, project, activity, uri, size, dysco "
"from astrowise.pl_dataproducts {}".format(limit_str)))
logging.info("{} dataproduct retrieved from ldvadmin".format(len(result_query_all_dps)))
# Create connection with ldv-spec-db using REST API, use temp. token created in my test-env
ldvspec_interface = LDVSpecInterface(args.host, args.token)
# Now obtain the attributes from result an add to database using REST insert_dataproduct method
lst_all_dps = []
for dps in result_query_all_dps:
logging.debug(dps)
metadata_str = "{'dysco_compression': %s}" % dps[7]
dps_dict = {"obs_id": dps[0], "oid_source": dps[1], "dataproduct_source": "LOFAR LTA",
"dataproduct_type": dps[2], "project": dps[3], "activity": dps[4], "surl": dps[5],
"filesize": dps[6], "additional_meta": metadata_str, "location": dps[5]}
# We can do a bulk insert (there should be a max in 'payload length somewhere -> TODO
lst_all_dps.append(dps_dict)
res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps)
logging.info("Added {} DataProduct objects".format(len(res_lst_ids)))
logging.debug("Added with ids={}".format(res_lst_ids))
if __name__ == "__main__":
main()
\ No newline at end of file
......@@ -9,14 +9,14 @@ test_object_value = dict(obs_id='12345', oid_source='SAS', dataproduct_source='l
project='LT10_10',
location='srm://surfsara.nl:4884/',
activity='observation',
surl='srm://surfsara.nl:4884/...',
surl='srm://surfsara.nl:4884/subfolder/some_nice.tar',
filesize=40,
additional_meta={'dysco_compression': True})
class TestDatabaseInteraction(dtest.TestCase):
def test_insert(self):
location = DataLocation(name='surfsara', uri='srm://surfsara.nl')
location = DataLocation(name='sara', uri='srm://surfsara.nl')
location.save()
test_values = dict(test_object_value)
test_values['location'] = location
......@@ -36,8 +36,8 @@ class TestDatabaseInteraction(dtest.TestCase):
for field in test_values:
self.assertEqual(test_object_value.get(field), getattr(dp, field), msg=f'Field {field} does not coincide')
self.assertEqual('surfsara.nl:4884', dp.location.name)
self.assertEqual( 'srm://surfsara.nl:4884', dp.location.uri)
self.assertEqual('Sara', dp.location.name)
self.assertEqual('surfsara.nl:4884', dp.location.uri)
class TestRESTAPI(rtest.APITestCase):
......@@ -47,23 +47,23 @@ class TestRESTAPI(rtest.APITestCase):
def test_insert_not_allowed(self):
client = rtest.APIClient()
response = client.post('/ldvspec/api/v1/data/insert/', data={}, format='json')
response = client.post('/ldvspec/api/v1/insert_dataproduct/', data={}, format='json')
self.assertEqual(response_status.HTTP_403_FORBIDDEN, response.status_code)
def test_insert_flat_error(self):
response = self.client.post('/ldvspec/api/v1/data/insert/', data={}, format='json')
response = self.client.post('/ldvspec/api/v1/insert_dataproduct/', data={}, format='json')
self.assertEqual(response_status.HTTP_400_BAD_REQUEST, response.status_code)
def test_insert_flat_single(self):
test_payload = dict(test_object_value)
test_payload.pop('location')
response = self.client.post('/ldvspec/api/v1/data/insert/', data=test_payload, format='json')
response = self.client.post('/ldvspec/api/v1/insert_dataproduct/', data=test_payload, format='json')
self.assertEqual(response_status.HTTP_201_CREATED, response.status_code)
def test_insert_flat_multi(self):
test_payload = dict(test_object_value)
test_payload.pop('location')
response = self.client.post('/ldvspec/api/v1/data/insert/', data=[test_payload, test_payload], format='json')
response = self.client.post('/ldvspec/api/v1/insert_dataproduct/', data=[test_payload, test_payload], format='json')
self.assertEqual(response_status.HTTP_201_CREATED, response.status_code)
self.assertTrue(DataProduct.objects.count() == 2, 'Not all dataproduct have been inserted')
......@@ -72,7 +72,7 @@ class TestRESTAPI(rtest.APITestCase):
def test_insert_flat_multi_insert_single(self):
test_payload = dict(test_object_value)
test_payload.pop('location')
response = self.client.post('/ldvspec/api/v1/data/insert/', data=test_payload, format='json')
response = self.client.post('/ldvspec/api/v1/insert_dataproduct/', data=test_payload, format='json')
self.assertEqual(response_status.HTTP_201_CREATED, response.status_code)
self.assertTrue(DataProduct.objects.count() == 1, 'Not all dataproduct have been inserted')
......
......@@ -34,8 +34,8 @@ class TestDataLocation(dtest.TestCase):
dataloc = DataLocation.insert_location_from_string('srm://test_site:44321/')
self.assertTrue(dataloc.pk is not None, 'Cannot save object')
self.assertEqual('test_site:44321', dataloc.name)
self.assertEqual('srm://test_site:44321', dataloc.uri)
self.assertEqual('Unknown', dataloc.name)
self.assertEqual('test_site:44321', dataloc.uri)
class TestRESTAPI(rtest.APITestCase):
......@@ -52,4 +52,4 @@ class TestRESTAPI(rtest.APITestCase):
response = self.client.post('/ldvspec/api/v1/data-location/', data=dict(name='testname', uri='srm://myniceuri/'),
format='json')
self.assertEqual(response_status.HTTP_201_CREATED, response.status_code)
self.assertEqual('srm://myniceuri/', DataLocation.objects.get(name='testname').uri)
\ No newline at end of file
self.assertEqual('myniceuri/', DataLocation.objects.get(name='testname').uri)
\ No newline at end of file
......@@ -14,7 +14,7 @@ urlpatterns = [
# REST API
path('api/v1/data/', views.DataProductView.as_view(), name='dataproduct'),
path('api/v1/data/insert/', views.InsertMultiDataproductView.as_view(), name='dataproduct-insert'),
path('api/v1/insert_dataproduct/', views.InsertMultiDataproductView.as_view(), name='dataproduct-insert'),
path('api/v1/data-location/', views.DataLocationView.as_view(), name='datalocation'),
path('api/v1/openapi/', get_schema_view(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment