diff --git a/ldvspec/ldvspec/migrate_ldvadmin_to_ldvspec.py b/ldvspec/ldvspec/migrate_ldvadmin_to_ldvspec.py deleted file mode 100644 index 67d3ed52deb876b15b00883b2ecd3bb422d19dea..0000000000000000000000000000000000000000 --- a/ldvspec/ldvspec/migrate_ldvadmin_to_ldvspec.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python3 - -import logging -import datetime -import pymysql -import argparse -import os -import django -import sys -import re - -LAST_UPDATE = "25 July 2022" - - -logger = logging.getLogger(__file__) -handler = logging.StreamHandler(sys.stdout) -handler.setLevel(logging.DEBUG) -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -handler.setFormatter(formatter) -logger.addHandler(handler) - - -def main(): - """ - Migrates data from the ldvadmin database to a ldvspec database. - """ - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) - - # Check the invocation arguments - parser = argparse.ArgumentParser() - parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true") - parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true") - args = parser.parse_args() - - if args.version: - print("--- migrate_ldvadmin_to_ldvspec.py (last updated {}) ---".format(LAST_UPDATE)) - return - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/ldvspec/lofardata/models.py b/ldvspec/lofardata/models.py index 2c51cce17d577b5477d45e1ef8606e1119716735..b73943691f539c843e69d467b4c8fab9d7addfe0 100644 --- a/ldvspec/lofardata/models.py +++ b/ldvspec/lofardata/models.py @@ -10,15 +10,23 @@ class DataLocation(models.Model): @staticmethod def insert_location_from_string(location_string): """ - Insert a datalocation from a srm string (e.g. srm://surm:4321/path.tar) - + Insert a datalocation from a srm string (e.g. srm://srm-url:4321) + Data Location names are: Sara, Juelich and Poznan :param str location_string: SRM url :return: DataLocation object - rtype: DataLocations """ + # netloc will become srm-url:4321 _, netloc, *_ = urlsplit(location_string) + if "sara" in netloc.lower(): + loc_name = "Sara" + elif "fz-juelich" in netloc.lower(): + loc_name = "Juelich" + elif "psnc" in netloc.lower(): + loc_name = "Poznan" + else: + loc_name = "Unknown" - dataloc = DataLocation(name=netloc, uri=location_string.rstrip('/')) + dataloc = DataLocation(name=loc_name, uri=netloc) dataloc.save() return dataloc diff --git a/ldvspec/lofardata/scripts/ldv_specification_interface.py b/ldvspec/lofardata/scripts/ldv_specification_interface.py new file mode 100644 index 0000000000000000000000000000000000000000..f9568b435b09860506fd8e8714f473a8da40e706 --- /dev/null +++ b/ldvspec/lofardata/scripts/ldv_specification_interface.py @@ -0,0 +1,160 @@ +""" +This module provides a REST interface to the LDV specifications + +Could be that this should be replaced with ldv_specification_interface module/python package +Its kinda prototype module for now required for the migration script +""" +import requests +import logging + +from json.decoder import JSONDecodeError + +try: + from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError +except ModuleNotFoundError: + from json.decoder import JSONDecodeError as SimpleJSONDecodeError + +import argparse +import datetime +from urllib.parse import urlparse, urlunparse +# ============================================================== +# The request header +ATDB_HEADER = { + 'content-type': "application/json", + 'cache-control': "no-cache" +} +LDV_HOST_DEV = "http://localhost:8000/ldvspec/api/v1" # your local development environment with Django webserver +LDV_HOST_TEST = "https://sdc-dev.astron.nl:5554/ldvspec/api/v1" # the ldv sdc test environment. +LDV_HOST_PROD = "https://sdc.astron.nl:5554/ldvspec/api/v1" # the ldv sdc production environment. + + +class APIException(Exception): + pass + +class APIMissing(Exception): + pass + + +def is_http_exception(response: requests.Response): + if response.status_code in range(400, 499): + return True + return False + + +def is_success(response: requests.Response): + if response.status_code in range(200, 300): + return True + return False + + +def is_missing(response: requests.Response): + if response.status_code == 404: + return True + return False + + +def can_retry(response: requests.Response): + if response.status_code in [408, 429]: + return True + return False + + +class LDVSpecInterface(): + """ + This class is used to connect via REST interface + """ + def __init__(self, host, token): + """ + Constructor. + :param host: the host name of the backend. + :param token: The token to login + """ + # accept some presets to set host to dev, test, acc or prod + self.host = host + if self.host == 'dev': + self.host = LDV_HOST_DEV + elif self.host == 'test': + self.host = LDV_HOST_TEST + elif self.host == 'prod': + self.host = LDV_HOST_PROD + if not self.host.endswith('/'): + self.host += '/' + self.header = ATDB_HEADER + self.header['Authorization'] = f'Token {token}' + self._session = None + + def session(self): + if self._session is None: + self._session = requests.Session() + self._session.headers.update(self.header) + return self._session + + def _request(self, url, type, query_parameters=None, payload=None): + parsed_url = urlparse(url) + if not parsed_url.path.endswith('/'): + parsed_url = parsed_url._replace(path=parsed_url.path + '/') + + url = urlunparse(parsed_url) + + if isinstance(payload, str): + response = self.session().request(type, url, data=payload, headers=self.header, params=query_parameters) + elif isinstance(payload, dict): + response = self.session().request(type, url, json=payload, headers=self.header, params=query_parameters) + else: + response = self.session().request(type, url, json=payload, headers=self.header, params=query_parameters) + + logging.debug(f"[{type} {response.url} ]") + logging.debug("Response: " + str(response.status_code) + ", " + str(response.reason)) + if is_missing(response): + raise APIMissing(url) + elif is_http_exception(response) and can_retry(response): + return self._request(url, type, query_parameters, payload) + elif is_http_exception(response): + raise APIException(f'{response.status_code}: {response.reason} {response.content}') + elif is_success(response): + try: + json_response = response.json() + + return json_response + except (SimpleJSONDecodeError, JSONDecodeError): + return response.content + raise APIException(f'Unrecognized response code {response.status_code}: {response.reason} {response.content}') + # raise (Exception("ERROR: " + response.url + " not found.")) + + # === Backend requests ================================================================================ + def do_POST_json(self, resource, payload): + """ + POST a payload to a resource (table). This creates a new object (observation or dataproduct) + This function replaces the old do_POST function that still needed to convert the json content in a very ugly + :param resource: contains the resource, for example 'observations', 'dataproducts' + :param payload: the contents of the object to create in json format + """ + url = self.host + resource + if not resource.endswith('/'): + resource += '/' + logging.debug(f'do_POST_json using url={url} and with payload: {payload}') + try: + json_response = self._request(url, 'POST', payload=payload) + if hasattr(json_response, 'id'): + return json_response['id'] + else: + return -1 + except Exception as err: + raise err + + def insert_multiple_dataproduct(self, payload): + """ + Insert multiple dataproducts. Implicit also dataproduct-location is added + :param: List of payload string + :return: List id of added dataproducts + """ + url = self.host + "insert_dataproduct/" + logging.debug(f'insert_multiple_dataproduct using url={url} and with payload: {payload}') + try: + json_response = self._request(url, 'POST', payload=payload) + response_lst_ids = [] + for resp in json_response: + response_lst_ids.append(resp['id']) + return response_lst_ids + except Exception as err: + raise err \ No newline at end of file diff --git a/ldvspec/lofardata/scripts/migrate_ldvadmin_to_ldvspec.py b/ldvspec/lofardata/scripts/migrate_ldvadmin_to_ldvspec.py new file mode 100644 index 0000000000000000000000000000000000000000..19251437a6f88da95e256cb794bbc437d71b1bd8 --- /dev/null +++ b/ldvspec/lofardata/scripts/migrate_ldvadmin_to_ldvspec.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +import json +import os +import time +import logging +import argparse +import sys +import psycopg2 +from ldv_specification_interface import LDVSpecInterface + + +logger = logging.getLogger(__file__) +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.INFO) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +logger.addHandler(handler) + + +def change_logging_level(level): + logger = logging.getLogger() + logger.setLevel(level) + for handler in logger.handlers: + handler.setLevel(level) + + +def open_connection_to_postgres_database(): + """ + Create a database connection to a Postgres database + The 'connector' will be implemented differently, for now some hard coding with the assumption + that tunnel to ldvadmin is already created + :return connection to the database + """ + host = "localhost" + user = "ldvrbow" + password = "Wehn5CTYj1RcbstMSGls" + database = "ldvadmin" + port = 4321 + + connection = None + try: + logging.info("Connection to Database") + # establishing the connection + connection = psycopg2.connect( + database=database, + user=user, + password=password, + host=host, + port=port + ) + # create a cursor + cur = connection.cursor() + # display the PostgreSQL database server version + cur.execute('SELECT version()') + logging.info("PostgreSQL database version: {}".format(cur.fetchone())) + + except psycopg2.Error as exp: + logger.error("Could not connect to DataBase {}, results in {}".format(database, exp)) + connection.close() + logging.info("Database Connection Closed") + + return connection + + +def execute_query(connection, sql_query, data=None): + """ + Execute query of the ldvadmin Database + There is not commit required because we are doing only read queries + The 'connector' will be implemented differently, for now some hard coding with the assumption + that tunnel to ldvadmin is already created + :param: sql_query + :param: data (optional) the data arguments of the sql_query + :return result of the query + """ + try: + cursor = connection.cursor() + cursor.execute(sql_query, data) + connection.commit() + return cursor.fetchall() + except Exception as exp: + logger.error("Could not execute query! '{}' results in {}".format(sql_query, exp)) + + +def main(): + """ + Migrates data from the ldvadmin database to a ldv-spec-db database. + """ + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + # Check the invocation arguments + parser = argparse.ArgumentParser() + parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true") + parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true") + parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)", action="store") + parser.add_argument("-t", "--token", default="ad9b37a24380948601257f9c1f889b07a00ac81e", + help="Token to access the REST API of ldvspec", action="store") + parser.add_argument("--host", nargs="?", default='dev', + help="Presets are 'dev', 'test', 'prod'. Otherwise give a full url like https://atdb.astron.nl/atdb") + args = parser.parse_args() + + if args.version: + # Get file's Last modification time stamp only in terms of seconds since epoch and convert in timestamp + modification_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(os.path.getmtime(__file__))) + print("--- {} (last updated {}) ---".format(os.path.basename(__file__), modification_time)) + return + + if args.verbose: + change_logging_level(logging.DEBUG) + + if args.limit == 0: + limit_str = "" + else: + limit_str = "limit {}".format(args.limit) + logging.debug("Limit on number of dataproducts to query is set to {}".format(args.limit)) + + with open_connection_to_postgres_database() as conn: + result_query_all_dps = execute_query(conn, + "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco " + "from astrowise.raw_dataproducts {}".format(limit_str)) + # Are there still dataproducts left to query? + nbr_dps_left = len(result_query_all_dps) - args.limit + if nbr_dps_left > 0 and args.limit > 0: + logging.debug("Limit on number of leftover dataproducts to query is set to {}".format(nbr_dps_left)) + limit_str = "limit {}".format(nbr_dps_left) + result_query_all_dps.extend(execute_query(conn, + "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco " + "from astrowise.pl_dataproducts {}".format(limit_str))) + + logging.info("{} dataproduct retrieved from ldvadmin".format(len(result_query_all_dps))) + # Create connection with ldv-spec-db using REST API, use temp. token created in my test-env + ldvspec_interface = LDVSpecInterface(args.host, args.token) + + # Now obtain the attributes from result an add to database using REST insert_dataproduct method + lst_all_dps = [] + for dps in result_query_all_dps: + logging.debug(dps) + metadata_str = "{'dysco_compression': %s}" % dps[7] + dps_dict = {"obs_id": dps[0], "oid_source": dps[1], "dataproduct_source": "LOFAR LTA", + "dataproduct_type": dps[2], "project": dps[3], "activity": dps[4], "surl": dps[5], + "filesize": dps[6], "additional_meta": metadata_str, "location": dps[5]} + # We can do a bulk insert (there should be a max in 'payload length somewhere -> TODO + lst_all_dps.append(dps_dict) + + res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps) + logging.info("Added {} DataProduct objects".format(len(res_lst_ids))) + logging.debug("Added with ids={}".format(res_lst_ids)) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ldvspec/lofardata/tests/test_dataproduct.py b/ldvspec/lofardata/tests/test_dataproduct.py index 138634b3d5811b60ce73dda190f54d1cc32a2a21..47c0c656d27451e027e567cf16827c88883cf90b 100644 --- a/ldvspec/lofardata/tests/test_dataproduct.py +++ b/ldvspec/lofardata/tests/test_dataproduct.py @@ -9,14 +9,14 @@ test_object_value = dict(obs_id='12345', oid_source='SAS', dataproduct_source='l project='LT10_10', location='srm://surfsara.nl:4884/', activity='observation', - surl='srm://surfsara.nl:4884/...', + surl='srm://surfsara.nl:4884/subfolder/some_nice.tar', filesize=40, additional_meta={'dysco_compression': True}) class TestDatabaseInteraction(dtest.TestCase): def test_insert(self): - location = DataLocation(name='surfsara', uri='srm://surfsara.nl') + location = DataLocation(name='sara', uri='srm://surfsara.nl') location.save() test_values = dict(test_object_value) test_values['location'] = location @@ -36,8 +36,8 @@ class TestDatabaseInteraction(dtest.TestCase): for field in test_values: self.assertEqual(test_object_value.get(field), getattr(dp, field), msg=f'Field {field} does not coincide') - self.assertEqual('surfsara.nl:4884', dp.location.name) - self.assertEqual( 'srm://surfsara.nl:4884', dp.location.uri) + self.assertEqual('Sara', dp.location.name) + self.assertEqual('surfsara.nl:4884', dp.location.uri) class TestRESTAPI(rtest.APITestCase): @@ -47,23 +47,23 @@ class TestRESTAPI(rtest.APITestCase): def test_insert_not_allowed(self): client = rtest.APIClient() - response = client.post('/ldvspec/api/v1/data/insert/', data={}, format='json') + response = client.post('/ldvspec/api/v1/insert_dataproduct/', data={}, format='json') self.assertEqual(response_status.HTTP_403_FORBIDDEN, response.status_code) def test_insert_flat_error(self): - response = self.client.post('/ldvspec/api/v1/data/insert/', data={}, format='json') + response = self.client.post('/ldvspec/api/v1/insert_dataproduct/', data={}, format='json') self.assertEqual(response_status.HTTP_400_BAD_REQUEST, response.status_code) def test_insert_flat_single(self): test_payload = dict(test_object_value) test_payload.pop('location') - response = self.client.post('/ldvspec/api/v1/data/insert/', data=test_payload, format='json') + response = self.client.post('/ldvspec/api/v1/insert_dataproduct/', data=test_payload, format='json') self.assertEqual(response_status.HTTP_201_CREATED, response.status_code) def test_insert_flat_multi(self): test_payload = dict(test_object_value) test_payload.pop('location') - response = self.client.post('/ldvspec/api/v1/data/insert/', data=[test_payload, test_payload], format='json') + response = self.client.post('/ldvspec/api/v1/insert_dataproduct/', data=[test_payload, test_payload], format='json') self.assertEqual(response_status.HTTP_201_CREATED, response.status_code) self.assertTrue(DataProduct.objects.count() == 2, 'Not all dataproduct have been inserted') @@ -72,7 +72,7 @@ class TestRESTAPI(rtest.APITestCase): def test_insert_flat_multi_insert_single(self): test_payload = dict(test_object_value) test_payload.pop('location') - response = self.client.post('/ldvspec/api/v1/data/insert/', data=test_payload, format='json') + response = self.client.post('/ldvspec/api/v1/insert_dataproduct/', data=test_payload, format='json') self.assertEqual(response_status.HTTP_201_CREATED, response.status_code) self.assertTrue(DataProduct.objects.count() == 1, 'Not all dataproduct have been inserted') diff --git a/ldvspec/lofardata/tests/test_location.py b/ldvspec/lofardata/tests/test_location.py index 86d6466c3b45485d547ffcf21c843bdae7bfdd21..b41126e551939ef695a6c2978e614e8a955eb8bc 100644 --- a/ldvspec/lofardata/tests/test_location.py +++ b/ldvspec/lofardata/tests/test_location.py @@ -34,8 +34,8 @@ class TestDataLocation(dtest.TestCase): dataloc = DataLocation.insert_location_from_string('srm://test_site:44321/') self.assertTrue(dataloc.pk is not None, 'Cannot save object') - self.assertEqual('test_site:44321', dataloc.name) - self.assertEqual('srm://test_site:44321', dataloc.uri) + self.assertEqual('Unknown', dataloc.name) + self.assertEqual('test_site:44321', dataloc.uri) class TestRESTAPI(rtest.APITestCase): @@ -52,4 +52,4 @@ class TestRESTAPI(rtest.APITestCase): response = self.client.post('/ldvspec/api/v1/data-location/', data=dict(name='testname', uri='srm://myniceuri/'), format='json') self.assertEqual(response_status.HTTP_201_CREATED, response.status_code) - self.assertEqual('srm://myniceuri/', DataLocation.objects.get(name='testname').uri) \ No newline at end of file + self.assertEqual('myniceuri/', DataLocation.objects.get(name='testname').uri) \ No newline at end of file diff --git a/ldvspec/lofardata/urls.py b/ldvspec/lofardata/urls.py index e8fdbc8e042d4739eb6652283e48dd6efa12ff23..930e0abff8221293f6466faf10f89727460296b2 100644 --- a/ldvspec/lofardata/urls.py +++ b/ldvspec/lofardata/urls.py @@ -14,7 +14,7 @@ urlpatterns = [ # REST API path('api/v1/data/', views.DataProductView.as_view(), name='dataproduct'), - path('api/v1/data/insert/', views.InsertMultiDataproductView.as_view(), name='dataproduct-insert'), + path('api/v1/insert_dataproduct/', views.InsertMultiDataproductView.as_view(), name='dataproduct-insert'), path('api/v1/data-location/', views.DataLocationView.as_view(), name='datalocation'), path('api/v1/openapi/', get_schema_view(