diff --git a/README.md b/README.md index fec85a452347841340b425e11e8d1579c6032b9f..d2ea05fa9835e4a56724486cf98a148ad9ba3588 100644 --- a/README.md +++ b/README.md @@ -67,5 +67,67 @@ pip install pip install -e "git+https://git.astron.nl/ldv/ldv_utils.git#egg=atdb atdb_inspect_transition [sas_id] [from status] [to status] # More info and flags -atdb_csv_gen -h +atdb_inspect_transition -h +``` + + +## ldvspec-migration script +This is a quick script that is probably used only once to migrate the data from the +old `ldvadmin` database to the new `ldv-spec-db` database. + +### Installation +```bash +# Using pip install (requires valid ssh key, until the repo is set to public) +pip install -e "git+https://git.astron.nl/ldv/ldv_utils.git#egg=ldvspec-migration&subdirectory=ldv_migrate" +``` + +### Building +Manually, from a checked out repo. +Look for the 'scripts' directory and run `./build.sh`. +This will create a 'dist' directory, containing the pip installable archive. +`ldvspec-migration-1.0.0.tar.gz` + +### Deploying (manually) +For example on `sdc@dop814.astron.nl` (this is our sdc-dev test machine) +``` +# only once +> cd ~ +> mkdir ldv-migrate +> cd ldv-migrate +> virtualenv env3.8 -p python3.8 (on ubuntu) +> python3 -m venv env3.8 (on centos) +> source env3.8/bin/activate +``` + +Copy the package (`ldvspec-migration-1.0.0.tar.gz`) into that directory. +And pip install it... +``` +> pip install ./ldvspec-migration-1.0.0.tar.gz --upgrade +Successfully installed ldvspec-migration-1.0.0 +``` + + +### Running + +To test if it works +``` +(env3.8) nvermaas@nicodev:~/ldv-migrate$ ldv_migrate -h +usage: ldv_migrate [-h] [--version] [-v] [-l LIMIT] [-t TOKEN] [--host [HOST]] [-s SECTION] [-r MAX_NBR_DPS_TO_INSERT_PER_REQUEST] + +optional arguments: + -h, --help show this help message and exit + --version Show current version of this program + -v, --verbose More information at run time. 
+ -l LIMIT, --limit LIMIT + Limit on the number of queries (0 is no limit) + -t TOKEN, --token TOKEN + Token to access the REST API of ldvspec + --host [HOST] The ldv-spec-db host. Presets are 'dev', 'test', 'prod', otherwise give a full url like https://atdb.astron.nl/atdb + -s SECTION, --section SECTION + Add the configuration's section from the database.cfg. + -r MAX_NBR_DPS_TO_INSERT_PER_REQUEST, --max_nbr_dps_to_insert_per_request MAX_NBR_DPS_TO_INSERT_PER_REQUEST + The number of dataproducts to insert per REST request (0 is no limit) + +``` + diff --git a/atdb_csv_gen/setup.py b/atdb_csv_gen/setup.py index 5716628387e4b5a2443c757dc996c39635c99c45..4c3e72982f9101aa639342eac4de8de2d847f0dd 100644 --- a/atdb_csv_gen/setup.py +++ b/atdb_csv_gen/setup.py @@ -2,16 +2,15 @@ from setuptools import setup - def get_requirements() -> str: with open("requirements.txt") as reqs: return reqs.read().split("\n") setup( - name="atdb-csv-gen", + name="ldvspec-migration", version="1.0.0", - description="Tool to generate csv files for ATDB", - author="ASTRON", + description="Migration script to copy data from ldvadmin to ldv-spec-db", + author='Roy de Goei, Fanna Lautenback, Nico Vermaas for ASTRON', packages=["atdb_csv_gen"], install_requires=get_requirements(), scripts=["bin/atdb_csv_gen"], diff --git a/ldv_migrate/build.sh b/ldv_migrate/build.sh new file mode 100644 index 0000000000000000000000000000000000000000..a45e14328a6faeaefcd4afa81f9a0b16dbc3e72f --- /dev/null +++ b/ldv_migrate/build.sh @@ -0,0 +1,9 @@ +#!/bin/bash +# This script make a source distribution for the pip installable package in the current folder +echo "Build package for ldvspec-migration" +python --version +# Explicit give format otherwise a zip is created (Windows?) 
+python setup.py sdist --formats=gztar + +# Next command will not close the window, can be handy if something goes wrong +exec $SHELL \ No newline at end of file diff --git a/ldv_migrate/ldv_migrate/__init__.py b/ldv_migrate/ldv_migrate/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/ldv_migrate/ldv_migrate/connection/__init__.py b/ldv_migrate/ldv_migrate/connection/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/ldv_migrate/ldv_migrate/connection/config.py b/ldv_migrate/ldv_migrate/connection/config.py new file mode 100644 index 0000000000000000000000000000000000000000..aa517437534e4cd3898ae6d983079dc152e11e00 --- /dev/null +++ b/ldv_migrate/ldv_migrate/connection/config.py @@ -0,0 +1,21 @@ +from configparser import ConfigParser +import os + + +def read_config(section, filename='database.cfg'): + full_filename = os.path.join(os.path.dirname(__file__), filename) + parser = ConfigParser() + read_result = parser.read(full_filename) + # If file not found then parser returns just an empty list it does not raise an Exception! 
+ if len(read_result) == 0: + raise Exception("Configuration file with filename {0} not found".format(full_filename)) + + db_settings = {} + if parser.has_section(section): + params = parser.items(section) + for param in params: + db_settings[param[0]] = param[1] + else: + raise Exception('Section {0} not found in the {1} file'.format(section, filename)) + + return db_settings diff --git a/ldv_migrate/ldv_migrate/connection/database.cfg b/ldv_migrate/ldv_migrate/connection/database.cfg new file mode 100644 index 0000000000000000000000000000000000000000..05df7ee81622d5db71a04bf57c38b2016bbc0e36 --- /dev/null +++ b/ldv_migrate/ldv_migrate/connection/database.cfg @@ -0,0 +1,15 @@ +[postgresql-local] +host=localhost +port=5433 +database=ldv-spec-db +user=postgres +password=secret + +[postgresql-ldv] +tunnelhost=dop821.astron.nl +tunnelusername=sdco +host=sdc-db.astron.nl +port=5432 +database=ldvadmin +user=ldvrbow +password=Wehn5CTYj1RcbstMSGls \ No newline at end of file diff --git a/ldv_migrate/ldv_migrate/connection/retrieve_db_connection.py b/ldv_migrate/ldv_migrate/connection/retrieve_db_connection.py new file mode 100644 index 0000000000000000000000000000000000000000..251cfb68520dad125b8fc138af585b75ec11fedc --- /dev/null +++ b/ldv_migrate/ldv_migrate/connection/retrieve_db_connection.py @@ -0,0 +1,95 @@ +import sys + +import psycopg2 +from connection.config import read_config +import logging +import argparse +from sshtunnel import SSHTunnelForwarder +import os + + +def connect_postgresql(section): + """ Connect to the PostgreSQL database server """ + conn = None + tunnel = None + try: + # read connection parameters + configuration = read_config(section=section) + + logging.info('Connecting PostgreSQL database %s', configuration.get('database', 'no database name given')) + + host = configuration.get('host', 'no host given') + if host != 'localhost': + tunnel = open_tunnel(configuration) + conn = psycopg2.connect(host='localhost', + port=tunnel.local_bind_port, + 
database=configuration.get('database'), + user=configuration.get('user'), + password=configuration.get('password')) + else: + conn = psycopg2.connect(**configuration) + + cur = conn.cursor() + cur.execute('SELECT version()') + db_version = cur.fetchone() + logging.info('Database version: ' + db_version[0]) + + except (Exception, psycopg2.DatabaseError) as error: + logging.error(error) + if tunnel is not None: + tunnel.stop() + + return conn, tunnel + + +def open_tunnel(configuration_params): + tunnel_host = configuration_params.get('tunnelhost', "no tunnel host given") + tunnel_username = configuration_params.get('tunnelusername', "no username for the tunnel given") + host = configuration_params.get('host', "no host given") + port = int(configuration_params.get('port', "no port given")) + + try: + ssh_config_file = os.path.expanduser("~/.ssh/config") + except FileNotFoundError as exc: + raise FileNotFoundError( + "Ssh config file not found on standard path '~/.ssh/config'. This is mandatory for opening the ssh tunnel" + ) from exc + + logging.info("Creating ssh tunnel for %s and port %s with tunnel host %s and username %s", repr(host), port, + repr(tunnel_host), repr(tunnel_username)) + ssh_tunnel = SSHTunnelForwarder( + ssh_address_or_host=tunnel_host, + ssh_username=tunnel_username, + ssh_config_file=ssh_config_file, + remote_bind_address=(host, port) + ) + ssh_tunnel.start() + return ssh_tunnel + + +def main(): + """ + Opens a database connection from configuration file database.cfg + """ + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) + + # Check the invocation arguments + parser = argparse.ArgumentParser() + parser.add_argument("-s", "--section", help="Add the configuration's section from the database.cfg.") + args = parser.parse_args() + + if not args.section: + logging.critical("Error: no configuration section given. 
Try --help") + sys.exit(-1) + + return connect_postgresql(args.section) + + +if __name__ == '__main__': + connection, server = main() + if connection is not None: + connection.close() + logging.info('Database connection closed.') + if server is not None: + server.stop() + logging.info('Tunneled server stopped.') diff --git a/ldv_migrate/ldv_migrate/connection/test.py b/ldv_migrate/ldv_migrate/connection/test.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/ldv_migrate/ldv_migrate/ldv_specification_interface.py b/ldv_migrate/ldv_migrate/ldv_specification_interface.py new file mode 100644 index 0000000000000000000000000000000000000000..d0469ad552c8646e191c6bdebc55387e75e72451 --- /dev/null +++ b/ldv_migrate/ldv_migrate/ldv_specification_interface.py @@ -0,0 +1,158 @@ +""" +This module provides a REST interface to the LDV specifications + +Could be that this should be replaced with ldv_specification_interface module/python package +Its kinda prototype module for now required for the migration script +""" +import requests +import logging + +from json.decoder import JSONDecodeError + +try: + from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError +except ModuleNotFoundError: + from json.decoder import JSONDecodeError as SimpleJSONDecodeError + +import argparse +import datetime +from urllib.parse import urlparse, urlunparse +# ============================================================== +# The request header +ATDB_HEADER = { + 'content-type': "application/json", + 'cache-control': "no-cache" +} +LDV_HOST_DEV = "http://localhost:8000/ldvspec/api/v1" # your local development environment with Django webserver +LDV_HOST_TEST = "https://sdc-dev.astron.nl:5554/ldvspec/api/v1" # the ldv sdc test environment. +LDV_HOST_PROD = "https://sdc.astron.nl:5554/ldvspec/api/v1" # the ldv sdc production environment. 
+ + +class APIException(Exception): + pass + +class APIMissing(Exception): + pass + + +def is_http_exception(response: requests.Response): + if response.status_code in range(400, 499): + return True + return False + + +def is_success(response: requests.Response): + if response.status_code in range(200, 300): + return True + return False + + +def is_missing(response: requests.Response): + if response.status_code == 404: + return True + return False + + +def can_retry(response: requests.Response): + if response.status_code in [408, 429]: + return True + return False + + +class LDVSpecInterface(): + """ + This class is used to connect via REST interface + """ + def __init__(self, host, token): + """ + Constructor. + :param host: the host name of the backend. + :param token: The token to login + """ + # accept some presets to set host to dev, test, acc or prod + self.host = host + if self.host == 'dev': + self.host = LDV_HOST_DEV + elif self.host == 'test': + self.host = LDV_HOST_TEST + elif self.host == 'prod': + self.host = LDV_HOST_PROD + if not self.host.endswith('/'): + self.host += '/' + self.header = ATDB_HEADER + self.header['Authorization'] = f'Token {token}' + self._session = None + + def session(self): + if self._session is None: + self._session = requests.Session() + self._session.headers.update(self.header) + return self._session + + def _request(self, url, type, query_parameters=None, payload=None): + parsed_url = urlparse(url) + if not parsed_url.path.endswith('/'): + parsed_url = parsed_url._replace(path=parsed_url.path + '/') + + url = urlunparse(parsed_url) + + if isinstance(payload, str): + response = self.session().request(type, url, data=payload, headers=self.header, params=query_parameters) + else: + response = self.session().request(type, url, json=payload, headers=self.header, params=query_parameters) + + logging.debug(f"[{type} {response.url} ]") + logging.debug("Response: " + str(response.status_code) + ", " + str(response.reason)) + if 
is_missing(response): + raise APIMissing(url) + elif is_http_exception(response) and can_retry(response): + return self._request(url, type, query_parameters, payload) + elif is_http_exception(response): + raise APIException(f'{response.status_code}: {response.reason} {response.content}') + elif is_success(response): + try: + json_response = response.json() + + return json_response + except (SimpleJSONDecodeError, JSONDecodeError): + return response.content + raise APIException(f'Unrecognized response code {response.status_code}: {response.reason} {response.content}') + # raise (Exception("ERROR: " + response.url + " not found.")) + + # === Backend requests ================================================================================ + def do_POST_json(self, resource, payload): + """ + POST a payload to a resource (table). This creates a new object (observation or dataproduct) + This function replaces the old do_POST function that still needed to convert the json content in a very ugly + :param resource: contains the resource, for example 'observations', 'dataproducts' + :param payload: the contents of the object to create in json format + """ + url = self.host + resource + if not resource.endswith('/'): + resource += '/' + logging.debug(f'do_POST_json using url={url} and with payload: {payload}') + try: + json_response = self._request(url, 'POST', payload=payload) + if hasattr(json_response, 'id'): + return json_response['id'] + else: + return -1 + except Exception as err: + raise err + + def insert_multiple_dataproduct(self, payload): + """ + Insert multiple dataproducts. 
Implicit also dataproduct-location is added + :param: List of payload string + :return: List id of added dataproducts + """ + url = self.host + "insert_dataproduct/" + logging.debug(f'insert_multiple_dataproduct using url={url} and with payload: {payload}') + try: + json_response = self._request(url, 'POST', payload=payload) + response_lst_ids = [] + for resp in json_response: + response_lst_ids.append(resp['id']) + return response_lst_ids + except Exception as err: + raise err \ No newline at end of file diff --git a/ldv_migrate/ldv_migrate/migrate_ldvadmin_to_ldvspec.py b/ldv_migrate/ldv_migrate/migrate_ldvadmin_to_ldvspec.py new file mode 100644 index 0000000000000000000000000000000000000000..1b4ef604fcc451a9d9bc50177d50f9c59d488b05 --- /dev/null +++ b/ldv_migrate/ldv_migrate/migrate_ldvadmin_to_ldvspec.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +""" +This script migrates data from ldvadmin to ldv-spec-db with user arguments. +By default, the local ldv-spec-db is used and the ldvadmin on sdc-db.astron.nl +Script requires token to access the ldv-spec-db with REST API + +Some examples: +- Show latest version: + python ./ldvspec/lofardata/scripts/migrate_ldvadmin_to_ldvspec.py --version +- Import only 1000 records and show more verbosity: + python ./ldvspec/lofardata/scripts/migrate_ldvadmin_to_ldvspec.py --limit 1000 --verbose +- Import 50000 records and insert in steps of 10000 (so 5 steps) + python ./ldvspec/lofardata/scripts/migrate_ldvadmin_to_ldvspec.py --limit 50000 --max_nbr_dps_to_insert_per_request 10000 +- Import only 1000 records at production: + python ./ldvspec/lofardata/scripts/migrate_ldvadmin_to_ldvspec.py --limit 1000 --host prod + +""" +import os +import time +import logging +import argparse +import sys +import math + +import connection.retrieve_db_connection as connector +from ldv_specification_interface import LDVSpecInterface + +logger = logging.getLogger(__file__) +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.INFO) 
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +logger.addHandler(handler) + + +def change_logging_level(level): + logger = logging.getLogger() + logger.setLevel(level) + for handler in logger.handlers: + handler.setLevel(level) + + +def execute_query(connection, sql_query, data=None): + """ + Execute query of the ldvadmin Database + There is no commit required because we are doing only read queries + :param: connection: Database 'connection' + :param: sql_query + :param: data (optional) the data arguments of the sql_query + :return result of the query + """ + try: + cursor = connection.cursor() + cursor.execute(sql_query, data) + connection.commit() + return cursor.fetchall() + except Exception as exp: + logger.error("Could not execute query! '{}' results in -> {}".format(sql_query, exp)) + + +def main(): + """ + Migrates data from the ldvadmin database to a ldv-spec-db database. + """ + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + # Check the invocation arguments + parser = argparse.ArgumentParser() + parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true") + parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true") + parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)", action="store") + parser.add_argument("-t", "--token", default="ad9b37a24380948601257f9c1f889b07a00ac81e", + help="Token to access the REST API of ldvspec", action="store") + parser.add_argument("--host", nargs="?", default='dev', + help="The ldv-spec-db host. 
Presets are 'dev', 'test', 'prod', otherwise give a full url like https://atdb.astron.nl/atdb") + parser.add_argument("-s", "--section", default='postgresql-ldv', + help="Add the configuration's section from the database.cfg.") + # Have payload of more millions will most likely not work + # tested with 10.000 results in 90 seconds so # 11 mil. will be at least 28 hours + parser.add_argument("-r", "--max_nbr_dps_to_insert_per_request", default=1000, type=int, + help="The number of dataproducts to insert per REST request (0 is no limit)", action="store") + + args = parser.parse_args() + + if args.version: + # Get file's Last modification time stamp only in terms of seconds since epoch and convert in timestamp + modification_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(os.path.getmtime(__file__))) + logging.info("--- {} (last updated {}) ---".format(os.path.basename(__file__), modification_time)) + return + + logging.info("\nMoment please....\n") + + if args.verbose: + change_logging_level(logging.DEBUG) + + if args.limit == 0: + limit_str = "" + else: + limit_str = "limit {}".format(args.limit) + logging.debug("Limit on number of dataproducts to query is set to {}".format(args.limit)) + + if args.max_nbr_dps_to_insert_per_request == 0: + no_limit_to_insert = True + else: + no_limit_to_insert = False + logging.debug("Limit on number of dataproducts to insert REST is set to {}".format(args.max_nbr_dps_to_insert_per_request)) + + query_count_all_raw_dataproducts = "select count(*) from astrowise.raw_dataproducts" + query_count_all_pipeline_dataproducts = "select count(*) from astrowise.pl_dataproducts" + query_all_required_fields_raw_dataproducts = \ + "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.raw_dataproducts {}"\ + .format(limit_str) + + # Create connection using ssh tunnel with the ldvadmin database + conn, tunnel = connector.connect_postgresql(args.section) + count_raw_dps = execute_query(conn, 
query_count_all_raw_dataproducts)[0][0] + count_pl_dps = execute_query(conn, query_count_all_pipeline_dataproducts)[0][0] + logging.info(f"There are {count_raw_dps} raw dataproducts and {count_pl_dps} pipeline dataproduct in the ldvadmin.astrowise table!!") + result_query_all_dps = execute_query(conn, query_all_required_fields_raw_dataproducts) + + # Are there still dataproducts left to query? + nbr_dps_left = len(result_query_all_dps) - args.limit + if nbr_dps_left > 0 and args.limit > 0: + logging.debug("Limit on number of leftover dataproducts to query is set to {}".format(nbr_dps_left)) + limit_str = "limit {}".format(nbr_dps_left) + query_all_required_fields_pipeline_dataproducts = \ + "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.pl_dataproducts {}" \ + .format(limit_str) + result_query_all_dps.extend(execute_query(conn, query_all_required_fields_pipeline_dataproducts)) + + logging.info("{} dataproduct retrieved from ldvadmin".format(len(result_query_all_dps))) + # Create connection with ldv-spec-db using REST API, use temp. 
token created in my test-env + ldvspec_interface = LDVSpecInterface(args.host, args.token) + + # Now obtain the attributes from result an add to database using REST insert_dataproduct method + lst_all_dps = [] + for dps in result_query_all_dps: + logging.debug(dps) + metadata_str = "{'dysco_compression': %s}" % dps[7] + dps_dict = {"obs_id": dps[0], "oid_source": dps[1], "dataproduct_source": "LOFAR LTA", + "dataproduct_type": dps[2], "project": dps[3], "activity": dps[4], "surl": dps[5], + "filesize": dps[6], "additional_meta": metadata_str, "location": dps[5]} + lst_all_dps.append(dps_dict) + + if no_limit_to_insert: + res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps) + logging.info("Added {} DataProduct objects".format(len(res_lst_ids))) + logging.debug("Added with ids={}".format(res_lst_ids)) + else: + cnt = 0 + nbr_required_inserts = math.ceil(len(lst_all_dps)/args.max_nbr_dps_to_insert_per_request) + for cnt in range(nbr_required_inserts): + start = cnt * args.max_nbr_dps_to_insert_per_request + end = start + args.max_nbr_dps_to_insert_per_request + cnt += 1 + res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps[start:end]) + logging.info("Insert count {} of {}: Added {} DataProduct objects [{} till {}]". 
+ format(cnt, nbr_required_inserts, args.max_nbr_dps_to_insert_per_request, start, end)) + logging.debug("Added with ids={}".format(res_lst_ids)) + + logging.info("\nThat's All Folks!\n") + + +if __name__ == "__main__": + main() diff --git a/ldv_migrate/requirements.txt b/ldv_migrate/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..78fbaa96c128c1f3f93a76c32f3acb2dd5ff149b --- /dev/null +++ b/ldv_migrate/requirements.txt @@ -0,0 +1,3 @@ +sshtunnel==0.4.0 +requests==2.28.1 +psycopg2-binary==2.9.3 \ No newline at end of file diff --git a/ldv_migrate/setup.py b/ldv_migrate/setup.py new file mode 100644 index 0000000000000000000000000000000000000000..1813e2a44f9110e3bc64062522871f89ca025cbc --- /dev/null +++ b/ldv_migrate/setup.py @@ -0,0 +1,22 @@ +from setuptools import setup, find_packages + +def get_requirements() -> str: + with open("requirements.txt") as reqs: + return reqs.read().split("\n") + +setup(name='ldvspec-migration', + version='1.0.0', + description='Migration script to copy data from ldvadmin to ldv-spec-db', + url='https://git.astron.nl/astron-sdc/ldv-specification/-/tree/main/ldvspec/scripts', + author='Roy de Goei, Fanna Lautenback, Nico Vermaas', + author_email='vermaas@astron.nl', + license='Apache 2.0', + install_requires=get_requirements(), + packages=find_packages(), + include_package_data=True, + entry_points={ + 'console_scripts': [ + 'ldv_migrate=ldv_migrate.migrate_ldvadmin_to_ldvspec:main', + ], + }, + ) \ No newline at end of file