convert_alta_to_adex_cache_fastapi.py
# a script to read an ALTA database and write it to an adex_cache database
import psycopg2
from psycopg2 import Error
import argparse
import sql_scripts
from scripts.beam import calc_beam

def parse_database_url(url):
    # parse database url like: postgres:postgres@localhost:5432/altadb_1sept2022
    # get user, password
    if "postgresql://" in url:
        url = url.replace('postgresql://', '')

    userpass = url.split('@')[0]
    user = userpass.split(':')[0]
    password = userpass.split(':')[1]

    hostdatabase = url.split('@')[1]
    database = hostdatabase.split('/')[1]
    hostport = hostdatabase.split('/')[0]
    host = hostport.split(":")[0]
    db_port = hostport.split(":")[1]

    return user, password, host, database, db_port
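
# A rough sketch of the expected behaviour (values are illustrative only):
# parse_database_url('postgres:postgres@localhost:5432/alta') should yield
# ('postgres', 'postgres', 'localhost', 'alta', '5432'); an optional
# 'postgresql://' scheme prefix is stripped before splitting.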

# conversion algorithm to extract a bit more metadata from an ALTA record
def convert_dataproduct(row):
    name, pid, access_url, ra, dec, dt, dst, observation = row

    # skip data products that should not end up in the adex_cache
    if dt in ['inspectionPlot']:
        return None
    if dst in ['calibrationTable']:
        return None

    # determine collection
    collection = 'apertif-imaging'
    if dt == 'timeSeries':
        collection = 'apertif-timeseries'

    # determine the processing level from the data product subtype
    level = 0
    if dst == 'uncalibratedVisibility':
        level = 0
    if dst == 'calibratedVisibility':
        level = 1
    if dst in ['continuumMF', 'continuumChunk', 'imageCube', 'beamCube',
               'polarisationImage', 'polarisationCube', 'continuumCube']:
        level = 2

    beam = calc_beam(name)

    record_to_insert = (name, pid, observation, beam, ra, dec, collection, level, dt, dst, access_url)
    return record_to_insert
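
# Illustrative example (made-up values, not real ALTA data): a row with
# dt='timeSeries' and dst='calibratedVisibility' would be converted to the tuple
# (name, pid, observation, calc_beam(name), ra, dec, 'apertif-timeseries', 1,
#  'timeSeries', 'calibratedVisibility', access_url), in the column order
# expected by sql_scripts.insert_into_skyviews.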

def do_convert(source, target):
    try:
        # open the ALTA source database
        user, password, host, database, db_port = parse_database_url(source)
        source_connection = psycopg2.connect(
            database=database,
            user=user,
            password=password,
            host=host,
            port=db_port
        )

        # open the adex_cache target database
        user, password, host, database, db_port = parse_database_url(target)
        target_connection = psycopg2.connect(
            database=database,
            user=user,
            password=password,
            host=host,
            port=db_port
        )

        source_cursor = source_connection.cursor()
        target_cursor = target_connection.cursor()

        # first drop the existing table and recreate it
        target_cursor.execute(sql_scripts.drop_table_skyviews)
        target_cursor.execute(sql_scripts.create_table_skyviews)
        target_connection.commit()

        print('fetching records from ALTA...')
        source_cursor.execute(sql_scripts.select_from_alta)
        rows = source_cursor.fetchall()
        count = len(rows)
        print(str(count) + ' records fetched')

        print('inserting records into adex_cache...')
        insert_count = 0
        for row in rows:
            record_to_insert = convert_dataproduct(row)
            if not record_to_insert:
                continue

            target_cursor.execute(sql_scripts.insert_into_skyviews, record_to_insert)
            target_connection.commit()
            insert_count = insert_count + 1

        print(str(insert_count) + ' inserted')

    except Error as e:
        print(e)

    target_connection.close()
    source_connection.close()

def main():
    # Check the invocation arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true")
    parser.add_argument("--source_database_url", nargs="?", default='postgres:postgres@localhost:5432/alta', help="Source database string")
    parser.add_argument("--target_database_url", nargs="?", default='postgres:postgres@localhost:5432/adex_cache', help="Target database string")
    args = parser.parse_args()

    do_convert(args.source_database_url, args.target_database_url)


if __name__ == '__main__':
    main()
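
# Example invocation (the connection strings below are just the argparse defaults;
# point them at real ALTA and adex_cache instances as needed):
#   python convert_alta_to_adex_cache_fastapi.py \
#       --source_database_url postgres:postgres@localhost:5432/alta \
#       --target_database_url postgres:postgres@localhost:5432/adex_cache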