Skip to content
Snippets Groups Projects
Select Git revision
  • a795a90d8c45cd71b80cf7d88e53a95a5bf61c58
  • main default protected
  • 65_async_query
  • 169_codemeta
  • issue/152_cleaning_up
  • adex-main protected
  • sdc380-aladin-cone-search
  • sdc-222_multi_archive_query_review
  • 69_add_diracIAM
  • dev-nico
  • dev-dirac
  • acceptance
  • dev-zooniverse
13 results

QueryResults.js

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    convert_alta_to_adex_cache_fastapi.py 3.89 KiB
    
    # A script that reads an ALTA database and writes its dataproducts to an adex_cache database.
    
    import psycopg2
    from psycopg2 import Error
    import argparse
    
    import sql_scripts
    from scripts.beam import calc_beam
    
    
    def parse_database_url(url):
        """Split a PostgreSQL connection url into its connection parameters.

        Accepted form: ``[scheme://][user[:password]@]host[:port]/database``, e.g.
        ``postgres:postgres@localhost:5432/altadb_1sept2022`` or
        ``postgresql://user:secret@db.example.org:5432/alta``.

        :param url: connection url; a leading ``postgresql://`` or ``postgres://``
            scheme is optional.
        :return: tuple ``(user, password, host, database, db_port)``. ``db_port`` is
            returned as a string. ``user``, ``password`` and ``db_port`` are ``None``
            when absent from the url (psycopg2 then falls back to its defaults).
        :raises ValueError: when the url has no ``/database`` part.
        """
        # Strip an optional scheme, but only as a prefix.
        for scheme in ('postgresql://', 'postgres://'):
            if url.startswith(scheme):
                url = url[len(scheme):]
                break

        # Split on the LAST '@' so a password containing '@' stays intact;
        # without any '@' the whole url is host/database and userinfo is ''.
        userinfo, _, hostdatabase = url.rpartition('@')

        # Split on the FIRST ':' so a password containing ':' stays intact.
        user, has_password, password = userinfo.partition(':')
        user = user or None
        password = password if has_password else None

        hostport, slash, database = hostdatabase.partition('/')
        if not slash or not database:
            # Deliberately does not echo the url: it may contain a password.
            raise ValueError("database url is missing the '/<database>' part")

        host, colon, db_port = hostport.rpartition(':')
        if not colon:
            # No explicit port given.
            host, db_port = hostport, None

        return user, password, host, database, db_port
    
    # conversion algorithm to extract a bit more metadata from an ALTA record
    def convert_dataproduct(row):
        """Turn one ALTA dataproduct row into a skyviews record, or None to skip it.

        ``row`` is unpacked as
        ``(name, pid, access_url, ra, dec, dt, dst, observation)``; ``dt`` and ``dst``
        presumably hold the dataproduct type and subtype (TODO confirm against
        sql_scripts.select_from_alta).

        Rows with ``dt == 'inspectionPlot'`` or ``dst == 'calibrationTable'`` are
        skipped (None is returned). Otherwise the returned tuple is
        ``(name, pid, observation, beam, ra, dec, collection, level, dt, dst, access_url)``
        where ``beam`` comes from ``calc_beam(name)``.
        """
        name, pid, access_url, ra, dec, dt, dst, observation = row

        # These products are not carried over into the cache.
        if dt == 'inspectionPlot' or dst == 'calibrationTable':
            return None

        collection = 'apertif-timeseries' if dt == 'timeSeries' else 'apertif-imaging'

        if dst == 'calibratedVisibility':
            level = 1
        elif dst in ('continuumMF', 'continuumChunk', 'imageCube', 'beamCube',
                     'polarisationImage', 'polarisationCube', 'continuumCube'):
            level = 2
        else:
            # 'uncalibratedVisibility' and any unrecognised subtype end up at level 0.
            level = 0

        return (name, pid, observation, calc_beam(name), ra, dec,
                collection, level, dt, dst, access_url)
    
    
    def _connect(url):
        """Open a psycopg2 connection for a url understood by parse_database_url."""
        user, password, host, database, db_port = parse_database_url(url)
        return psycopg2.connect(
            database=database,
            user=user,
            password=password,
            host=host,
            port=db_port
        )


    def do_convert(source, target):
        """Copy ALTA dataproducts from ``source`` into a fresh skyviews table in ``target``.

        The skyviews table in the target database is dropped and recreated first.
        Every row returned by ``sql_scripts.select_from_alta`` is passed through
        ``convert_dataproduct``; rows for which it returns a falsy value are skipped
        and the rest are inserted with ``sql_scripts.insert_into_skyviews``.

        :param source: connection url of the ALTA database.
        :param target: connection url of the adex_cache database.

        A ``psycopg2.Error`` is printed rather than raised. In that case none of the
        inserted rows are committed, because the inserts are committed together at
        the end. Both connections are always closed, including when opening one of
        them failed.
        """
        source_connection = None
        target_connection = None
        try:
            source_connection = _connect(source)
            target_connection = _connect(target)

            source_cursor = source_connection.cursor()
            target_cursor = target_connection.cursor()

            # first drop the existing table and recreate it
            target_cursor.execute(sql_scripts.drop_table_skyviews)
            target_cursor.execute(sql_scripts.create_table_skyviews)
            target_connection.commit()

            print('fetching records from ALTA...')
            source_cursor.execute(sql_scripts.select_from_alta)
            rows = source_cursor.fetchall()
            print(str(len(rows)) + ' records fetched')

            print('inserting records into adex_cache...')
            insert_count = 0
            for row in rows:
                record_to_insert = convert_dataproduct(row)
                if not record_to_insert:
                    continue
                target_cursor.execute(sql_scripts.insert_into_skyviews, record_to_insert)
                insert_count += 1

            # A single commit for the whole batch instead of one round trip per row.
            target_connection.commit()

            print(str(insert_count) + ' inserted')
        except Error as e:
            print(e)
        finally:
            # Either connection may still be None if connecting failed.
            for connection in (target_connection, source_connection):
                if connection is not None:
                    connection.close()
    
    def build_argument_parser():
        """Build the command line parser for this conversion script.

        Both database urls are optional and default to a local postgres server. When
        a url flag is given, it must be followed by a value.
        """
        parser = argparse.ArgumentParser(
            description='Read an ALTA database and write its dataproducts to an adex_cache database.')
        # NOTE(review): --version is parsed but main() does not act on it yet.
        parser.add_argument("--version", default=False, help="Show current version of this program", action="store_true")
        # No nargs="?": a bare flag without a value would otherwise yield None and crash later.
        parser.add_argument("--source_database_url", default='postgres:postgres@localhost:5432/alta', help="Source database string")
        parser.add_argument("--target_database_url", default='postgres:postgres@localhost:5432/adex_cache', help="Target database string")
        return parser


    def main():
        """Parse the command line and run the ALTA -> adex_cache conversion."""
        args = build_argument_parser().parse_args()
        do_convert(args.source_database_url, args.target_database_url)
    
    
    # Run the conversion only when executed as a script, not when imported.
    if __name__ == '__main__':
        main()