diff --git a/ldv_migrate/ldv_migrate/connection/config.py b/ldv_migrate/ldv_migrate/connection/config.py index 36fac48e7dd550efd41d9a5a8a357c45d4af1b0d..d73f07f62ae03345dc149dcecca08fa6e846fcc2 100644 --- a/ldv_migrate/ldv_migrate/connection/config.py +++ b/ldv_migrate/ldv_migrate/connection/config.py @@ -3,7 +3,7 @@ import os def read_config(configuration_file, section): - full_filename = os.path.join(os.path.dirname(__file__), configuration_file) + full_filename = os.path.expanduser(configuration_file) parser = ConfigParser() read_result = parser.read(full_filename) # If file not found then parser returns just an empty list it does not raise an Exception! @@ -16,6 +16,6 @@ def read_config(configuration_file, section): for param in params: db_settings[param[0]] = param[1] else: - raise Exception('Section {0} not found in the {1} file'.format(section, filename)) + raise Exception('Section {0} not found in the {1} file'.format(section, full_filename)) return db_settings diff --git a/ldv_migrate/ldv_migrate/ldv_specification_interface.py b/ldv_migrate/ldv_migrate/ldv_specification_interface.py index 1ef670ff3284e5da8deff620211cb81d3c0ecb4e..9a59881843038eafd9e6932e728901cc608356e7 100644 --- a/ldv_migrate/ldv_migrate/ldv_specification_interface.py +++ b/ldv_migrate/ldv_migrate/ldv_specification_interface.py @@ -14,9 +14,8 @@ try: except ModuleNotFoundError: from json.decoder import JSONDecodeError as SimpleJSONDecodeError -import argparse -import datetime from urllib.parse import urlparse, urlunparse + # ============================================================== # The request header REQUEST_HEADER = { @@ -31,6 +30,7 @@ LDV_HOST_PROD = "https://sdc.astron.nl/ldvspec/api/v1" # the ldv sdc production class APIException(Exception): pass + class APIMissing(Exception): pass @@ -59,10 +59,11 @@ def can_retry(response: requests.Response): return False -class LDVSpecInterface(): +class LDVSpecInterface: """ This class is used to connect via REST interface """ + def __init__(self, host, token): """ Constructor. @@ -155,4 +156,4 @@ class LDVSpecInterface(): response_lst_ids.append(resp['id']) return response_lst_ids except Exception as err: - raise err \ No newline at end of file + raise err diff --git a/ldv_migrate/ldv_migrate/migrate_ldvadmin_to_ldvspec.py b/ldv_migrate/ldv_migrate/migrate_ldvadmin_to_ldvspec.py index 3966ad13b43b77372675745554f9cbdf08e05291..1ae4251324fc500557105b35d4f6ede2df971075 100644 --- a/ldv_migrate/ldv_migrate/migrate_ldvadmin_to_ldvspec.py +++ b/ldv_migrate/ldv_migrate/migrate_ldvadmin_to_ldvspec.py @@ -23,7 +23,6 @@ import argparse import sys import math - import ldv_migrate.connection.retrieve_db_connection as connector from ldv_migrate.ldv_specification_interface import LDVSpecInterface @@ -42,7 +41,7 @@ def change_logging_level(level): handler.setLevel(level) -def execute_query(connection, sql_query, data=None): +def execute_query(connection, sql_query, data=None, batch_size=1000): """ Execute query of the ldvadmin Database There is no commit required because we are doing only read queries @@ -55,11 +54,57 @@ def execute_query(connection, sql_query, data=None): cursor = connection.cursor() cursor.execute(sql_query, data) connection.commit() - return cursor.fetchall() + batch_rows = True + while batch_rows: + batch_rows = cursor.fetchmany(batch_size) + for row in batch_rows: + yield row + except Exception as exp: logger.error("Could not execute query! '{}' results in -> {}".format(sql_query, exp)) +def row_to_dict(dps): + dps_dict = {"obs_id": dps[0], "oid_source": dps[1], "dataproduct_source": "LOFAR LTA", + "dataproduct_type": dps[2], "project": dps[3], "activity": dps[4], "surl": dps[5], + "filesize": dps[6], "additional_meta": {"dysco_compression": dps[7]}, "location": dps[5]} + return dps_dict + + +def query_and_insert(connection, ldvspec_interface, + number_of_dataproducts, + query, no_limit_to_insert, insert_batch_size): + results_generator = execute_query(connection, query) + + if no_limit_to_insert: + lst_all_dps = [row_to_dict(row) for row in results_generator] + res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps) + logging.info("Added %s DataProduct objects", len(res_lst_ids)) + logging.debug("Added with ids=%s", res_lst_ids) + return len(lst_all_dps) + else: + nbr_required_inserts = math.ceil(number_of_dataproducts / insert_batch_size) + total_count = 0 + for cnt in range(nbr_required_inserts): + start = cnt * insert_batch_size + end = start + insert_batch_size + + lst_dps = [] + for _ in range(insert_batch_size): + row = next(results_generator, None) + if row: + lst_dps.append(row_to_dict(row)) + else: + break + total_count += len(lst_dps) + res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_dps) + logging.info("Insert count %s of %s: Added %s DataProduct objects [%s till %s]", cnt, nbr_required_inserts, + insert_batch_size, start, end) + logging.debug("Added with ids=%s", res_lst_ids) + + return total_count + + def main(): """ Migrates data from the ldvadmin database to a ldv-spec-db database. @@ -70,7 +115,8 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true") parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true") - parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)", action="store") + parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)", + action="store") parser.add_argument("--host", nargs="?", default='dev', help="The ldv-spec-db host. Presets are 'dev' (default), 'test', 'prod', otherwise give a full url like https://sdc.astron.nl:5554/ldvspec/api/v1") @@ -92,7 +138,7 @@ def main(): logging.info("--- {} (last updated {}) ---".format(os.path.basename(__file__), modification_time)) return - logging.info("\nMoment please....\n") + print("\nMoment please....\n") if args.verbose: change_logging_level(logging.DEBUG) @@ -107,59 +153,49 @@ def main(): no_limit_to_insert = True else: no_limit_to_insert = False - logging.debug("Limit on number of dataproducts to insert REST is set to {}".format(args.max_nbr_dps_to_insert_per_request)) + logging.debug("Limit on number of dataproducts to insert REST is set to {}".format( + args.max_nbr_dps_to_insert_per_request)) + + # Create connection with ldv-spec-db using REST API, use temp. token created in my test-env + ldvspec_interface = LDVSpecInterface(args.host, args.token) query_count_all_raw_dataproducts = "select count(*) from astrowise.raw_dataproducts" query_count_all_pipeline_dataproducts = "select count(*) from astrowise.pl_dataproducts" query_all_required_fields_raw_dataproducts = \ - "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.raw_dataproducts {}"\ + "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.raw_dataproducts order by id {}" \ .format(limit_str) # Create connection using ssh tunnel with the ldvadmin database conn, tunnel = connector.connect_postgresql(args.configuration, args.section) - count_raw_dps = execute_query(conn, query_count_all_raw_dataproducts)[0][0] - count_pl_dps = execute_query(conn, query_count_all_pipeline_dataproducts)[0][0] - logging.info(f"There are {count_raw_dps} raw dataproducts and {count_pl_dps} pipeline dataproduct in the ldvadmin.astrowise table!!") - result_query_all_dps = execute_query(conn, query_all_required_fields_raw_dataproducts) + count_raw_dps = next(execute_query(conn, query_count_all_raw_dataproducts))[0] + count_pl_dps = next(execute_query(conn, query_count_all_pipeline_dataproducts))[0] + logging.info( + "There are %s raw dataproducts and %s pipeline dataproduct in the ldvadmin.astrowise table!!", + count_raw_dps, count_pl_dps + ) + + n_inserted_items = query_and_insert(conn, ldvspec_interface, count_raw_dps, + query_all_required_fields_raw_dataproducts, + no_limit_to_insert, args.max_nbr_dps_to_insert_per_request) # Are there still dataproducts left to query? - nbr_dps_left = len(result_query_all_dps) - args.limit + nbr_dps_left = args.limits - n_inserted_items + if nbr_dps_left > 0 and args.limit > 0: logging.debug("Limit on number of leftover dataproducts to query is set to {}".format(nbr_dps_left)) limit_str = "limit {}".format(nbr_dps_left) - query_all_required_fields_pipeline_dataproducts = \ - "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.pl_dataproducts {}" \ - .format(limit_str) - result_query_all_dps.extend(execute_query(conn, query_all_required_fields_pipeline_dataproducts)) - - logging.info("{} dataproduct retrieved from ldvadmin".format(len(result_query_all_dps))) - # Create connection with ldv-spec-db using REST API, use temp. token created in my test-env - ldvspec_interface = LDVSpecInterface(args.host, args.token) + else: + limit_str = "" + query_all_required_fields_pipeline_dataproducts = \ + "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.pl_dataproducts order by id {}" \ + .format(limit_str) + n_inserted_items += query_and_insert(conn, ldvspec_interface, count_pl_dps, + query_all_required_fields_pipeline_dataproducts, + no_limit_to_insert, args.max_nbr_dps_to_insert_per_request) - # Now obtain the attributes from result an add to database using REST insert_dataproduct method - lst_all_dps = [] - for dps in result_query_all_dps: - logging.debug(dps) - dps_dict = {"obs_id": dps[0], "oid_source": dps[1], "dataproduct_source": "LOFAR LTA", - "dataproduct_type": dps[2], "project": dps[3], "activity": dps[4], "surl": dps[5], - "filesize": dps[6], "additional_meta": {"dysco_compression": dps[7]}, "location": dps[5]} - lst_all_dps.append(dps_dict) + logging.info("%s dataproduct retrieved from ldvadmin", n_inserted_items) - if no_limit_to_insert: - res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps) - logging.info("Added {} DataProduct objects".format(len(res_lst_ids))) - logging.debug("Added with ids={}".format(res_lst_ids)) - else: - nbr_required_inserts = math.ceil(len(lst_all_dps)/args.max_nbr_dps_to_insert_per_request) - for cnt in range(nbr_required_inserts): - start = cnt * args.max_nbr_dps_to_insert_per_request - end = start + args.max_nbr_dps_to_insert_per_request - res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps[start:end]) - logging.info("Insert count {} of {}: Added {} DataProduct objects [{} till {}]". - format(cnt, nbr_required_inserts, args.max_nbr_dps_to_insert_per_request, start, end)) - logging.debug("Added with ids={}".format(res_lst_ids)) - - logging.info("\nThat's All Folks!\n") + print("\nThat's All Folks!\n") if __name__ == "__main__":