Skip to content
Snippets Groups Projects
Commit 35957845 authored by Mattia Mancini's avatar Mattia Mancini
Browse files

Merge branch 'fix_out_of_memory' into 'master'

Refactor to reduce memory usage and fix bug of querying only raw data if no limit is specified

See merge request !11
parents bd699354 3463b1cc
No related branches found
No related tags found
1 merge request!11Refactor to reduce memory usage and fix bug of querying only raw data if no limit is specified
...@@ -3,7 +3,7 @@ import os ...@@ -3,7 +3,7 @@ import os
def read_config(configuration_file, section): def read_config(configuration_file, section):
full_filename = os.path.join(os.path.dirname(__file__), configuration_file) full_filename = os.path.expanduser(configuration_file)
parser = ConfigParser() parser = ConfigParser()
read_result = parser.read(full_filename) read_result = parser.read(full_filename)
# If file not found then parser returns just an empty list it does not raise an Exception! # If file not found then parser returns just an empty list it does not raise an Exception!
...@@ -16,6 +16,6 @@ def read_config(configuration_file, section): ...@@ -16,6 +16,6 @@ def read_config(configuration_file, section):
for param in params: for param in params:
db_settings[param[0]] = param[1] db_settings[param[0]] = param[1]
else: else:
raise Exception('Section {0} not found in the {1} file'.format(section, filename)) raise Exception('Section {0} not found in the {1} file'.format(section, full_filename))
return db_settings return db_settings
...@@ -14,9 +14,8 @@ try: ...@@ -14,9 +14,8 @@ try:
except ModuleNotFoundError: except ModuleNotFoundError:
from json.decoder import JSONDecodeError as SimpleJSONDecodeError from json.decoder import JSONDecodeError as SimpleJSONDecodeError
import argparse
import datetime
from urllib.parse import urlparse, urlunparse from urllib.parse import urlparse, urlunparse
# ============================================================== # ==============================================================
# The request header # The request header
REQUEST_HEADER = { REQUEST_HEADER = {
...@@ -31,6 +30,7 @@ LDV_HOST_PROD = "https://sdc.astron.nl/ldvspec/api/v1" # the ldv sdc production ...@@ -31,6 +30,7 @@ LDV_HOST_PROD = "https://sdc.astron.nl/ldvspec/api/v1" # the ldv sdc production
class APIException(Exception): class APIException(Exception):
pass pass
class APIMissing(Exception): class APIMissing(Exception):
pass pass
...@@ -59,10 +59,11 @@ def can_retry(response: requests.Response): ...@@ -59,10 +59,11 @@ def can_retry(response: requests.Response):
return False return False
class LDVSpecInterface(): class LDVSpecInterface:
""" """
This class is used to connect via REST interface This class is used to connect via REST interface
""" """
def __init__(self, host, token): def __init__(self, host, token):
""" """
Constructor. Constructor.
......
...@@ -23,7 +23,6 @@ import argparse ...@@ -23,7 +23,6 @@ import argparse
import sys import sys
import math import math
import ldv_migrate.connection.retrieve_db_connection as connector import ldv_migrate.connection.retrieve_db_connection as connector
from ldv_migrate.ldv_specification_interface import LDVSpecInterface from ldv_migrate.ldv_specification_interface import LDVSpecInterface
...@@ -42,7 +41,7 @@ def change_logging_level(level): ...@@ -42,7 +41,7 @@ def change_logging_level(level):
handler.setLevel(level) handler.setLevel(level)
def execute_query(connection, sql_query, data=None): def execute_query(connection, sql_query, data=None, batch_size=1000):
""" """
Execute query of the ldvadmin Database Execute query of the ldvadmin Database
There is no commit required because we are doing only read queries There is no commit required because we are doing only read queries
...@@ -55,11 +54,57 @@ def execute_query(connection, sql_query, data=None): ...@@ -55,11 +54,57 @@ def execute_query(connection, sql_query, data=None):
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute(sql_query, data) cursor.execute(sql_query, data)
connection.commit() connection.commit()
return cursor.fetchall() batch_rows = True
while batch_rows:
batch_rows = cursor.fetchmany(batch_size)
for row in batch_rows:
yield row
except Exception as exp: except Exception as exp:
logger.error("Could not execute query! '{}' results in -> {}".format(sql_query, exp)) logger.error("Could not execute query! '{}' results in -> {}".format(sql_query, exp))
def row_to_dict(dps):
    """
    Map one ldvadmin dataproduct row onto the REST payload dictionary.

    :param dps: sequence of at least 8 fields in the order
                (obsid, obsid_source, dp_type, project, activity, uri, size, dysco)
    :return: dict shaped for insert_multiple_dataproduct
    """
    obs_id, oid_source, dp_type, project, activity, surl, size, dysco = dps[:8]
    return {
        "obs_id": obs_id,
        "oid_source": oid_source,
        "dataproduct_source": "LOFAR LTA",
        "dataproduct_type": dp_type,
        "project": project,
        "activity": activity,
        "surl": surl,
        "filesize": size,
        "additional_meta": {"dysco_compression": dysco},
        # NOTE: location deliberately mirrors the surl field.
        "location": surl,
    }
def query_and_insert(connection, ldvspec_interface,
                     number_of_dataproducts,
                     query, no_limit_to_insert, insert_batch_size):
    """
    Stream the rows produced by `query` from the ldvadmin database and insert
    them into ldv-spec-db through the REST interface.

    :param connection: open database connection handed to execute_query
    :param ldvspec_interface: LDVSpecInterface used for the REST inserts
    :param number_of_dataproducts: expected number of rows the query yields
                                   (used only to size the batched loop)
    :param query: SQL select statement to execute
    :param no_limit_to_insert: if True, send every row in a single REST call
    :param insert_batch_size: number of rows per REST call when batching
    :return: number of dataproducts actually inserted
    """
    results_generator = execute_query(connection, query)
    if no_limit_to_insert:
        lst_all_dps = [row_to_dict(row) for row in results_generator]
        res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps)
        logging.info("Added %s DataProduct objects", len(res_lst_ids))
        logging.debug("Added with ids=%s", res_lst_ids)
        return len(lst_all_dps)

    nbr_required_inserts = math.ceil(number_of_dataproducts / insert_batch_size)
    total_count = 0
    for cnt in range(nbr_required_inserts):
        start = cnt * insert_batch_size
        end = start + insert_batch_size
        lst_dps = []
        for _ in range(insert_batch_size):
            row = next(results_generator, None)
            # Explicit None check: a falsy-but-valid row must not end the batch.
            if row is None:
                break
            lst_dps.append(row_to_dict(row))
        if not lst_dps:
            # The row-count estimate overshot and the generator is exhausted;
            # avoid issuing empty REST inserts for the remaining iterations.
            break
        total_count += len(lst_dps)
        res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_dps)
        # Log the actual batch size (the final batch is usually smaller than
        # insert_batch_size) and a 1-based counter.
        logging.info("Insert count %s of %s: Added %s DataProduct objects [%s till %s]",
                     cnt + 1, nbr_required_inserts, len(lst_dps), start, end)
        logging.debug("Added with ids=%s", res_lst_ids)
    return total_count
def main(): def main():
""" """
Migrates data from the ldvadmin database to a ldv-spec-db database. Migrates data from the ldvadmin database to a ldv-spec-db database.
...@@ -70,7 +115,8 @@ def main(): ...@@ -70,7 +115,8 @@ def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true") parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true")
parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true") parser.add_argument("-v", "--verbose", default=False, help="More information at run time.", action="store_true")
parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)", action="store") parser.add_argument("-l", "--limit", default=0, type=int, help="Limit on the number of queries (0 is no limit)",
action="store")
parser.add_argument("--host", nargs="?", default='dev', parser.add_argument("--host", nargs="?", default='dev',
help="The ldv-spec-db host. Presets are 'dev' (default), 'test', 'prod', otherwise give a full url like https://sdc.astron.nl:5554/ldvspec/api/v1") help="The ldv-spec-db host. Presets are 'dev' (default), 'test', 'prod', otherwise give a full url like https://sdc.astron.nl:5554/ldvspec/api/v1")
...@@ -92,7 +138,7 @@ def main(): ...@@ -92,7 +138,7 @@ def main():
logging.info("--- {} (last updated {}) ---".format(os.path.basename(__file__), modification_time)) logging.info("--- {} (last updated {}) ---".format(os.path.basename(__file__), modification_time))
return return
logging.info("\nMoment please....\n") print("\nMoment please....\n")
if args.verbose: if args.verbose:
change_logging_level(logging.DEBUG) change_logging_level(logging.DEBUG)
...@@ -107,59 +153,49 @@ def main(): ...@@ -107,59 +153,49 @@ def main():
no_limit_to_insert = True no_limit_to_insert = True
else: else:
no_limit_to_insert = False no_limit_to_insert = False
logging.debug("Limit on number of dataproducts to insert REST is set to {}".format(args.max_nbr_dps_to_insert_per_request)) logging.debug("Limit on number of dataproducts to insert REST is set to {}".format(
args.max_nbr_dps_to_insert_per_request))
# Create connection with ldv-spec-db using REST API, use temp. token created in my test-env
ldvspec_interface = LDVSpecInterface(args.host, args.token)
query_count_all_raw_dataproducts = "select count(*) from astrowise.raw_dataproducts" query_count_all_raw_dataproducts = "select count(*) from astrowise.raw_dataproducts"
query_count_all_pipeline_dataproducts = "select count(*) from astrowise.pl_dataproducts" query_count_all_pipeline_dataproducts = "select count(*) from astrowise.pl_dataproducts"
query_all_required_fields_raw_dataproducts = \ query_all_required_fields_raw_dataproducts = \
"select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.raw_dataproducts {}"\ "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.raw_dataproducts order by id {}" \
.format(limit_str) .format(limit_str)
# Create connection using ssh tunnel with the ldvadmin database # Create connection using ssh tunnel with the ldvadmin database
conn, tunnel = connector.connect_postgresql(args.configuration, args.section) conn, tunnel = connector.connect_postgresql(args.configuration, args.section)
count_raw_dps = execute_query(conn, query_count_all_raw_dataproducts)[0][0] count_raw_dps = next(execute_query(conn, query_count_all_raw_dataproducts))[0]
count_pl_dps = execute_query(conn, query_count_all_pipeline_dataproducts)[0][0] count_pl_dps = next(execute_query(conn, query_count_all_pipeline_dataproducts))[0]
logging.info(f"There are {count_raw_dps} raw dataproducts and {count_pl_dps} pipeline dataproduct in the ldvadmin.astrowise table!!") logging.info(
result_query_all_dps = execute_query(conn, query_all_required_fields_raw_dataproducts) "There are %s raw dataproducts and %s pipeline dataproduct in the ldvadmin.astrowise table!!",
count_raw_dps, count_pl_dps
)
n_inserted_items = query_and_insert(conn, ldvspec_interface, count_raw_dps,
query_all_required_fields_raw_dataproducts,
no_limit_to_insert, args.max_nbr_dps_to_insert_per_request)
# Are there still dataproducts left to query? # Are there still dataproducts left to query?
nbr_dps_left = len(result_query_all_dps) - args.limit nbr_dps_left = args.limits - n_inserted_items
if nbr_dps_left > 0 and args.limit > 0: if nbr_dps_left > 0 and args.limit > 0:
logging.debug("Limit on number of leftover dataproducts to query is set to {}".format(nbr_dps_left)) logging.debug("Limit on number of leftover dataproducts to query is set to {}".format(nbr_dps_left))
limit_str = "limit {}".format(nbr_dps_left) limit_str = "limit {}".format(nbr_dps_left)
else:
limit_str = ""
query_all_required_fields_pipeline_dataproducts = \ query_all_required_fields_pipeline_dataproducts = \
"select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.pl_dataproducts {}" \ "select obsid, obsid_source, dp_type, project, activity, uri, size, dysco from astrowise.pl_dataproducts order by id {}" \
.format(limit_str) .format(limit_str)
result_query_all_dps.extend(execute_query(conn, query_all_required_fields_pipeline_dataproducts)) n_inserted_items += query_and_insert(conn, ldvspec_interface, count_pl_dps,
query_all_required_fields_pipeline_dataproducts,
logging.info("{} dataproduct retrieved from ldvadmin".format(len(result_query_all_dps))) no_limit_to_insert, args.max_nbr_dps_to_insert_per_request)
# Create connection with ldv-spec-db using REST API, use temp. token created in my test-env
ldvspec_interface = LDVSpecInterface(args.host, args.token)
# Now obtain the attributes from result an add to database using REST insert_dataproduct method logging.info("%s dataproduct retrieved from ldvadmin", n_inserted_items)
lst_all_dps = []
for dps in result_query_all_dps:
logging.debug(dps)
dps_dict = {"obs_id": dps[0], "oid_source": dps[1], "dataproduct_source": "LOFAR LTA",
"dataproduct_type": dps[2], "project": dps[3], "activity": dps[4], "surl": dps[5],
"filesize": dps[6], "additional_meta": {"dysco_compression": dps[7]}, "location": dps[5]}
lst_all_dps.append(dps_dict)
if no_limit_to_insert: print("\nThat's All Folks!\n")
res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps)
logging.info("Added {} DataProduct objects".format(len(res_lst_ids)))
logging.debug("Added with ids={}".format(res_lst_ids))
else:
nbr_required_inserts = math.ceil(len(lst_all_dps)/args.max_nbr_dps_to_insert_per_request)
for cnt in range(nbr_required_inserts):
start = cnt * args.max_nbr_dps_to_insert_per_request
end = start + args.max_nbr_dps_to_insert_per_request
res_lst_ids = ldvspec_interface.insert_multiple_dataproduct(payload=lst_all_dps[start:end])
logging.info("Insert count {} of {}: Added {} DataProduct objects [{} till {}]".
format(cnt, nbr_required_inserts, args.max_nbr_dps_to_insert_per_request, start, end))
logging.debug("Added with ids={}".format(res_lst_ids))
logging.info("\nThat's All Folks!\n")
if __name__ == "__main__": if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment