diff --git a/SAS/DataManagement/Cleanup/AutoCleanupService/autocleanupservice b/SAS/DataManagement/Cleanup/AutoCleanupService/autocleanupservice index 58df134ef2c8ed7d64cf90f247d9e87959d7b2af..b4e51327f044d96c5a5af614ee183cf715a4fddf 100755 --- a/SAS/DataManagement/Cleanup/AutoCleanupService/autocleanupservice +++ b/SAS/DataManagement/Cleanup/AutoCleanupService/autocleanupservice @@ -19,14 +19,14 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener +from lofar.lta.ingest.client.ingestbuslistener import IngestEventMessageHandler, IngestEventMesssageBusListener from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC -from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as CLEANUP_SERVICENAME +from lofar.sas.datamanagement.cleanup.config import DEFAULT_CLEANUP_SERVICENAME from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC -from lofar.messaging.Service import * +from lofar.messaging import * from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME import logging @@ -35,44 +35,25 @@ logger = logging.getLogger(__name__) import os import os.path -# command to connect lexar003 ingest notification exchange to scu001 notification queue for autocleanupservice -# qpid-route route add scu001.control.lofar lexar003.lexar.control.lofar lofar.lta.ingest.notification lofar.lta.ingest.notification.autocleanupservice LTAIngest.# -class AutoCleanupIngestBusListener(IngestBusListener): - def __init__(self, - busname=DEFAULT_BUSNAME - broker=DEFAULT_BROKER): - super(AutoCleanupIngestBusListener, self).__init__(busname=busname, broker=broker) +class AutoCleanupIngestEventMessageHandler(IngestEventMessageHandler): + def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + super().__init__() + self.__curpc = CleanupRPC.create(exchange=exchange, broker=broker) + self.__momrpc = MoMQueryRPC.create(exchange=exchange, broker=broker) - self.__curpc = CleanupRPC(busname=busname, broker=broker) - self.__momrpc = MoMQueryRPC(busname=busname, broker=broker, timeout=180) - - def start_listening(self, numthreads=1): + def start_handling(self): self.__curpc.open() self.__momrpc.open() - super(AutoCleanupIngestBusListener, self).start_listening(numthreads) + super().start_handling() - def stop_listening(self): + def stop_handling(self): self.__momrpc.close() self.__curpc.close() - super(AutoCleanupIngestBusListener, self).stop_listening() - - def onJobStarted(self, job_dict): - self._logJobNotification('started ', job_dict); - - def onJobFinished(self, job_dict): - self._logJobNotification('finished', job_dict); - - def onJobFailed(self, job_dict): - self._logJobNotification('failed ', job_dict, level=logging.WARN); - - def onTaskProgress(self, task_dict): - self._logJobNotification('task progress', task_dict); + super().stop_handling() def onTaskFinished(self, task_dict): try: - self._logJobNotification('task finised', task_dict); - if task_dict.get('type','').lower() != 'mom': return @@ -164,35 +145,38 @@ class AutoCleanupIngestBusListener(IngestBusListener): cleanupUpstreamDataIfPossible(mom_id) logger.info('finished analyzing upstream tasks of mom_id %s for further data cleanup', mom_id) except Exception as e: - logger.error('Error while cleaning up data for upstream tasks of %s: %s', otdb_id, e) + logger.exception('Error while cleaning up data for upstream tasks of %s: %s', otdb_id, e) + raise def main(): from lofar.common.util import waitForInterrupt - from lofar.messaging import setQpidLogLevel - from lofar.common import dbcredentials from optparse import OptionParser # Check the invocation arguments parser = OptionParser('%prog [options]', - description='run ingest mom adapter, which receives jobs from MoM, and updates ingest statuses to MoM') - parser.add_option('-q', '--broker', dest='broker', type='string', default='localhost', help='Address of the qpid broker, default: %default') - parser.add_option('--busname', - dest='busname', - type='string', - default=DEFAULT_BUSNAME, - help='Name of the bus exchange on the qpid broker, default: %default') + description='run autocleanupservice, which listens for finished ingest jobs, and then automatically deletes the ingested data') + parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, + help='Address of the messaging broker, default: %default') + parser.add_option("-e", "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, + help="Name of the bus exchange on the broker, [default: %default]") (options, args) = parser.parse_args() logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - logger.info('Starting AutoCleanupIngestBusListener...') + logger.info('Starting AutoCleanupService...') - with AutoCleanupIngestBusListener(busname=options.busname, - broker=options.broker) as autocleanup_ingestlistener: + with IngestEventMesssageBusListener(AutoCleanupIngestEventMessageHandler, + handler_kwargs={'exchange': options.exchange, + 'broker': options.broker}, + num_threads=1): + with ToBus() as t: + t.send(EventMessage(subject="LTA.Ingest.notification.TaskFinished", + content={'type':'MoM', + 'otdb_id': 0})) waitForInterrupt() - logger.info('Stopped AutoCleanupIngestBusListener') + logger.info('Stopped AutoCleanupService') if __name__ == '__main__': main()