Skip to content
Snippets Groups Projects
Commit aec41272 authored by Mattia Mancini's avatar Mattia Mancini
Browse files

Refactor to reduce memory usage and fix bug of querying only raw data if no limit is specified

parent bd699354
No related branches found
No related tags found
1 merge request!11Refactor to reduce memory usage and fix bug of querying only raw data if no limit is specified
...@@ -23,7 +23,6 @@ import argparse ...@@ -23,7 +23,6 @@ import argparse
import sys import sys
import math import math
import ldv_migrate.connection.retrieve_db_connection as connector import ldv_migrate.connection.retrieve_db_connection as connector
from ldv_migrate.ldv_specification_interface import LDVSpecInterface from ldv_migrate.ldv_specification_interface import LDVSpecInterface
...@@ -42,7 +41,7 @@ def change_logging_level(level): ...@@ -42,7 +41,7 @@ def change_logging_level(level):
handler.setLevel(level) handler.setLevel(level)
def execute_query(connection, sql_query, data=None, batch_size=1000):
    """
    Execute a read query on the ldvadmin database and stream the results.

    There is no commit strictly required because we are doing only read
    queries; the commit is kept to match the original behaviour.

    :param connection: open DB-API connection to the ldvadmin database
    :param sql_query: SQL statement to execute
    :param data: optional parameters bound to the query placeholders
    :param batch_size: number of rows fetched from the cursor per round trip
    :return: generator yielding one result row (tuple) at a time
    """
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(sql_query, data)
            connection.commit()
            # fetchmany keeps at most `batch_size` rows in memory at once,
            # unlike fetchall() which materializes the full result set.
            while True:
                batch_rows = cursor.fetchmany(batch_size)
                if not batch_rows:
                    break
                yield from batch_rows
        finally:
            cursor.close()  # was leaked on every call before
    except Exception as exp:
        logger.error("Could not execute query! '{}' results in -> {}".format(sql_query, exp))
def row_to_dict(dps):
    """
    Convert one ldvadmin dataproduct row into the payload dict expected by
    the ldv-spec-db REST interface.

    :param dps: sequence of (obsid, obsid_source, dp_type, project, activity,
                uri, size, dysco) as selected from the astrowise tables
    :return: dict payload suitable for insert_multiple_dataproduct
    """
    obs_id, oid_source, dp_type, project, activity, surl, size, dysco = dps[:8]
    return {
        "obs_id": obs_id,
        "oid_source": oid_source,
        "dataproduct_source": "LOFAR LTA",
        "dataproduct_type": dp_type,
        "project": project,
        "activity": activity,
        "surl": surl,
        "filesize": size,
        "additional_meta": {"dysco_compression": dysco},
        # location deliberately mirrors the surl value
        "location": surl,
    }
def query_and_insert(connection, ldvspec_interface,
                     number_of_dataproducts,
                     query, no_limit_to_insert, insert_batch_size):
    """
    Stream the rows of `query` from ldvadmin and insert them into ldv-spec-db.

    :param connection: open DB-API connection to the ldvadmin database
    :param ldvspec_interface: LDVSpecInterface used for the REST inserts
    :param number_of_dataproducts: expected number of rows `query` returns
    :param query: SQL select returning dataproduct rows (see row_to_dict)
    :param no_limit_to_insert: when True, insert everything in one request
    :param insert_batch_size: rows per REST request when batching
    :return: total number of dataproducts inserted
    """
    results_generator = execute_query(connection, query)
    if no_limit_to_insert:
        lst_all_dps = [row_to_dict(row) for row in results_generator]
        res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps)
        logging.info("Added %s DataProduct objects", len(res_lst_ids))
        logging.debug("Added with ids=%s", res_lst_ids)
        return len(lst_all_dps)

    nbr_required_inserts = math.ceil(number_of_dataproducts / insert_batch_size)
    total_count = 0
    for cnt in range(nbr_required_inserts):
        start = cnt * insert_batch_size
        end = start + insert_batch_size
        lst_dps = []
        for _ in range(insert_batch_size):
            # `is None` rather than truthiness so a falsy-but-valid row is kept
            row = next(results_generator, None)
            if row is None:
                break
            lst_dps.append(row_to_dict(row))
        if not lst_dps:
            # Source exhausted earlier than number_of_dataproducts suggested;
            # do not send empty insert requests for the remaining iterations.
            break
        total_count += len(lst_dps)
        res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_dps)
        # Log the actual batch size (the final batch may be short), not the
        # configured insert_batch_size.
        logging.info("Insert count %s of %s: Added %s DataProduct objects [%s till %s]",
                     cnt, nbr_required_inserts, len(lst_dps), start, end)
        logging.debug("Added with ids=%s", res_lst_ids)
    return total_count
def main(): def main():
""" """
Migrates data from the ldvadmin database to a ldv-spec-db database. Migrates data from the ldvadmin database to a ldv-spec-db database.
...@@ -70,7 +116,8 @@ def main(): ...@@ -70,7 +116,8 @@ def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true") parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true")
parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true") parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true")
parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)", action="store") parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)",
action="store")
parser.add_argument("--host", nargs="?", default='dev', parser.add_argument("--host", nargs="?", default='dev',
help="The ldv-spec-db host. Presets are 'dev' (default), 'test', 'prod', otherwise give a full url like https://sdc.astron.nl:5554/ldvspec/api/v1") help="The ldv-spec-db host. Presets are 'dev' (default), 'test', 'prod', otherwise give a full url like https://sdc.astron.nl:5554/ldvspec/api/v1")
...@@ -107,7 +154,11 @@ def main(): ...@@ -107,7 +154,11 @@ def main():
no_limit_to_insert = True no_limit_to_insert = True
else: else:
no_limit_to_insert = False no_limit_to_insert = False
logging.debug("Limit on number of dataproducts to insert REST is set to {}".format(args.max_nbr_dps_to_insert_per_request)) logging.debug("Limit on number of dataproducts to insert REST is set to {}".format(
args.max_nbr_dps_to_insert_per_request))
# Create connection with ldv-spec-db using REST API, use temp. token created in my test-env
ldvspec_interface = LDVSpecInterface(args.host, args.token)
query_count_all_raw_dataproducts = "select count(*) from astrowise.raw_dataproducts" query_count_all_raw_dataproducts = "select count(*) from astrowise.raw_dataproducts"
query_count_all_pipeline_dataproducts = "select count(*) from astrowise.pl_dataproducts" query_count_all_pipeline_dataproducts = "select count(*) from astrowise.pl_dataproducts"
...@@ -119,47 +170,33 @@ def main(): ...@@ -119,47 +170,33 @@ def main():
conn, tunnel = connector.connect_postgresql(args.configuration, args.section) conn, tunnel = connector.connect_postgresql(args.configuration, args.section)
count_raw_dps = execute_query(conn, query_count_all_raw_dataproducts)[0][0] count_raw_dps = execute_query(conn, query_count_all_raw_dataproducts)[0][0]
count_pl_dps = execute_query(conn, query_count_all_pipeline_dataproducts)[0][0] count_pl_dps = execute_query(conn, query_count_all_pipeline_dataproducts)[0][0]
logging.info(f"There are {count_raw_dps} raw dataproducts and {count_pl_dps} pipeline dataproduct in the ldvadmin.astrowise table!!") logging.info(
result_query_all_dps = execute_query(conn, query_all_required_fields_raw_dataproducts) "There are %s raw dataproducts and %s pipeline dataproduct in the ldvadmin.astrowise table!!",
count_raw_dps, count_pl_dps
)
n_inserted_items = query_and_insert(conn, ldvspec_interface, count_raw_dps,
query_all_required_fields_raw_dataproducts,
no_limit_to_insert, args.max_nbr_dps_to_insert_per_request)
# Are there still dataproducts left to query? # Are there still dataproducts left to query?
nbr_dps_left = len(result_query_all_dps) - args.limit nbr_dps_left = args.limits - n_inserted_items
if nbr_dps_left > 0 and args.limit > 0: if nbr_dps_left > 0 and args.limit > 0:
logging.debug("Limit on number of leftover dataproducts to query is set to {}".format(nbr_dps_left)) logging.debug("Limit on number of leftover dataproducts to query is set to {}".format(nbr_dps_left))
limit_str = "limit {}".format(nbr_dps_left) limit_str = "limit {}".format(nbr_dps_left)
else:
limit_str = ""
query_all_required_fields_pipeline_dataproducts = \ query_all_required_fields_pipeline_dataproducts = \
"select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.pl_dataproducts {}" \ "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.pl_dataproducts {}" \
.format(limit_str) .format(limit_str)
result_query_all_dps.extend(execute_query(conn, query_all_required_fields_pipeline_dataproducts)) n_inserted_items += query_and_insert(conn, ldvspec_interface, count_pl_dps,
query_all_required_fields_pipeline_dataproducts,
logging.info("{} dataproduct retrieved from ldvadmin".format(len(result_query_all_dps))) no_limit_to_insert, args.max_nbr_dps_to_insert_per_request)
# Create connection with ldv-spec-db using REST API, use temp. token created in my test-env
ldvspec_interface = LDVSpecInterface(args.host, args.token)
# Now obtain the attributes from result an add to database using REST insert_dataproduct method logging.info("%s dataproduct retrieved from ldvadmin", n_inserted_items)
lst_all_dps = []
for dps in result_query_all_dps:
logging.debug(dps)
dps_dict = {"obs_id": dps[0], "oid_source": dps[1], "dataproduct_source": "LOFAR LTA",
"dataproduct_type": dps[2], "project": dps[3], "activity": dps[4], "surl": dps[5],
"filesize": dps[6], "additional_meta": {"dysco_compression": dps[7]}, "location": dps[5]}
lst_all_dps.append(dps_dict)
if no_limit_to_insert: print("\nThat's All Folks!\n")
res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps)
logging.info("Added {} DataProduct objects".format(len(res_lst_ids)))
logging.debug("Added with ids={}".format(res_lst_ids))
else:
nbr_required_inserts = math.ceil(len(lst_all_dps)/args.max_nbr_dps_to_insert_per_request)
for cnt in range(nbr_required_inserts):
start = cnt * args.max_nbr_dps_to_insert_per_request
end = start + args.max_nbr_dps_to_insert_per_request
res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps[start:end])
logging.info("Insert count {} of {}: Added {} DataProduct objects [{} till {}]".
format(cnt, nbr_required_inserts, args.max_nbr_dps_to_insert_per_request, start, end))
logging.debug("Added with ids={}".format(res_lst_ids))
logging.info("\nThat's All Folks!\n")
if __name__ == "__main__": if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment