diff --git a/ldv_migrate/ldv_migrate/migrate_ldvadmin_to_ldvspec.py b/ldv_migrate/ldv_migrate/migrate_ldvadmin_to_ldvspec.py
index 3966ad13b43b77372675745554f9cbdf08e05291..cb881587cb668d2a3f262eff0ed69c164a69074f 100644
--- a/ldv_migrate/ldv_migrate/migrate_ldvadmin_to_ldvspec.py
+++ b/ldv_migrate/ldv_migrate/migrate_ldvadmin_to_ldvspec.py
@@ -23,7 +23,6 @@
 import argparse
 import sys
 import math
-
 import ldv_migrate.connection.retrieve_db_connection as connector
 from ldv_migrate.ldv_specification_interface import LDVSpecInterface
@@ -42,7 +41,7 @@ def change_logging_level(level):
         handler.setLevel(level)
 
 
-def execute_query(connection, sql_query, data=None):
+def execute_query(connection, sql_query, data=None, batch_size=1000):
     """
     Execute query of the ldvadmin Database
     There is no commit required because we are doing only read queries
@@ -55,11 +54,58 @@
         cursor = connection.cursor()
         cursor.execute(sql_query, data)
         connection.commit()
-        return cursor.fetchall()
+        while True:
+            batch_rows = cursor.fetchmany(batch_size)
+            if batch_rows:
+                for row in batch_rows:
+                    yield row
+            else:
+                break
     except Exception as exp:
         logger.error("Could not execute query! '{}' results in -> {}".format(sql_query, exp))
 
 
+def row_to_dict(dps):
+    """Map one ldvadmin dataproduct row onto the payload dict expected by ldv-spec-db."""
+    dps_dict = {"obs_id": dps[0], "oid_source": dps[1], "dataproduct_source": "LOFAR LTA",
+                "dataproduct_type": dps[2], "project": dps[3], "activity": dps[4], "surl": dps[5],
+                "filesize": dps[6], "additional_meta": {"dysco_compression": dps[7]}, "location": dps[5]}
+    return dps_dict
+
+
+def query_and_insert(connection, ldvspec_interface, number_of_dataproducts,
+                     query, no_limit_to_insert, insert_batch_size):
+    """Stream the query results from ldvadmin and insert them into ldv-spec-db, in one go or in batches."""
+    results_generator = execute_query(connection, query)
+
+    if no_limit_to_insert:
+        lst_all_dps = [row_to_dict(row) for row in results_generator]
+        res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps)
+        logging.info("Added %s DataProduct objects", len(res_lst_ids))
+        logging.debug("Added with ids=%s", res_lst_ids)
+        return len(lst_all_dps)
+    else:
+        nbr_required_inserts = math.ceil(number_of_dataproducts / insert_batch_size)
+        total_count = 0
+        for cnt in range(nbr_required_inserts):
+            start = cnt * insert_batch_size
+            end = start + insert_batch_size
+
+            lst_dps = []
+            for _ in range(insert_batch_size):
+                row = next(results_generator, None)
+                if row:
+                    lst_dps.append(row_to_dict(row))
+                else:
+                    break
+            if not lst_dps:
+                # The query returned fewer rows than number_of_dataproducts (e.g. when --limit is set)
+                break
+            total_count += len(lst_dps)
+            res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_dps)
+            logging.info("Insert count %s of %s: Added %s DataProduct objects [%s till %s]",
+                         cnt + 1, nbr_required_inserts, len(lst_dps), start, end)
+            logging.debug("Added with ids=%s", res_lst_ids)
+
+        return total_count
+
+
 def main():
     """
     Migrates data from the ldvadmin database to a ldv-spec-db database.
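Reviewer note: execute_query() now yields rows lazily via cursor.fetchmany() instead of returning cursor.fetchall(), so memory stays bounded on large tables. Below is a minimal, self-contained sketch of the same pattern, using the stdlib sqlite3 module as a stand-in for the psycopg2 connection over the ssh tunnel (the table and column here are invented for the demo):

    import sqlite3

    def fetch_in_batches(connection, sql_query, batch_size=1000):
        # Same streaming pattern as the patched execute_query():
        # pull batch_size rows per fetchmany() call and yield them one by one.
        cursor = connection.cursor()
        cursor.execute(sql_query)
        while True:
            batch_rows = cursor.fetchmany(batch_size)
            if not batch_rows:
                break
            yield from batch_rows

    connection = sqlite3.connect(":memory:")
    connection.execute("create table raw_dataproducts (obsid integer)")
    connection.executemany("insert into raw_dataproducts values (?)",
                           [(i,) for i in range(2500)])

    rows = fetch_in_batches(connection, "select obsid from raw_dataproducts order by obsid")
    print(next(rows))                # (0,) -- only the first 1000-row batch has been fetched
    print(1 + sum(1 for _ in rows))  # 2500 rows seen in total, never all in memory at once

One consequence worth noting: a generator is not subscriptable, so the count queries in main() can no longer use execute_query(...)[0][0]; the hunk further down switches them to next(...)[0].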
@@ -70,7 +116,8 @@
     parser = argparse.ArgumentParser()
     parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true")
     parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true")
-    parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)", action="store")
+    parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)",
+                        action="store")
     parser.add_argument("--host", nargs="?", default='dev', help="The ldv-spec-db host. Presets are 'dev' (default), 'test', 'prod', otherwise give a full url like https://sdc.astron.nl:5554/ldvspec/api/v1")
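Reviewer note: the next hunk makes the pipeline-dataproduct query consume only whatever is left of --limit after the raw dataproducts have been migrated. A standalone sketch of that quota arithmetic (the numbers are invented; the variable names mirror the patch):

    limit = 5000               # args.limit; 0 means "no limit"
    n_inserted_items = 3200    # raw dataproducts already migrated

    nbr_dps_left = limit - n_inserted_items
    if nbr_dps_left > 0 and limit > 0:
        limit_str = "limit {}".format(nbr_dps_left)  # fetch at most the remaining 1800
    elif limit > 0:
        limit_str = "limit 0"                        # the quota is already used up
    else:
        limit_str = ""                               # unlimited run
    print(limit_str)  # -> 'limit 1800'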
@@ -107,59 +154,49 @@
         no_limit_to_insert = True
     else:
         no_limit_to_insert = False
-    logging.debug("Limit on number of dataproducts to insert REST is set to {}".format(args.max_nbr_dps_to_insert_per_request))
+    logging.debug("Limit on number of dataproducts to insert REST is set to {}".format(
+        args.max_nbr_dps_to_insert_per_request))
+
+    # Create connection with ldv-spec-db using REST API, use temp. token created in my test-env
+    ldvspec_interface = LDVSpecInterface(args.host, args.token)
 
     query_count_all_raw_dataproducts = "select count(*) from astrowise.raw_dataproducts"
     query_count_all_pipeline_dataproducts = "select count(*) from astrowise.pl_dataproducts"
     query_all_required_fields_raw_dataproducts = \
-        "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.raw_dataproducts {}"\
+        "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.raw_dataproducts {}" \
         .format(limit_str)
 
     # Create connection using ssh tunnel with the ldvadmin database
     conn, tunnel = connector.connect_postgresql(args.configuration, args.section)
-    count_raw_dps = execute_query(conn, query_count_all_raw_dataproducts)[0][0]
-    count_pl_dps = execute_query(conn, query_count_all_pipeline_dataproducts)[0][0]
-    logging.info(f"There are {count_raw_dps} raw dataproducts and {count_pl_dps} pipeline dataproduct in the ldvadmin.astrowise table!!")
-    result_query_all_dps = execute_query(conn, query_all_required_fields_raw_dataproducts)
+    # execute_query() is a generator now, so take the single count row with next()
+    count_raw_dps = next(execute_query(conn, query_count_all_raw_dataproducts))[0]
+    count_pl_dps = next(execute_query(conn, query_count_all_pipeline_dataproducts))[0]
+    logging.info(
+        "There are %s raw dataproducts and %s pipeline dataproducts in the ldvadmin.astrowise table!",
+        count_raw_dps, count_pl_dps
+    )
+
+    n_inserted_items = query_and_insert(conn, ldvspec_interface, count_raw_dps,
+                                        query_all_required_fields_raw_dataproducts,
+                                        no_limit_to_insert, args.max_nbr_dps_to_insert_per_request)
 
     # Are there still dataproducts left to query?
-    nbr_dps_left = len(result_query_all_dps) - args.limit
+    nbr_dps_left = args.limit - n_inserted_items
+
     if nbr_dps_left > 0 and args.limit > 0:
         logging.debug("Limit on number of leftover dataproducts to query is set to {}".format(nbr_dps_left))
         limit_str = "limit {}".format(nbr_dps_left)
-        query_all_required_fields_pipeline_dataproducts = \
-            "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.pl_dataproducts {}" \
-            .format(limit_str)
-        result_query_all_dps.extend(execute_query(conn, query_all_required_fields_pipeline_dataproducts))
-
-    logging.info("{} dataproduct retrieved from ldvadmin".format(len(result_query_all_dps)))
-    # Create connection with ldv-spec-db using REST API, use temp. token created in my test-env
-    ldvspec_interface = LDVSpecInterface(args.host, args.token)
+    elif args.limit > 0:
+        # The raw dataproducts already filled the whole --limit; fetch no pipeline dataproducts
+        limit_str = "limit 0"
+    else:
+        limit_str = ""
+    query_all_required_fields_pipeline_dataproducts = \
+        "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.pl_dataproducts {}" \
+        .format(limit_str)
+    n_inserted_items += query_and_insert(conn, ldvspec_interface, count_pl_dps,
+                                         query_all_required_fields_pipeline_dataproducts,
+                                         no_limit_to_insert, args.max_nbr_dps_to_insert_per_request)
 
-    # Now obtain the attributes from result an add to database using REST insert_dataproduct method
-    lst_all_dps = []
-    for dps in result_query_all_dps:
-        logging.debug(dps)
-        dps_dict = {"obs_id": dps[0], "oid_source": dps[1], "dataproduct_source": "LOFAR LTA",
-                    "dataproduct_type": dps[2], "project": dps[3], "activity": dps[4], "surl": dps[5],
-                    "filesize": dps[6], "additional_meta": {"dysco_compression": dps[7]}, "location": dps[5]}
-        lst_all_dps.append(dps_dict)
+    logging.info("%s dataproducts retrieved from ldvadmin", n_inserted_items)
 
-    if no_limit_to_insert:
-        res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps)
-        logging.info("Added {} DataProduct objects".format(len(res_lst_ids)))
-        logging.debug("Added with ids={}".format(res_lst_ids))
-    else:
-        nbr_required_inserts = math.ceil(len(lst_all_dps)/args.max_nbr_dps_to_insert_per_request)
-        for cnt in range(nbr_required_inserts):
-            start = cnt * args.max_nbr_dps_to_insert_per_request
-            end = start + args.max_nbr_dps_to_insert_per_request
-            res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps[start:end])
-            logging.info("Insert count {} of {}: Added {} DataProduct objects [{} till {}]".
-                         format(cnt, nbr_required_inserts, args.max_nbr_dps_to_insert_per_request, start, end))
-            logging.debug("Added with ids={}".format(res_lst_ids))
-
-    logging.info("\nThat's All Folks!\n")
+    print("\nThat's All Folks!\n")
 
 
 if __name__ == "__main__":
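Reviewer note: for reference, these are the insert windows that query_and_insert() walks through when no_limit_to_insert is False. A standalone sketch of the same ceil-based batch arithmetic (the sizes are invented):

    import math

    # 2500 dataproducts with max_nbr_dps_to_insert_per_request = 1000 -> 3 REST calls
    number_of_dataproducts, insert_batch_size = 2500, 1000
    nbr_required_inserts = math.ceil(number_of_dataproducts / insert_batch_size)

    for cnt in range(nbr_required_inserts):
        start = cnt * insert_batch_size
        end = min(start + insert_batch_size, number_of_dataproducts)
        print("Insert count {} of {}: rows {} till {}".format(cnt + 1, nbr_required_inserts, start, end))
    # Insert count 1 of 3: rows 0 till 1000
    # Insert count 2 of 3: rows 1000 till 2000
    # Insert count 3 of 3: rows 2000 till 2500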