Skip to content
Snippets Groups Projects
Commit fb337d4e authored by Nico Vermaas's avatar Nico Vermaas
Browse files

Merge branch 'oracle_and_lta' into 'main'

Add oracle connector

See merge request !12
parents e9698b55 a4ef21ee
No related branches found
No related tags found
1 merge request!12Add oracle connector
Pipeline #56368 passed
--datasource=oracle
--oracle_lib_dir=D:\oracle\instantclient_21_10
--connector=lta.DuplicateUnspecified
--data_host=awtier0:tier123@db.test.lofar.target.rug.nl:1521
--adex_backend_host=http://localhost:8000/adex_backend/
--adex_resource=ancillary_dp/create
--adex_token=9519e433ba37487f1a18121dfb1957d992fbb790
--batch_size=10000
--collection=none
--simulate_post
\ No newline at end of file
--datasource=oracle
--oracle_lib_dir=D:\oracle\instantclient_21_10
--connector=lta.SkyImages
--data_host=awtier0:tier123@db.test.lofar.target.rug.nl:1521
--adex_backend_host=http://localhost:8000/adex_backend/
--adex_resource=primary_dp/create
--adex_token=9519e433ba37487f1a18121dfb1957d992fbb790
--clear_collection
--batch_size=10000
--collection=lofar-skyimage
......@@ -38,6 +38,7 @@ dependencies = [
'psycopg2-binary >= 2.9.5, < 3.0',
'pyvo >= 1.4, < 2.0',
'importlib-metadata >= 0.12; python_version < "3.8"',
'cx-Oracle >= 8.0'
]
dynamic = ["version"]
......
import os, sys, argparse
from scraper.vo import vo_scraper
from scraper.postgres import postgres_scraper
from scraper.oracle import oracle_scraper
import scraper
def main():
......@@ -37,6 +38,9 @@ def main():
parser.add_argument("--data_host",
default="https://vo.astron.nl/tap/",
help="service/table, either VO or postgres. Examples: https://vo.astron.nl/tap/apertif_dr1.continuum_images, postgres:postgres@localhost:5432/alta")
parser.add_argument("--oracle_lib_dir",
default="D:\oracle\instantclient_21_10",
help="Directory where oracle client libraries are installed (only needed when using oracle connectors and the oracle libraries are not available on the path")
parser.add_argument("--connector",
default="lotss_dr2_mosaics_connector",
......@@ -102,5 +106,8 @@ def main():
if args.datasource.upper() == 'POSTGRES':
postgres_scraper.run(args)
if args.datasource.upper() == 'ORACLE':
oracle_scraper.run(args)
if __name__ == '__main__':
main()
import datetime
class DuplicateUnspecified():
"""
This is the (Oracle) connector that reads a simple example from the LTA
"""
def __init__(self):
self.sql_select_statement = """
SELECT unspec_dp.filename, COUNT(*) AS Occurrence
FROM AWOPER.UnspecifiedDataProduct unspec_dp
GROUP BY unspec_dp.filename HAVING COUNT(*)>1
"""
def translate(self, row, args):
"""
parse the specific row that comes from the postgres SQL query (sql_select_statement),
and translate it into the standard json payload for posting to the ADEX backend REST API
:param row: the results from the SQL query to a postgres service
:param args: the commandline arguments, but only args.collection is currently used
:return: payload: the json payload for the post to the ADEX backend
"""
payload = dict(
name=row[0],
count = row[1]
)
return payload
class SkyImages():
"""
This is the (Oracle) connector that reads a simple example from the LTA
"""
def __init__(self):
self.sql_select_statement = """
SELECT *
FROM AWOPER.SkyImageDataProduct dp
"""
def translate(self, row, args):
"""
parse the specific row that comes from the postgres SQL query (sql_select_statement),
and translate it into the standard json payload for posting to the ADEX backend REST API
:param row: the results from the SQL query to a postgres service
:param args: the commandline arguments, but only args.collection is currently used
:return: payload: the json payload for the post to the ADEX backend
"""
payload = dict(
pid=str(row[32]),
name=row[11],
dp_type="skyimage",
format=row[14], #this yields 'archive/tar', which is not a defined format by stakeholders
locality="tape",
#access_url=row['access_url'],
ra=float(row[38]/10),
dec=float(row[39]/10),
equinox="J2000.0",
#timestamp=row[10],
data_provider="ASTRON",
dataset_id=row[11].split('_')[0],
collection=args.collection,
pipeline_url=row[21],
pipeline_version = row[23],
direction_coordinate = str(row[32])
)
return payload
\ No newline at end of file
import cx_Oracle
import datetime
from scraper.adex_io import ADEX
from scraper.oracle.connectors import lta
def parse_database_url(url):
""""
parse a database url like: awtier0:<password>@db.lofar.target.rug.nl:1521
and cut it up in convenient parts for the psycopg2.connect
:param url: url that describes a database connection
:return: user, password, host, database, db_port
"""
# parse database url like: awtier0:<password>>@db.lofar.target.rug.nl:1521
if "oracle://" in url:
url = url.replace('oracle://','')
userpass = url.split('@')[0]
user = userpass.split(':')[0]
password = userpass.split(':')[1]
host_and_port = url.split('@')[1]
host = host_and_port.split(":")[0]
db_port = host_and_port.split(":")[1]
return user, password, host, db_port
def run(args):
""""
run the scraper Oracle functionality with the parameters that come in through args
:param args: the object with commandline parameters
:return
"""
print('oracle_scraper.run...')
try:
# init an ADEX instance
adex = ADEX(args)
adex.check_clear_resources(args)
# instantiate connector for translation from oracle to ADEX
connector_module,connector_name = args.connector.split('.')
if connector_module.upper() == 'LTA':
connector_class = getattr(lta, connector_name)
connector = connector_class()
# get the credentials and connection info from the data_host parameter
user, password, host, db_port = parse_database_url(args.data_host)
# if the oracle client library is given as a parameter, then initialize it
if args.oracle_lib_dir:
cx_Oracle.init_oracle_client(lib_dir=args.oracle_lib_dir)
# connect to oracle database
LTADSN = cx_Oracle.makedsn(host, db_port, service_name=host)
source_connection = cx_Oracle.connect(
dsn=LTADSN,
user=user,
password=password)
source_cursor = source_connection.cursor()
# read the data (oracle is fast enough to not use batches)
print('fetching records from the oracle database...')
source_cursor.execute(connector.sql_select_statement)
rows = source_cursor.fetchall()
count = len(rows)
print(f"{count} records fetched from {args.data_host}")
# iterate the results and store them in the adex_cache database
records = []
t1 = datetime.datetime.now()
for row in rows:
record = connector.translate(row, args)
if not record:
continue
# construct a list of records
records.append(record)
# post to ADEX in batches
t2 = datetime.datetime.now()
print(f"{count} translated and ready to post {args.adex_backend_host} in {t2 - t1}")
batch = []
total_count = 0
for record in records:
batch.append(record)
# when reaching the batch_size, post to ADEX backend
if len(batch) == int(args.batch_size):
total_count = total_count + len(batch)
t1 = datetime.datetime.now()
if not args.simulate_post:
adex.do_POST_json(args.adex_resource, batch)
t2 = datetime.datetime.now()
print(f"{len(batch)} ({total_count}) posted to {args.adex_backend_host} in {t2-t1}")
batch.clear()
# post the remainder
t1 = datetime.datetime.now()
if not args.simulate_post:
adex.do_POST_json(args.adex_resource, batch)
t2 = datetime.datetime.now()
total_count = total_count + len(batch)
print(f"{len(batch)} ({total_count}) posted to {args.adex_backend_host} in {t2 - t1}")
except Exception as error:
print(f"ERROR: {error}")
return
......@@ -22,6 +22,9 @@ class LincSkymap():
duration = float(row['t_end']) - float(row['t_start'])
#sampling_time = duration / time_samples
# do we need this is to link activities to dataproducts? or can we use row['activity'], which contains the SAS_ID.
pipeline_run_id = row['pipeline_run_id']
payload = dict(
pid=row['imageTitle'],
name=row['imageTitle'],
......@@ -45,6 +48,8 @@ class LincSkymap():
PSF_size=None,
#sky_footprint=str(row['s_region']),
sky_footprint=None,
# is this enough? or should the dataproducts be linked to the pipeline through 'pipeline_run_id' instead?
dataset_id=str(row['activity']),
collection=args.collection,
......@@ -52,7 +57,6 @@ class LincSkymap():
pipeline_version=row['pipeline_version'],
)
# are these parameters that we can create the Acitivies with?
return payload
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment