diff --git a/CEP/TBB/TBBdatawriter/TBBOTDBBusListener-datawriter.py b/CEP/TBB/TBBdatawriter/TBBOTDBBusListener-datawriter.py deleted file mode 100644 index 5bf3298acb5aa8b4b08aec5b3f29c75479a24402..0000000000000000000000000000000000000000 --- a/CEP/TBB/TBBdatawriter/TBBOTDBBusListener-datawriter.py +++ /dev/null @@ -1,184 +0,0 @@ -#!/usr/bin/env python - -from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler -from lofar.sas.otdb.otdbrpc import OTDBRPC -from lofar.common.util import waitForInterrupt - -import logging -import json -from currentobs import currentobs -from currentobs import get_all_parameters_new -import TBBdatawriterController as TBBc -import subprocess -import os, sys -import time - -logger = logging.getLogger(__name__) - - -class TBBOTDBEventMessageHandler(OTDBEventMessageHandler): - - def onObservationStarted(self, treeId, modificationTime): - global prevobsid - global proclist - global subdir - global TBBtype - logger.info("onObservationStarted(%s, %s)" % (treeId, modificationTime)) - - try: - with OTDBRPC.create() as otdbrpc: - if otdbrpc.taskGetTreeInfo(treeId)['processType']=='Observation': - obsid=treeId - if TBBtype=='datawriter': - obsid=switchdatawriter(obsid) - elif TBBtype=='metadata': - obsid=addmetadata(obsid) - elif TBBtype=='writeparset': - obsid=writeparset(obsid) - #result = otdbrpc.taskGetSpecification(otdb_id=treeId) - #spec = result['specification'] - #writeJSON(spec) - - except Exception as e: - logger.error(str(e)) - -def writeparset(obsid): - parset = get_all_parameters_new(obsid) - parsetfilename = open("/data/config/tbb/new.parset","w") - for key in sorted(parset.keys()): - print("%s = %s" % (key,parset[key]),file=parsetfilename) - parsetfilename.close() - os.rename("/data/config/tbb/new.parset","/data/config/tbb/latest.parset") - subprocess.Popen(['scp','/data/config/tbb/latest.parset','lcuhead:/opt/lofar/tbb/currentparset/']) - prevobsid=obsid - return obsid - -def addmetadata(obsid): - global subdir - global prevobsid - global proclist - p=get_all_parameters_new(obsid) - project=p['Observation.Campaign.name'] - observationID="L"+p['Observation.otdbID'] - print('Observation ',obsid,'Start time ', p['Observation.startTime'],' Stop time: ',p['Observation.stopTime']+'\n') - print('Adding metadata for for obs ',prevobsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if os.access(fCurrentlyDumping,os.R_OK): - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - while(bCurrentlyDumping): - time.sleep(1) - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - try: - proc=subprocess.Popen(["python3","/opt/lofar/src/CEP/TBB/TBBdatawriter/addmetadata.py",subdir,str(prevobsid)]) - except: - print("Adding metadata failed while executing:","python3","/opt/lofar/src/CEP/TBB/TBBdatawriter/addmetadata.py",subdir,prevobsid) - #os.system("chmod 664 /data/TBB/VHECRtest/*.h5") - datawriterrunning=False - time.sleep(15) - print('New observation ',obsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if project in ['LC6_009','LC6_003','CEP4_commissioning','LT10_003']: - subdir="/data/projects/"+project+"/tbb/" - else: - subdir="/data/projects/LT10_017/tbb/" - datawriterrunning=True - prevobsid=obsid - return obsid - - -def switchdatawriter(obsid): - global subdir - global prevobsid - global proclist - p=get_all_parameters_new(obsid) - observer=p['Observation.Campaign.PI'] - antennaSet=p['Observation.antennaSet'] - project=p['Observation.Campaign.name'] - observationID="L"+p['Observation.otdbID'] - filterSelection=p['Observation.bandFilter'] - print('Observation ',obsid,'Start time ', p['Observation.startTime'],' Stop time: ',p['Observation.stopTime']+'\n') - if len(proclist)>0:#datawriterrunning: - print('Stop datawriter for obs ',prevobsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if os.access(fCurrentlyDumping,os.R_OK): - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - while(bCurrentlyDumping): - time.sleep(1) - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - TBBc.stopdatawriter(proclist) - #os.system("chmod 664 /data/TBB/VHECRtest/*.h5") - datawriterrunning=False - time.sleep(15) - print('Start datawriter for obs ',obsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if project in ['LC6_009','LC6_003','CEP4_commissioning','LT10_003']: - subdir="/data/projects/"+project+"/tbb/" - else: - subdir="/data/projects/LT10_017/tbb/" - proclist=TBBc.startdatawriter(filename="",timeouttime=20,observer=observer,antennaSet=antennaSet,project=project,observationID=observationID,filterSelection=filterSelection,datamaindir=subdir) - datawriterrunning=True - prevobsid=obsid - return obsid - - -nogui=True -#f=open('/home/veen/logs/rundatawriter.txt','w') -#prevobsid="None" -datawriterrunning=False -#proclist=[] - -print("Starting to run datawriter, but need an observation first") - -fCurrentlyDumping="/globaldata/tbb/nowdumping.py" - -if os.access(fCurrentlyDumping,os.R_OK): - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) -else: - print("WARNING unable to open ",fCurrentlyDumping) - bCurrentlyDumping=False - -if len(sys.argv)>2: - offset=int(sys.argv[2]) -else: - offset=0 - -#while 1: -# obsid=currentobs(offset) -# if not obsid or obsid == "None" and time.time() < 1344345492+3600: # start test on august 7 -# obsid="L63176" - #obsid="L42479" -# if not obsid or obsid == "None" or obsid==prevobsid: -# time.sleep(20) -# else: -# obsid,prevobsid,proclist=switchdatawriter(obsid,prevobsid,proclist) - -def main(): - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - - obsid=currentobs(offset) - #obsid="None"#currentobs(offset) - global subdir - global prevobsid - global proclist - global TBBtype - if len(sys.argv)>1: - if sys.argv[1]=='metadata': - TBBtype="metadata" - elif sys.argv[1]=='writeparset': - TBBtype="writeparset" - else: - TBBtype="datawriter" - else: - TBBtype="datawriter" - prevobsid="None" - proclist=[] - subdir="/data/projects/LT10_017/tbb/" - if obsid != "None": - if TBBtype=='datawriter': - obsid=switchdatawriter(obsid) - if TBBtype=="writeparset": - obsid=writeparset(obsid) - else: - print("No observation running, waiting for next observation") - - with OTDBBusListener(handler_type=TBBOTDBEventMessageHandler, num_threads=1): - waitForInterrupt() - -if __name__ == '__main__': - main() diff --git a/CEP/TBB/TBBdatawriter/TBBOTDBBusListener-metadata.py b/CEP/TBB/TBBdatawriter/TBBOTDBBusListener-metadata.py deleted file mode 100644 index 5bf3298acb5aa8b4b08aec5b3f29c75479a24402..0000000000000000000000000000000000000000 --- a/CEP/TBB/TBBdatawriter/TBBOTDBBusListener-metadata.py +++ /dev/null @@ -1,184 +0,0 @@ -#!/usr/bin/env python - -from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler -from lofar.sas.otdb.otdbrpc import OTDBRPC -from lofar.common.util import waitForInterrupt - -import logging -import json -from currentobs import currentobs -from currentobs import get_all_parameters_new -import TBBdatawriterController as TBBc -import subprocess -import os, sys -import time - -logger = logging.getLogger(__name__) - - -class TBBOTDBEventMessageHandler(OTDBEventMessageHandler): - - def onObservationStarted(self, treeId, modificationTime): - global prevobsid - global proclist - global subdir - global TBBtype - logger.info("onObservationStarted(%s, %s)" % (treeId, modificationTime)) - - try: - with OTDBRPC.create() as otdbrpc: - if otdbrpc.taskGetTreeInfo(treeId)['processType']=='Observation': - obsid=treeId - if TBBtype=='datawriter': - obsid=switchdatawriter(obsid) - elif TBBtype=='metadata': - obsid=addmetadata(obsid) - elif TBBtype=='writeparset': - obsid=writeparset(obsid) - #result = otdbrpc.taskGetSpecification(otdb_id=treeId) - #spec = result['specification'] - #writeJSON(spec) - - except Exception as e: - logger.error(str(e)) - -def writeparset(obsid): - parset = get_all_parameters_new(obsid) - parsetfilename = open("/data/config/tbb/new.parset","w") - for key in sorted(parset.keys()): - print("%s = %s" % (key,parset[key]),file=parsetfilename) - parsetfilename.close() - os.rename("/data/config/tbb/new.parset","/data/config/tbb/latest.parset") - subprocess.Popen(['scp','/data/config/tbb/latest.parset','lcuhead:/opt/lofar/tbb/currentparset/']) - prevobsid=obsid - return obsid - -def addmetadata(obsid): - global subdir - global prevobsid - global proclist - p=get_all_parameters_new(obsid) - project=p['Observation.Campaign.name'] - observationID="L"+p['Observation.otdbID'] - print('Observation ',obsid,'Start time ', p['Observation.startTime'],' Stop time: ',p['Observation.stopTime']+'\n') - print('Adding metadata for for obs ',prevobsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if os.access(fCurrentlyDumping,os.R_OK): - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - while(bCurrentlyDumping): - time.sleep(1) - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - try: - proc=subprocess.Popen(["python3","/opt/lofar/src/CEP/TBB/TBBdatawriter/addmetadata.py",subdir,str(prevobsid)]) - except: - print("Adding metadata failed while executing:","python3","/opt/lofar/src/CEP/TBB/TBBdatawriter/addmetadata.py",subdir,prevobsid) - #os.system("chmod 664 /data/TBB/VHECRtest/*.h5") - datawriterrunning=False - time.sleep(15) - print('New observation ',obsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if project in ['LC6_009','LC6_003','CEP4_commissioning','LT10_003']: - subdir="/data/projects/"+project+"/tbb/" - else: - subdir="/data/projects/LT10_017/tbb/" - datawriterrunning=True - prevobsid=obsid - return obsid - - -def switchdatawriter(obsid): - global subdir - global prevobsid - global proclist - p=get_all_parameters_new(obsid) - observer=p['Observation.Campaign.PI'] - antennaSet=p['Observation.antennaSet'] - project=p['Observation.Campaign.name'] - observationID="L"+p['Observation.otdbID'] - filterSelection=p['Observation.bandFilter'] - print('Observation ',obsid,'Start time ', p['Observation.startTime'],' Stop time: ',p['Observation.stopTime']+'\n') - if len(proclist)>0:#datawriterrunning: - print('Stop datawriter for obs ',prevobsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if os.access(fCurrentlyDumping,os.R_OK): - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - while(bCurrentlyDumping): - time.sleep(1) - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - TBBc.stopdatawriter(proclist) - #os.system("chmod 664 /data/TBB/VHECRtest/*.h5") - datawriterrunning=False - time.sleep(15) - print('Start datawriter for obs ',obsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if project in ['LC6_009','LC6_003','CEP4_commissioning','LT10_003']: - subdir="/data/projects/"+project+"/tbb/" - else: - subdir="/data/projects/LT10_017/tbb/" - proclist=TBBc.startdatawriter(filename="",timeouttime=20,observer=observer,antennaSet=antennaSet,project=project,observationID=observationID,filterSelection=filterSelection,datamaindir=subdir) - datawriterrunning=True - prevobsid=obsid - return obsid - - -nogui=True -#f=open('/home/veen/logs/rundatawriter.txt','w') -#prevobsid="None" -datawriterrunning=False -#proclist=[] - -print("Starting to run datawriter, but need an observation first") - -fCurrentlyDumping="/globaldata/tbb/nowdumping.py" - -if os.access(fCurrentlyDumping,os.R_OK): - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) -else: - print("WARNING unable to open ",fCurrentlyDumping) - bCurrentlyDumping=False - -if len(sys.argv)>2: - offset=int(sys.argv[2]) -else: - offset=0 - -#while 1: -# obsid=currentobs(offset) -# if not obsid or obsid == "None" and time.time() < 1344345492+3600: # start test on august 7 -# obsid="L63176" - #obsid="L42479" -# if not obsid or obsid == "None" or obsid==prevobsid: -# time.sleep(20) -# else: -# obsid,prevobsid,proclist=switchdatawriter(obsid,prevobsid,proclist) - -def main(): - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - - obsid=currentobs(offset) - #obsid="None"#currentobs(offset) - global subdir - global prevobsid - global proclist - global TBBtype - if len(sys.argv)>1: - if sys.argv[1]=='metadata': - TBBtype="metadata" - elif sys.argv[1]=='writeparset': - TBBtype="writeparset" - else: - TBBtype="datawriter" - else: - TBBtype="datawriter" - prevobsid="None" - proclist=[] - subdir="/data/projects/LT10_017/tbb/" - if obsid != "None": - if TBBtype=='datawriter': - obsid=switchdatawriter(obsid) - if TBBtype=="writeparset": - obsid=writeparset(obsid) - else: - print("No observation running, waiting for next observation") - - with OTDBBusListener(handler_type=TBBOTDBEventMessageHandler, num_threads=1): - waitForInterrupt() - -if __name__ == '__main__': - main() diff --git a/CEP/TBB/TBBdatawriter/TBBOTDBBusListener-writeparset.py b/CEP/TBB/TBBdatawriter/TBBOTDBBusListener-writeparset.py deleted file mode 100644 index 5bf3298acb5aa8b4b08aec5b3f29c75479a24402..0000000000000000000000000000000000000000 --- a/CEP/TBB/TBBdatawriter/TBBOTDBBusListener-writeparset.py +++ /dev/null @@ -1,184 +0,0 @@ -#!/usr/bin/env python - -from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler -from lofar.sas.otdb.otdbrpc import OTDBRPC -from lofar.common.util import waitForInterrupt - -import logging -import json -from currentobs import currentobs -from currentobs import get_all_parameters_new -import TBBdatawriterController as TBBc -import subprocess -import os, sys -import time - -logger = logging.getLogger(__name__) - - -class TBBOTDBEventMessageHandler(OTDBEventMessageHandler): - - def onObservationStarted(self, treeId, modificationTime): - global prevobsid - global proclist - global subdir - global TBBtype - logger.info("onObservationStarted(%s, %s)" % (treeId, modificationTime)) - - try: - with OTDBRPC.create() as otdbrpc: - if otdbrpc.taskGetTreeInfo(treeId)['processType']=='Observation': - obsid=treeId - if TBBtype=='datawriter': - obsid=switchdatawriter(obsid) - elif TBBtype=='metadata': - obsid=addmetadata(obsid) - elif TBBtype=='writeparset': - obsid=writeparset(obsid) - #result = otdbrpc.taskGetSpecification(otdb_id=treeId) - #spec = result['specification'] - #writeJSON(spec) - - except Exception as e: - logger.error(str(e)) - -def writeparset(obsid): - parset = get_all_parameters_new(obsid) - parsetfilename = open("/data/config/tbb/new.parset","w") - for key in sorted(parset.keys()): - print("%s = %s" % (key,parset[key]),file=parsetfilename) - parsetfilename.close() - os.rename("/data/config/tbb/new.parset","/data/config/tbb/latest.parset") - subprocess.Popen(['scp','/data/config/tbb/latest.parset','lcuhead:/opt/lofar/tbb/currentparset/']) - prevobsid=obsid - return obsid - -def addmetadata(obsid): - global subdir - global prevobsid - global proclist - p=get_all_parameters_new(obsid) - project=p['Observation.Campaign.name'] - observationID="L"+p['Observation.otdbID'] - print('Observation ',obsid,'Start time ', p['Observation.startTime'],' Stop time: ',p['Observation.stopTime']+'\n') - print('Adding metadata for for obs ',prevobsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if os.access(fCurrentlyDumping,os.R_OK): - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - while(bCurrentlyDumping): - time.sleep(1) - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - try: - proc=subprocess.Popen(["python3","/opt/lofar/src/CEP/TBB/TBBdatawriter/addmetadata.py",subdir,str(prevobsid)]) - except: - print("Adding metadata failed while executing:","python3","/opt/lofar/src/CEP/TBB/TBBdatawriter/addmetadata.py",subdir,prevobsid) - #os.system("chmod 664 /data/TBB/VHECRtest/*.h5") - datawriterrunning=False - time.sleep(15) - print('New observation ',obsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if project in ['LC6_009','LC6_003','CEP4_commissioning','LT10_003']: - subdir="/data/projects/"+project+"/tbb/" - else: - subdir="/data/projects/LT10_017/tbb/" - datawriterrunning=True - prevobsid=obsid - return obsid - - -def switchdatawriter(obsid): - global subdir - global prevobsid - global proclist - p=get_all_parameters_new(obsid) - observer=p['Observation.Campaign.PI'] - antennaSet=p['Observation.antennaSet'] - project=p['Observation.Campaign.name'] - observationID="L"+p['Observation.otdbID'] - filterSelection=p['Observation.bandFilter'] - print('Observation ',obsid,'Start time ', p['Observation.startTime'],' Stop time: ',p['Observation.stopTime']+'\n') - if len(proclist)>0:#datawriterrunning: - print('Stop datawriter for obs ',prevobsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if os.access(fCurrentlyDumping,os.R_OK): - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - while(bCurrentlyDumping): - time.sleep(1) - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) - TBBc.stopdatawriter(proclist) - #os.system("chmod 664 /data/TBB/VHECRtest/*.h5") - datawriterrunning=False - time.sleep(15) - print('Start datawriter for obs ',obsid,' at ',time.strftime('%Y-%m-%d %H:%M:%S')+'\n') - if project in ['LC6_009','LC6_003','CEP4_commissioning','LT10_003']: - subdir="/data/projects/"+project+"/tbb/" - else: - subdir="/data/projects/LT10_017/tbb/" - proclist=TBBc.startdatawriter(filename="",timeouttime=20,observer=observer,antennaSet=antennaSet,project=project,observationID=observationID,filterSelection=filterSelection,datamaindir=subdir) - datawriterrunning=True - prevobsid=obsid - return obsid - - -nogui=True -#f=open('/home/veen/logs/rundatawriter.txt','w') -#prevobsid="None" -datawriterrunning=False -#proclist=[] - -print("Starting to run datawriter, but need an observation first") - -fCurrentlyDumping="/globaldata/tbb/nowdumping.py" - -if os.access(fCurrentlyDumping,os.R_OK): - exec(compile(open(fCurrentlyDumping, "rb").read(), fCurrentlyDumping, 'exec')) -else: - print("WARNING unable to open ",fCurrentlyDumping) - bCurrentlyDumping=False - -if len(sys.argv)>2: - offset=int(sys.argv[2]) -else: - offset=0 - -#while 1: -# obsid=currentobs(offset) -# if not obsid or obsid == "None" and time.time() < 1344345492+3600: # start test on august 7 -# obsid="L63176" - #obsid="L42479" -# if not obsid or obsid == "None" or obsid==prevobsid: -# time.sleep(20) -# else: -# obsid,prevobsid,proclist=switchdatawriter(obsid,prevobsid,proclist) - -def main(): - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - - obsid=currentobs(offset) - #obsid="None"#currentobs(offset) - global subdir - global prevobsid - global proclist - global TBBtype - if len(sys.argv)>1: - if sys.argv[1]=='metadata': - TBBtype="metadata" - elif sys.argv[1]=='writeparset': - TBBtype="writeparset" - else: - TBBtype="datawriter" - else: - TBBtype="datawriter" - prevobsid="None" - proclist=[] - subdir="/data/projects/LT10_017/tbb/" - if obsid != "None": - if TBBtype=='datawriter': - obsid=switchdatawriter(obsid) - if TBBtype=="writeparset": - obsid=writeparset(obsid) - else: - print("No observation running, waiting for next observation") - - with OTDBBusListener(handler_type=TBBOTDBEventMessageHandler, num_threads=1): - waitForInterrupt() - -if __name__ == '__main__': - main() diff --git a/CEP/TBB/TBBdatawriter/TBBOTDBBusListener.py b/CEP/TBB/TBBdatawriter/TBBOTDBBusListener.py index 5bf3298acb5aa8b4b08aec5b3f29c75479a24402..018872dcc792adff98ba181d6747c5785a13778a 100644 --- a/CEP/TBB/TBBdatawriter/TBBOTDBBusListener.py +++ b/CEP/TBB/TBBdatawriter/TBBOTDBBusListener.py @@ -1,7 +1,7 @@ #!/usr/bin/env python -from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler -from lofar.sas.otdb.otdbrpc import OTDBRPC +from lofar.sas.tmss.client.tmssbuslistener import TMSSBusListener, TMSSEventMessageHandler +from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession from lofar.common.util import waitForInterrupt import logging @@ -16,7 +16,11 @@ import time logger = logging.getLogger(__name__) -class TBBOTDBEventMessageHandler(OTDBEventMessageHandler): +class TBBTMSSEventMessageHandler(TMSSEventMessageHandler): + + def onSubTaskStatusChanged(self, id: int, status:str): + pass + def onObservationStarted(self, treeId, modificationTime): global prevobsid