Skip to content
Snippets Groups Projects
Commit 4ffc5f19 authored by Mattia Mancini
Browse files

Add lofarquery script

parent 6300b001
No related branches found
No related tags found
3 merge requests: !4 Add lofarquery lta, !3 Add lofarquery lta, !2 Add lofarquery lta
from argparse import ArgumentParser
from sqlalchemy import create_engine, schema
from configparser import ConfigParser
import logging
import os.path, os
from typing import *
import pandas
# SQLAlchemy URL template for the Oracle database (cx_Oracle driver).
# The {user} and {password} placeholders are filled in by create_db_engine.
CONNECTION_STRING = 'oracle+cx_oracle://{user}:{password}@db.lofar.target.rug.nl/?service_name=db.lofar.target.rug.nl'
# Fallback configuration files, tried in this order by find_config_file
# (which also expands '~' and environment variables in them).
DEFAULT_CONFIG_PATHS = ('~/.config/lofarquery/config.ini', '~/.awe/Environment.cfg')
# Query executed by perform_query. It lists FILEOBJECT rows whose data object
# is a CorrelatedDataProduct and whose ISVALID is >= 1, joined with their
# storage ticket resource (primary/secondary URL), sub-array pointing,
# pointing and observation. The inner sub-select uses LISTAGG to collapse the
# station names of each observation into one comma-separated STATIONS value.
# Rows are ordered by SASID, BEAMNUMBER and SUBBAND.
DEFAULT_QUERY = '''
SELECT FILEOBJECT.OBJECT_ID AS OBJECT_ID,
O.OBSERVATIONID AS SASID,
C2.STARTTIME AS STARTTIME,
C2.ENDTIME AS ENDTIME,
C2.DURATION AS DURATION,
SUP.BEAMNUMBER AS BEAMNUMBER,
C2.STATIONSUBBAND AS SUBBAND,
O.INSTRUMENTFILTER AS FILTER,
O.ANTENNASET AS ANTENNASET,
P.RIGHTASCENSION AS RIGHTASCENSION,
P.DECLINATION AS DECLINATION,
SA.STATION_NAME AS STATIONS,
C2.CENTRALFREQUENCY AS CENTRALFREQUENCY,
STR.PRIMARY_URL AS PRIMARY_URL,
STR.SECONDARY_URL AS SECONDARY_URL FROM AWOPER.FILEOBJECT JOIN
AWOPER.STORAGETICKETRESOURCE STR ON FILEOBJECT.STORAGE_TICKET_RESOURCE=STR.OBJECT_ID JOIN
AWOPER.STORAGETICKET ST ON ST.OBJECT_ID=STR.TICKET JOIN
AWOPER.CORRELATEDDATAPRODUCT C2 on FILEOBJECT.DATA_OBJECT = C2.OBJECT_ID JOIN
AWOPER.SUBARRAYPOINTING SUP ON SUP.OBJECT_ID = C2.SUBARRAYPOINTING JOIN
AWOPER.POINTING P ON P.OBJECT_ID = SUP.POINTING JOIN
AWOPER.OBSERVATION O ON C2.OBSERVATION = O.OBJECT_ID JOIN
(SELECT O$S.OBJECT_ID AS OID, LISTAGG(S.NAME, ',') WITHIN GROUP ( ORDER BY S.NAME) AS STATION_NAME FROM
AWOPER.OBSERVATION$STATIONS O$S JOIN
AWOPER.STATION S on O$S.COLUMN_VALUE = S.OBJECT_ID GROUP BY O$S.OBJECT_ID) SA ON SA.OID = O.OBJECT_ID
WHERE FILEOBJECT.DATA_OBJECT$ = 'CorrelatedDataProduct' and FILEOBJECT.ISVALID >= 1 ORDER BY SASID, BEAMNUMBER, SUBBAND
'''
# Install the root log handler with a "timestamp LEVEL: message" layout.
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s')
# Module-level logger; its level is chosen in setup_logging.
logger = logging.getLogger('LOFAR generator')
def parse_args():
    """Parse the command-line arguments of the script.

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv`` with the
        attributes ``config`` (str, empty when not given), ``outfile`` (str),
        ``verbose`` (bool) and ``chunk_size`` (int).
    """
    parser = ArgumentParser(description='Query the LTA to obtain the dataproduct list')
    parser.add_argument('--config', help='configuration file where the credentials are stored', default='')
    parser.add_argument('outfile', help='where to store the queried table')
    parser.add_argument('--verbose', help='set verbose logging level', action='store_true')
    # type=int is required: without it a value supplied on the command line
    # stays a string, whereas the default is an int.
    parser.add_argument('--chunk_size', help='save data in the csv in chunks of rows', type=int, default=1000)
    return parser.parse_args()
def create_db_engine(user: str, password: str):
    """Build the SQLAlchemy engine for the database in CONNECTION_STRING.

    Args:
        user: database user name, as read from the configuration file.
        password: database password, as read from the configuration file.

    Returns:
        The engine returned by ``sqlalchemy.create_engine``.
    """
    from urllib.parse import quote

    # Credentials are embedded in a URL, so characters such as '@', ':', '/'
    # or '%' must be percent-encoded or the URL is parsed incorrectly.
    # safe='' makes quote() escape '/' as well.
    connection_string: str = CONNECTION_STRING.format(
        user=quote(user, safe=''),
        password=quote(password, safe=''),
    )
    return create_engine(connection_string)
def read_user_credentials(file_paths: str) -> Dict:
    """Load the database credentials from the given configuration file(s).

    Args:
        file_paths: path, or list of paths, passed to ``ConfigParser.read``.

    Returns:
        A dict with the keys ``user`` and ``password``, taken from the
        ``database_user`` and ``database_password`` options of the
        ``[global]`` section.

    Raises:
        KeyError: if the ``[global]`` section or one of the two options is
            missing.
    """
    config = ConfigParser()
    config.read(file_paths)
    global_section = config['global']
    return {
        'user': global_section['database_user'],
        'password': global_section['database_password'],
    }
def find_config_file(config_file_path: str):
    """Locate the configuration file holding the database credentials.

    Args:
        config_file_path: path requested by the user; an empty value means
            "use the defaults".

    Returns:
        ``[config_file_path]`` (a one-element list) when the requested file
        exists; otherwise the first existing entry of DEFAULT_CONFIG_PATHS,
        as a string with '~' and environment variables expanded.

    Raises:
        SystemExit: with code 1 when no configuration file can be found.
    """
    if config_file_path and os.path.exists(config_file_path):
        return [config_file_path]
    if config_file_path:
        # A path was requested but is missing: fall back to the defaults.
        logging.warning('Cannot find configuration file %s: using default...', config_file_path)

    for candidate in DEFAULT_CONFIG_PATHS:
        resolved = os.path.expandvars(os.path.expanduser(candidate))
        if os.path.exists(resolved):
            return resolved

    # Reached only when none of the default locations exists.
    logging.error('Cannot find a configuration file please specify one')
    raise SystemExit(1)
def setup_logging(is_debug):
    """Set the level of the module logger.

    Args:
        is_debug: truthy selects ``logging.DEBUG``; anything else selects
            ``logging.INFO``.
    """
    level = logging.DEBUG if is_debug else logging.INFO
    logger.setLevel(level)
def perform_query(db_engine, path_to_store, chunk_size=1000):
    """Run DEFAULT_QUERY and write the result to a CSV file chunk by chunk.

    An existing file at ``path_to_store`` is removed first; each chunk is
    then appended, and only the first chunk writes the header row.

    Args:
        db_engine: connectable handed to ``pandas.read_sql_query``.
        path_to_store: path of the CSV file to (re)create.
        chunk_size: number of rows fetched and written per chunk.
    """
    # Accept a numeric string too (e.g. a raw command-line value); pandas
    # needs an integer chunk size.
    chunk_size = int(chunk_size)
    iterator = pandas.read_sql_query(DEFAULT_QUERY, db_engine, chunksize=chunk_size)
    if os.path.exists(path_to_store):
        # Use the module logger: the root logger's level is never lowered, so
        # a root-level debug message would not be shown even in verbose mode.
        logger.debug('older file found removing...')
        os.remove(path_to_store)
    for chunk_id, chunk in enumerate(iterator):
        # Report the rows actually received; the last chunk may be shorter
        # than chunk_size.
        logger.info('received chunk %s of %s rows', chunk_id, len(chunk))
        # Compare by value (==), not identity: only the first chunk gets the header.
        chunk.to_csv(path_to_store, mode='a', header=chunk_id == 0)
def main():
    """Command-line entry point.

    Parses the arguments, sets the log level, finds the configuration file,
    reads the credentials from it, creates the database engine and finally
    runs the query, storing the result in the requested output file.
    """
    options = parse_args()
    setup_logging(options.verbose)
    config_location = find_config_file(options.config)

    logger.debug('retrieving user credentials')
    db_credentials = read_user_credentials(config_location)
    logger.debug('credentials found')

    logger.debug('creating database database connection')
    db_engine = create_db_engine(**db_credentials)
    logger.debug('database connection established')

    logger.info('performing query')
    perform_query(db_engine, options.outfile, chunk_size=options.chunk_size)


# Run the entry point only when the file is executed as a script.
if __name__ == '__main__':
    main()
......@@ -6,3 +6,5 @@ scikit-learn==0.22
scipy==1.4.1
sklearn==0.0
pandas
sqlalchemy
cx_Oracle
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment