Skip to content
Snippets Groups Projects
Commit dbadfbcd authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-1377: replaced OTDBBuslistener/OTDBEventHandler by TMSSBuslistener/TMSSEventMessageHandler

parent a2f6855f
No related branches found
No related tags found
1 merge request!736TMSS-1377 & TMSS-1376: TMSS-adapted TBB scripts and gitlab/docker TBB builds
......@@ -18,33 +18,40 @@ logger = logging.getLogger(__name__)
class TBBTMSSEventMessageHandler(TMSSEventMessageHandler):
def onSubTaskStatusChanged(self, id: int, status:str):
pass
def __init__(self):
super().__init__(log_event_messages=False)
self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap()
def start_handling(self):
self.tmss_client.open()
super().start_handling()
def onObservationStarted(self, treeId, modificationTime):
global prevobsid
global proclist
global subdir
global TBBtype
logger.info("onObservationStarted(%s, %s)" % (treeId, modificationTime))
def stop_handling(self):
super().stop_handling()
self.tmss_client.close()
try:
with OTDBRPC.create() as otdbrpc:
if otdbrpc.taskGetTreeInfo(treeId)['processType']=='Observation':
obsid=treeId
def onSubTaskStatusChanged(self, id: int, status:str):
if status == 'started':
subtask = self.tmss_client.get_subtask(id)
if subtask['subtask_type'] == 'observation':
global prevobsid
global proclist
global subdir
global TBBtype
logger.info("onSubTaskStatusChanged: observation subtask id=% started", id)
try:
obsid=id
if TBBtype=='datawriter':
obsid=switchdatawriter(obsid)
elif TBBtype=='metadata':
obsid=addmetadata(obsid)
elif TBBtype=='writeparset':
obsid=writeparset(obsid)
#result = otdbrpc.taskGetSpecification(otdb_id=treeId)
#spec = result['specification']
#writeJSON(spec)
except Exception as e:
logger.error(str(e))
except Exception as e:
logger.error(str(e))
def writeparset(obsid):
parset = get_all_parameters_new(obsid)
......@@ -181,7 +188,7 @@ def main():
else:
print("No observation running, waiting for next observation")
with OTDBBusListener(handler_type=TBBOTDBEventMessageHandler, num_threads=1):
with TMSSBusListener(handler_type=TBBTMSSEventMessageHandler, num_threads=1):
waitForInterrupt()
if __name__ == '__main__':
......
......@@ -4,26 +4,27 @@
# import os and bfdata package
#import bfdata as bf
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.sas.otdb.otdbrpc import OTDBRPC
from datetime import datetime
from datetime import timedelta
from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
def currentobs(offset=0):
""" Get's a task list with current tasks.
* offset * offset in seconds. Use a negative value to get an observation in the past
"""
d=timedelta(0,offset)
with RADBRPC.create() as radb_rpc:
tasks=radb_rpc.getTasks(lower_bound=datetime.utcnow()+d, upper_bound=datetime.utcnow()+d,task_type='observation')
"""
d=timedelta(0,offset)
with TMSSsession.create_from_dbcreds_for_ldap() as tmss_client:
observation_subtasks = tmss_client.get_subtasks(subtask_type='observation',
scheduled_stop_time_greater_then=datetime.utcnow()+d,
scheduled_start_time_less_then=datetime.utcnow()+d)
if len(tasks)==0:
if len(observation_subtasks)==0:
return "None"
elif len(tasks)==1:
return tasks[0]['otdb_id']
elif len(observation_subtasks)==1:
return observation_subtasks[0]['id']
elif len(tasks)>1:
# change this to check for stations involved
return tasks[0]['otdb_id']
return observation_subtasks[0]['id']
def get_all_parameters_new(obsid, useFilename=False):
if useFilename:
......@@ -31,8 +32,12 @@ def get_all_parameters_new(obsid, useFilename=False):
assert False
if type(obsid) == type(""):
obsid=int(obsid.strip('L'))
with OTDBRPC.create() as otdbrpc:
parset = otdbrpc.taskGetSpecification(obsid)['specification']
with TMSSsession.create_from_dbcreds_for_ldap() as tmss_client:
parset = tmss_client.get_subtask_parset(obsid)
# the tmss parset is a plain text string
# tbb scripts expect a dict. convert it.
parset = {parts[0].strip():parts[2].strip() for parts in [line.partition('=') for line in parset.splitlines()]}
for k in list(parset.keys()):
parset[k.replace("ObsSW.","")]=parset[k]
return parset
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment