Skip to content
Snippets Groups Projects
Commit 411ab9ea authored by Nico Vermaas's avatar Nico Vermaas
Browse files

Merge branch 'SDC-708-add-ldv-migrate-script' into 'master'

Add ldv_migrate script

Closes SDC-708

See merge request !8
parents 0dd5a607 77f2fabc
Branches
Tags
1 merge request!8Add ldv_migrate script
......@@ -67,5 +67,69 @@ pip install pip install -e "git+https://git.astron.nl/ldv/ldv_utils.git#egg=atdb
atdb_inspect_transition [sas_id] [from status] [to status]
# More info and flags
atdb_csv_gen -h
atdb_inspect_transition -h
```
## ldv-migrate script
This is a script to migrate the data from the old `ldvadmin` database to the new `ldv-spec-db` database.
### Deploying (manually)
For example on `sdc@dop814.astron.nl` (this is our sdc-dev test machine)
```
# only once
> cd ~
> mkdir ldv-migrate
> cd ldv-migrate
> virtualenv env3.9 -p python3.9 (on ubuntu)
> python3 -m venv env3.9 (on centos)
> source env3.9/bin/activate
> pip3 install --upgrade setuptools pip
```
### Installation
```bash
# Using pip install (requires valid ssh key, until the repo is set to public)
pip install -e "git+https://git.astron.nl/ldv/ldv_utils.git#egg=ldvspec-migration&subdirectory=ldv_migrate" --upgrade
```
### Running
To test if it works
```bash
> ldv_migrate -h
usage: ldv_migrate [-h] [--version] [-v] [-l LIMIT] [-t TOKEN] [--host [HOST]] [--configuration CONFIGURATION] [-s SECTION] [-r MAX_NBR_DPS_TO_INSERT_PER_REQUEST]
optional arguments:
-h, --help show this help message and exit
--version Show current version of this program
-v, --verbose More information at run time.
-l LIMIT, --limit LIMIT
Limit on the number of queries (0 is no limit)
-t TOKEN, --token TOKEN
Token to access the REST API of ldvspec
--host [HOST] The ldv-spec-db host.
--configuration CONFIGURATION
Configuration file containing tunnel and ldvadmin database credentials
-s SECTION, --section SECTION
Add the configuration's section from the database.cfg.
-r MAX_NBR_DPS_TO_INSERT_PER_REQUEST, --max_nbr_dps_to_insert_per_request MAX_NBR_DPS_TO_INSERT_PER_REQUEST
The number of dataproducts to insert per REST request (0 is no limit)
```
### Examples
Some examples:
```
- Show latest version:
ldv_migrate --version
- Import only 1000 records and show more verbosity:
ldv_migrate --limit 1000 --verbose
- Import 50000 records and insert in steps of 10000 (so 5 steps)
ldv_migrate --limit 50000 --max_nbr_dps_to_insert_per_request 10000
- Import only 1000 records at production:
ldv_migrate --limit 1000 --host prod
```
......@@ -2,16 +2,15 @@
from setuptools import setup
def get_requirements() -> list:
    """Read requirements.txt and return the package specifiers it lists.

    Blank lines and comment lines are skipped so setuptools does not
    receive empty requirement strings (the old version returned the raw
    split, including empty entries, and was mis-annotated as ``-> str``).
    """
    with open("requirements.txt") as reqs:
        return [line.strip() for line in reqs.read().splitlines()
                if line.strip() and not line.strip().startswith("#")]
setup(
name="atdb-csv-gen",
name="ldvspec-migration",
version="1.0.0",
description="Tool to generate csv files for ATDB",
author="ASTRON",
description="Migration script to copy data from ldvadmin to ldv-spec-db",
author='Roy de Goei, Fanna Lautenback, Nico Vermaas for ASTRON',
packages=["atdb_csv_gen"],
install_requires=get_requirements(),
scripts=["bin/atdb_csv_gen"],
......
#!/bin/bash
# This script make a source distribution for the pip installable package in the current folder
echo "Build package for ldvspec-migration"
# Log which python is used for the build
python --version
# Explicit give format otherwise a zip is created (Windows?)
python setup.py sdist --formats=gztar
# Next command will not close the window, can be handy if something goes wrong
# (e.g. when the script is started by double-clicking from a file manager)
exec $SHELL
\ No newline at end of file
from configparser import ConfigParser
import os
def read_config(configuration_file, section):
    """Read one section of an ini-style configuration file.

    :param configuration_file: path to the configuration file; a relative
        path is resolved against the directory containing this module
        (an absolute path is used as-is by os.path.join).
    :param section: name of the section to read (e.g. 'postgresql-ldv')
    :return: dict mapping option names to their (string) values
    :raises Exception: when the file cannot be read or the section is missing
    """
    full_filename = os.path.join(os.path.dirname(__file__), configuration_file)
    parser = ConfigParser()
    read_result = parser.read(full_filename)
    # If file not found then parser returns just an empty list it does not raise an Exception!
    if len(read_result) == 0:
        raise Exception("Configuration file with filename {0} not found".format(full_filename))
    db_settings = {}
    if parser.has_section(section):
        for key, value in parser.items(section):
            db_settings[key] = value
    else:
        # BUG FIX: this previously formatted with the undefined name
        # 'filename', turning the intended error into a NameError.
        raise Exception('Section {0} not found in the {1} file'.format(section, full_filename))
    return db_settings
import sys
import psycopg2
import logging
import argparse
from sshtunnel import SSHTunnelForwarder
import os
from ldv_migrate.connection.config import read_config
def connect_postgresql(configuration_file, section):
    """ Connect to the PostgreSQL database server.

    Reads the connection settings from the given section of the configuration
    file. When the configured host is not 'localhost', an ssh tunnel is opened
    first and the connection is made through its local forwarded port.

    :param configuration_file: path of the configuration file with credentials
    :param section: section within that file to use (e.g. 'postgresql-ldv')
    :return: tuple (connection, tunnel); on failure the connection is None and
             any tunnel that was opened has been stopped again.
    """
    conn = None
    tunnel = None
    try:
        # read connection parameters
        configuration = read_config(configuration_file=configuration_file, section=section)
        logging.info('Connecting PostgreSQL database %s', configuration.get('database', 'no database name given'))
        host = configuration.get('host', 'no host given')
        if host != 'localhost':
            # Remote host: go through an ssh tunnel and connect to its
            # locally forwarded port instead of the real host/port.
            tunnel = open_tunnel(configuration)
            conn = psycopg2.connect(host='localhost',
                                    port=tunnel.local_bind_port,
                                    database=configuration.get('database'),
                                    user=configuration.get('user'),
                                    password=configuration.get('password'))
        else:
            # Local host: pass all configured settings straight to psycopg2.
            # NOTE(review): this assumes the section contains only valid
            # psycopg2 connection keywords; extra keys (e.g. tunnelhost)
            # would make connect() fail -- confirm against database.cfg.
            conn = psycopg2.connect(**configuration)
        # Sanity check: query and log the server version.
        cur = conn.cursor()
        cur.execute('SELECT version()')
        db_version = cur.fetchone()
        logging.info('Database version: ' + db_version[0])
    except (Exception, psycopg2.DatabaseError) as error:
        # Best-effort: log, stop any opened tunnel, and let the caller
        # detect failure through the returned conn being None.
        logging.error(error)
        if tunnel is not None:
            tunnel.stop()
    return conn, tunnel
def open_tunnel(configuration_params):
    """Open and start an ssh tunnel to the configured database host.

    :param configuration_params: dict with at least 'tunnelhost',
        'tunnelusername', 'host' and 'port' keys (as read from the config file)
    :return: a started SSHTunnelForwarder; the caller must stop() it
    :raises FileNotFoundError: when ~/.ssh/config does not exist
    """
    tunnel_host = configuration_params.get('tunnelhost', "no tunnel host given")
    tunnel_username = configuration_params.get('tunnelusername', "no username for the tunnel given")
    host = configuration_params.get('host', "no host given")
    port = int(configuration_params.get('port', "no port given"))
    # BUG FIX: os.path.expanduser() never raises FileNotFoundError, so the
    # old try/except around it was dead code. Check for the file explicitly
    # so the intended error message is actually produced.
    ssh_config_file = os.path.expanduser("~/.ssh/config")
    if not os.path.isfile(ssh_config_file):
        raise FileNotFoundError(
            "Ssh config file not found on standard path '~/.ssh/config'. This is mandatory for opening the ssh tunnel"
        )
    logging.info("Creating ssh tunnel for %s and port %s with tunnel host %s and username %s", repr(host), port,
                 repr(tunnel_host), repr(tunnel_username))
    ssh_tunnel = SSHTunnelForwarder(
        ssh_address_or_host=tunnel_host,
        ssh_username=tunnel_username,
        ssh_config_file=ssh_config_file,
        remote_bind_address=(host, port)
    )
    ssh_tunnel.start()
    return ssh_tunnel
def main():
    """
    Opens a database connection using the tunnel/database credentials in a
    configuration file (database.cfg by default).

    :return: tuple (connection, tunnel) as produced by connect_postgresql
    """
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)

    # Check the invocation arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--configuration", default='database.cfg',
                        help="Configuration file containing tunnel and database credentials")
    parser.add_argument("-s", "--section", help="Add the configuration's section from the database.cfg.")
    args = parser.parse_args()

    if not args.section:
        logging.critical("Error: no configuration section given. Try --help")
        sys.exit(-1)

    # BUG FIX: connect_postgresql() requires both the configuration file and
    # the section; it used to be called with only args.section, which raised
    # a TypeError before any connection was attempted.
    return connect_postgresql(args.configuration, args.section)
# Script entry point: open the (possibly tunneled) database connection as a
# smoke test, then release both the connection and the ssh tunnel again.
if __name__ == '__main__':
    connection, server = main()
    if connection is not None:
        connection.close()
        logging.info('Database connection closed.')
    if server is not None:
        server.stop()
        logging.info('Tunneled server stopped.')
"""
This module provides a REST interface to the LDV specifications
Could be that this should be replaced with ldv_specification_interface module/python package
Its kinda prototype module for now required for the migration script
"""
import requests
import logging
from json.decoder import JSONDecodeError
try:
from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError
except ModuleNotFoundError:
from json.decoder import JSONDecodeError as SimpleJSONDecodeError
import argparse
import datetime
from urllib.parse import urlparse, urlunparse
# ==============================================================
# The request header: default HTTP headers sent with every REST request.
ATDB_HEADER = {
    'content-type': "application/json",
    'cache-control': "no-cache"
}
# Preset backend urls, selectable via the 'host' constructor argument of LDVSpecInterface.
LDV_HOST_DEV = "http://localhost:8000/ldvspec/api/v1"  # your local development environment with Django webserver
LDV_HOST_TEST = "https://sdc-dev.astron.nl:5554/ldvspec/api/v1"  # the ldv sdc test environment.
LDV_HOST_PROD = "https://sdc.astron.nl:5554/ldvspec/api/v1"  # the ldv sdc production environment.
class APIException(Exception):
    """Raised when the REST API returns a non-retryable HTTP error."""
    pass
class APIMissing(Exception):
    """Raised when the requested resource does not exist (HTTP 404)."""
    pass
def is_http_exception(response: "requests.Response"):
    """Return True when the response status is a client error (4xx).

    BUG FIX: the old check used ``range(400, 499)``, whose upper bound is
    exclusive, so status 499 was not treated as an error.
    """
    return 400 <= response.status_code < 500
def is_success(response: requests.Response):
    """Return True for any successful (2xx) response status."""
    return 200 <= response.status_code < 300
def is_missing(response: requests.Response):
    """Return True when the resource was not found (HTTP 404)."""
    return response.status_code == 404
def can_retry(response: requests.Response):
    """Return True for statuses worth retrying: 408 (timeout) or 429 (throttled)."""
    return response.status_code in (408, 429)
class LDVSpecInterface():
    """
    This class is used to connect via REST interface to the ldv-spec-db
    backend; authentication is done with a token in the Authorization header.
    """

    def __init__(self, host, token):
        """
        Constructor.
        :param host: the host name of the backend. The presets 'dev', 'test'
                     and 'prod' are expanded to the corresponding full url.
        :param token: The token to login
        """
        # accept some presets to set host to dev, test, acc or prod
        self.host = host
        if self.host == 'dev':
            self.host = LDV_HOST_DEV
        elif self.host == 'test':
            self.host = LDV_HOST_TEST
        elif self.host == 'prod':
            self.host = LDV_HOST_PROD
        if not self.host.endswith('/'):
            self.host += '/'
        # BUG FIX: take a copy; assigning ATDB_HEADER directly and then adding
        # the Authorization entry mutated the shared module-level dict for
        # every instance (and every token) in the process.
        self.header = dict(ATDB_HEADER)
        self.header['Authorization'] = f'Token {token}'
        self._session = None

    def session(self):
        """Return a lazily created requests.Session carrying the default headers."""
        if self._session is None:
            self._session = requests.Session()
            self._session.headers.update(self.header)
        return self._session

    def _request(self, url, method, query_parameters=None, payload=None):
        """
        Perform one HTTP request and decode the response.

        :param url: full url of the resource (a trailing '/' is added if missing)
        :param method: HTTP method name, e.g. 'GET' or 'POST'
        :param query_parameters: optional dict of query parameters
        :param payload: request body; a str is sent verbatim, anything else as json
        :return: decoded json on success, or the raw content when it is not json
        :raises APIMissing: on HTTP 404
        :raises APIException: on any other HTTP error
        """
        parsed_url = urlparse(url)
        if not parsed_url.path.endswith('/'):
            parsed_url = parsed_url._replace(path=parsed_url.path + '/')
        url = urlunparse(parsed_url)
        if isinstance(payload, str):
            response = self.session().request(method, url, data=payload, headers=self.header, params=query_parameters)
        else:
            response = self.session().request(method, url, json=payload, headers=self.header, params=query_parameters)
        logging.debug(f"[{method} {response.url} ]")
        logging.debug("Response: " + str(response.status_code) + ", " + str(response.reason))
        if is_missing(response):
            raise APIMissing(url)
        elif is_http_exception(response) and can_retry(response):
            # NOTE(review): retries immediately and without a limit; a server
            # that keeps answering 408/429 leads to unbounded recursion.
            return self._request(url, method, query_parameters, payload)
        elif is_http_exception(response):
            raise APIException(f'{response.status_code}: {response.reason} {response.content}')
        elif is_success(response):
            try:
                return response.json()
            except (SimpleJSONDecodeError, JSONDecodeError):
                return response.content
        raise APIException(f'Unrecognized response code {response.status_code}: {response.reason} {response.content}')

    # === Backend requests ================================================================================
    def do_POST_json(self, resource, payload):
        """
        POST a payload to a resource (table). This creates a new object (observation or dataproduct)
        :param resource: contains the resource, for example 'observations', 'dataproducts'
        :param payload: the contents of the object to create in json format
        :return: the id of the created object, or -1 when the response carries no id
        """
        # BUG FIX: the trailing '/' used to be appended to 'resource' only
        # AFTER the url had already been built, so it never took effect.
        if not resource.endswith('/'):
            resource += '/'
        url = self.host + resource
        logging.debug(f'do_POST_json using url={url} and with payload: {payload}')
        json_response = self._request(url, 'POST', payload=payload)
        # BUG FIX: hasattr() never sees dict keys, so the old code always
        # returned -1; test for the 'id' key instead.
        if isinstance(json_response, dict) and 'id' in json_response:
            return json_response['id']
        return -1

    def insert_multiple_dataproduct(self, payload):
        """
        Insert multiple dataproducts. Implicit also dataproduct-location is added
        :param payload: list of dataproduct dicts
        :return: list of ids of the added dataproducts
        """
        url = self.host + "insert_dataproduct/"
        logging.debug(f'insert_multiple_dataproduct using url={url} and with payload: {payload}')
        json_response = self._request(url, 'POST', payload=payload)
        return [resp['id'] for resp in json_response]
\ No newline at end of file
#!/usr/bin/env python3
"""
This script migrates data from ldvadmin to ldv-spec-db with user arguments.
By default, the local ldv-spec-db is used and the ldvadmin on sdc-db.astron.nl
Script requires token to access the ldv-spec-db with REST API
Some examples:
- Show latest version:
python ./ldvspec/lofardata/scripts/migrate_ldvadmin_to_ldvspec.py --version
- Import only 1000 records and show more verbosity:
python ./ldvspec/lofardata/scripts/migrate_ldvadmin_to_ldvspec.py --limit 1000 --verbose
- Import 50000 records and insert in steps of 10000 (so 5 steps)
python ./ldvspec/lofardata/scripts/migrate_ldvadmin_to_ldvspec.py --limit 50000 --max_nbr_dps_to_insert_per_request 10000
- Import only 1000 records at production:
python ./ldvspec/lofardata/scripts/migrate_ldvadmin_to_ldvspec.py --limit 1000 --host prod
"""
import os
import time
import logging
import argparse
import sys
import math
import ldv_migrate.connection.retrieve_db_connection as connector
from ldv_migrate.ldv_specification_interface import LDVSpecInterface
# Module-level logger that writes INFO (and up) to stdout with timestamps.
logger = logging.getLogger(__file__)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
def change_logging_level(level):
    """Set the root logger and every handler attached to it to the given level."""
    root_logger = logging.getLogger()
    root_logger.setLevel(level)
    for attached_handler in root_logger.handlers:
        attached_handler.setLevel(level)
def execute_query(connection, sql_query, data=None):
    """
    Execute a read query on the ldvadmin database.

    :param connection: open database connection
    :param sql_query: the sql query to execute
    :param data: (optional) the data arguments of the sql_query
    :return: list of result rows, or an empty list when the query failed
    """
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(sql_query, data)
        # Only read queries are issued here; commit() just ends the implicit
        # transaction so it does not linger on the server.
        connection.commit()
        return cursor.fetchall()
    except Exception as exp:
        logger.error("Could not execute query! '{}' results in -> {}".format(sql_query, exp))
        # BUG FIX: return an empty list instead of (implicitly) None so that
        # callers doing len()/extend() on the result do not crash with TypeError.
        return []
    finally:
        # BUG FIX: the cursor was never closed before.
        if cursor is not None:
            cursor.close()
def main():
    """
    Migrates data from the ldvadmin database to a ldv-spec-db database.

    Raw dataproducts are queried first; when a --limit is given and fewer raw
    dataproducts than the limit were found, the remainder is taken from the
    pipeline dataproducts table. The combined result is then inserted through
    the ldv-spec-db REST API, optionally in batches.
    """
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    # Check the invocation arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true")
    parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true")
    parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)", action="store")
    # NOTE(review): shipping a default API token in the source is a security
    # risk; consider requiring -t/--token or reading it from the environment.
    parser.add_argument("-t", "--token", default="ad9b37a24380948601257f9c1f889b07a00ac81e",
                        help="Token to access the REST API of ldvspec", action="store")
    parser.add_argument("--host", nargs="?", default='dev',
                        help="The ldv-spec-db host. Presets are 'dev' (default), 'test', 'prod', otherwise give a full url like https://sdc.astron.nl:5554/ldvspec/api/v1")
    parser.add_argument("--configuration", default='~/shared/ldv_migrate.cfg',
                        help="Configuration file containing tunnel and ldvadmin database credentials")
    parser.add_argument("-s", "--section", default='postgresql-ldv',
                        help="Add the configuration's section from the database.cfg.")
    # Have payload of more millions will most likely not work
    # tested with 10.000 results in 90 seconds so # 11 mil. will be at least 28 hours
    parser.add_argument("-r", "--max_nbr_dps_to_insert_per_request", default=1000, type=int,
                        help="The number of dataproducts to insert per REST request (0 is no limit)", action="store")

    args = parser.parse_args()

    if args.version:
        # Get file's Last modification time stamp only in terms of seconds since epoch and convert in timestamp
        modification_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(os.path.getmtime(__file__)))
        logging.info("--- {} (last updated {}) ---".format(os.path.basename(__file__), modification_time))
        return

    logging.info("\nMoment please....\n")
    if args.verbose:
        change_logging_level(logging.DEBUG)

    if args.limit == 0:
        limit_str = ""
    else:
        limit_str = "limit {}".format(args.limit)
    logging.debug("Limit on number of dataproducts to query is set to {}".format(args.limit))

    no_limit_to_insert = args.max_nbr_dps_to_insert_per_request == 0
    logging.debug("Limit on number of dataproducts to insert REST is set to {}".format(args.max_nbr_dps_to_insert_per_request))

    query_count_all_raw_dataproducts = "select count(*) from astrowise.raw_dataproducts"
    query_count_all_pipeline_dataproducts = "select count(*) from astrowise.pl_dataproducts"
    query_all_required_fields_raw_dataproducts = \
        "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.raw_dataproducts {}"\
        .format(limit_str)

    # Create connection using ssh tunnel with the ldvadmin database
    conn, tunnel = connector.connect_postgresql(args.configuration, args.section)

    count_raw_dps = execute_query(conn, query_count_all_raw_dataproducts)[0][0]
    count_pl_dps = execute_query(conn, query_count_all_pipeline_dataproducts)[0][0]
    logging.info(f"There are {count_raw_dps} raw dataproducts and {count_pl_dps} pipeline dataproduct in the ldvadmin.astrowise table!!")

    result_query_all_dps = execute_query(conn, query_all_required_fields_raw_dataproducts)

    # BUG FIX: the leftover calculation used to be
    # 'len(result) - args.limit', which can never be positive for a limited
    # query, and the limit==0 case was excluded as well -- so pipeline
    # dataproducts were never retrieved. The remainder of the limit (or
    # everything, when no limit is set) should come from pl_dataproducts.
    if args.limit == 0:
        nbr_dps_left = count_pl_dps
        pipeline_limit_str = ""
    else:
        nbr_dps_left = args.limit - len(result_query_all_dps)
        pipeline_limit_str = "limit {}".format(nbr_dps_left)
    if nbr_dps_left > 0:
        logging.debug("Limit on number of leftover dataproducts to query is set to {}".format(nbr_dps_left))
        query_all_required_fields_pipeline_dataproducts = \
            "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.pl_dataproducts {}" \
            .format(pipeline_limit_str)
        result_query_all_dps.extend(execute_query(conn, query_all_required_fields_pipeline_dataproducts))

    logging.info("{} dataproduct retrieved from ldvadmin".format(len(result_query_all_dps)))

    # Create connection with ldv-spec-db using REST API
    ldvspec_interface = LDVSpecInterface(args.host, args.token)

    # Now obtain the attributes from result an add to database using REST insert_dataproduct method
    lst_all_dps = []
    for dps in result_query_all_dps:
        logging.debug(dps)
        # NOTE(review): additional_meta is sent as a literal string that only
        # looks like a dict; confirm the REST API expects a string here.
        metadata_str = "{'dysco_compression': %s}" % dps[7]
        dps_dict = {"obs_id": dps[0], "oid_source": dps[1], "dataproduct_source": "LOFAR LTA",
                    "dataproduct_type": dps[2], "project": dps[3], "activity": dps[4], "surl": dps[5],
                    "filesize": dps[6], "additional_meta": metadata_str, "location": dps[5]}
        lst_all_dps.append(dps_dict)

    if no_limit_to_insert:
        # Single request with the full payload.
        res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps)
        logging.info("Added {} DataProduct objects".format(len(res_lst_ids)))
        logging.debug("Added with ids={}".format(res_lst_ids))
    else:
        # Insert in batches of max_nbr_dps_to_insert_per_request dataproducts.
        batch_size = args.max_nbr_dps_to_insert_per_request
        nbr_required_inserts = math.ceil(len(lst_all_dps) / batch_size)
        for insert_nr in range(nbr_required_inserts):
            start = insert_nr * batch_size
            end = start + batch_size
            res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps[start:end])
            logging.info("Insert count {} of {}: Added {} DataProduct objects [{} till {}]".
                         format(insert_nr + 1, nbr_required_inserts, len(res_lst_ids), start, end))
            logging.debug("Added with ids={}".format(res_lst_ids))

    logging.info("\nThat's All Folks!\n")
# Script entry point (also used as the 'ldv_migrate' console script in setup.py).
if __name__ == "__main__":
    main()
sshtunnel==0.4.0
requests==2.28.1
psycopg2-binary==2.9.3
\ No newline at end of file
from setuptools import setup, find_packages
def get_requirements() -> list:
    """Read requirements.txt and return the package specifiers it lists.

    Blank lines and comment lines are skipped so setuptools does not
    receive empty requirement strings (the old version returned the raw
    split, including empty entries, and was mis-annotated as ``-> str``).
    """
    with open("requirements.txt") as reqs:
        return [line.strip() for line in reqs.read().splitlines()
                if line.strip() and not line.strip().startswith("#")]
# Packaging definition for the pip-installable 'ldvspec-migration' tool;
# installing it provides the 'ldv_migrate' console command (see entry_points).
setup(name='ldvspec-migration',
      version='1.0.0',
      description='Migration script to copy data from ldvadmin to ldv-spec-db',
      url='https://git.astron.nl/astron-sdc/ldv-specification/-/tree/main/ldvspec/scripts',
      author='Roy de Goei, Fanna Lautenback, Nico Vermaas',
      author_email='vermaas@astron.nl',
      license='Apache 2.0',
      install_requires=get_requirements(),
      packages=find_packages(),
      # NOTE(review): 'ldv_migrate' and 'ldv_migrate.connection' are packages
      # already picked up by find_packages(); py_modules is meant for
      # stand-alone modules -- confirm whether this line is needed at all.
      py_modules = ['ldv_migrate','ldv_migrate.connection'],
      include_package_data=True,
      entry_points={
          'console_scripts': [
              'ldv_migrate=ldv_migrate.migrate_ldvadmin_to_ldvspec:main',
          ],
      },
      )
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment