diff --git a/CEP/TBB/TBBdatawriter/TBBOTDBBusListener.py b/CEP/TBB/TBBdatawriter/TBBOTDBBusListener.py index 018872dcc792adff98ba181d6747c5785a13778a..71dfef063441f4102f460fc2c8e6ff22d7dbc858 100644 --- a/CEP/TBB/TBBdatawriter/TBBOTDBBusListener.py +++ b/CEP/TBB/TBBdatawriter/TBBOTDBBusListener.py @@ -18,33 +18,40 @@ logger = logging.getLogger(__name__) class TBBTMSSEventMessageHandler(TMSSEventMessageHandler): - def onSubTaskStatusChanged(self, id: int, status:str): - pass + def __init__(self): + super().__init__(log_event_messages=False) + self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap() + def start_handling(self): + self.tmss_client.open() + super().start_handling() - def onObservationStarted(self, treeId, modificationTime): - global prevobsid - global proclist - global subdir - global TBBtype - logger.info("onObservationStarted(%s, %s)" % (treeId, modificationTime)) + def stop_handling(self): + super().stop_handling() + self.tmss_client.close() - try: - with OTDBRPC.create() as otdbrpc: - if otdbrpc.taskGetTreeInfo(treeId)['processType']=='Observation': - obsid=treeId + def onSubTaskStatusChanged(self, id: int, status:str): + if status == 'started': + subtask = self.tmss_client.get_subtask(id) + + if subtask['subtask_type'] == 'observation': + global prevobsid + global proclist + global subdir + global TBBtype + logger.info("onSubTaskStatusChanged: observation subtask id=% started", id) + + try: + obsid=id if TBBtype=='datawriter': obsid=switchdatawriter(obsid) elif TBBtype=='metadata': obsid=addmetadata(obsid) elif TBBtype=='writeparset': obsid=writeparset(obsid) - #result = otdbrpc.taskGetSpecification(otdb_id=treeId) - #spec = result['specification'] - #writeJSON(spec) - except Exception as e: - logger.error(str(e)) + except Exception as e: + logger.error(str(e)) def writeparset(obsid): parset = get_all_parameters_new(obsid) @@ -181,7 +188,7 @@ def main(): else: print("No observation running, waiting for next observation") - with OTDBBusListener(handler_type=TBBOTDBEventMessageHandler, num_threads=1): + with TMSSBusListener(handler_type=TBBTMSSEventMessageHandler, num_threads=1): waitForInterrupt() if __name__ == '__main__': diff --git a/CEP/TBB/TBBdatawriter/currentobs.py b/CEP/TBB/TBBdatawriter/currentobs.py index 232c1f4ce04215a15b2604226508b475d4ebad45..147c770fb4fe4f0884fbb744a1ee3aced75bb75f 100755 --- a/CEP/TBB/TBBdatawriter/currentobs.py +++ b/CEP/TBB/TBBdatawriter/currentobs.py @@ -4,26 +4,27 @@ # import os and bfdata package #import bfdata as bf -from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC -from lofar.sas.otdb.otdbrpc import OTDBRPC from datetime import datetime from datetime import timedelta +from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession def currentobs(offset=0): """ Get's a task list with current tasks. * offset * offset in seconds. Use a negative value to get an observation in the past - """ - d=timedelta(0,offset) - with RADBRPC.create() as radb_rpc: - tasks=radb_rpc.getTasks(lower_bound=datetime.utcnow()+d, upper_bound=datetime.utcnow()+d,task_type='observation') + """ + d=timedelta(0,offset) + with TMSSsession.create_from_dbcreds_for_ldap() as tmss_client: + observation_subtasks = tmss_client.get_subtasks(subtask_type='observation', + scheduled_stop_time_greater_then=datetime.utcnow()+d, + scheduled_start_time_less_then=datetime.utcnow()+d) - if len(tasks)==0: + if len(observation_subtasks)==0: return "None" - elif len(tasks)==1: - return tasks[0]['otdb_id'] + elif len(observation_subtasks)==1: + return observation_subtasks[0]['id'] elif len(tasks)>1: # change this to check for stations involved - return tasks[0]['otdb_id'] + return observation_subtasks[0]['id'] def get_all_parameters_new(obsid, useFilename=False): if useFilename: @@ -31,8 +32,12 @@ def get_all_parameters_new(obsid, useFilename=False): assert False if type(obsid) == type(""): obsid=int(obsid.strip('L')) - with OTDBRPC.create() as otdbrpc: - parset = otdbrpc.taskGetSpecification(obsid)['specification'] + + with TMSSsession.create_from_dbcreds_for_ldap() as tmss_client: + parset = tmss_client.get_subtask_parset(obsid) + # the tmss parset is a plain text string + # tbb scripts expect a dict. convert it. + parset = {parts[0].strip():parts[2].strip() for parts in [line.partition('=') for line in parset.splitlines()]} for k in list(parset.keys()): parset[k.replace("ObsSW.","")]=parset[k] return parset