Skip to content
Snippets Groups Projects
Commit 055253d7 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-699: adapted ingest to new messaging

parent 41dee8ea
No related branches found
No related tags found
No related merge requests found
...@@ -31,7 +31,7 @@ import logging ...@@ -31,7 +31,7 @@ import logging
logger = logging.getLogger() logger = logging.getLogger()
class IngestBusListener(AbstractBusListener): class IngestBusListener(AbstractBusListener):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): def __init__(self, busname=DEFAULT_BUSNAME, queue_name=None, broker=DEFAULT_BROKER):
""" """
IngestBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. IngestBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from IngestBusListener and implement the specific on<SomeMessage> methods that you are interested in. Typical usage is to derive your own subclass from IngestBusListener and implement the specific on<SomeMessage> methods that you are interested in.
...@@ -39,8 +39,11 @@ class IngestBusListener(AbstractBusListener): ...@@ -39,8 +39,11 @@ class IngestBusListener(AbstractBusListener):
:param broker: valid Qpid broker host :param broker: valid Qpid broker host
""" """
self.subject_prefix = DEFAULT_INGEST_NOTIFICATION_PREFIX + "." self.subject_prefix = DEFAULT_INGEST_NOTIFICATION_PREFIX + "."
super(IngestBusListener, self).__init__(exchange_name=busname, routing_key=self.subject_prefix+"#", broker=broker) super(IngestBusListener, self).__init__(exchange_name=busname, routing_key=self.subject_prefix+"#",
queue_name=queue_name, broker=broker)
def start_listening(self):
super(IngestBusListener, self).start_listening()
def _handleMessage(self, msg): def _handleMessage(self, msg):
try: try:
...@@ -161,8 +164,8 @@ class IngestBusListener(AbstractBusListener): ...@@ -161,8 +164,8 @@ class IngestBusListener(AbstractBusListener):
class JobsMonitor(IngestBusListener): class JobsMonitor(IngestBusListener):
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, listen_for_all_jobs=True): def __init__(self, busname=DEFAULT_BUSNAME, queue_name=None, broker=DEFAULT_BROKER, listen_for_all_jobs=True):
super(JobsMonitor, self).__init__(busname=busname, broker=broker) super(JobsMonitor, self).__init__(busname=busname, queue_name=queue_name, broker=broker)
self.__monitored_jobs = set() self.__monitored_jobs = set()
self.__listen_for_all_jobs = listen_for_all_jobs self.__listen_for_all_jobs = listen_for_all_jobs
......
...@@ -3,4 +3,4 @@ lofar_add_package(LTAIngestAdminServer) ...@@ -3,4 +3,4 @@ lofar_add_package(LTAIngestAdminServer)
lofar_add_package(LTAIngestTransferServer) lofar_add_package(LTAIngestTransferServer)
lofar_add_package(LTAIngestWebServer) lofar_add_package(LTAIngestWebServer)
lofar_package(LTAIngestServer 2.0 DEPENDS LTAIngestAdminServer LTAIngestTransferServer) lofar_package(LTAIngestServer 2.0 DEPENDS LTAIngestAdminServer LTAIngestTransferServer MoMQueryService)
...@@ -29,7 +29,7 @@ from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_QUEUENAME, DEFAUL ...@@ -29,7 +29,7 @@ from lofar.lta.ingest.server.config import DEFAULT_INGEST_JOBS_QUEUENAME, DEFAUL
from lofar.messaging import CommandMessage, EventMessage, FromBus, ToBus from lofar.messaging import CommandMessage, EventMessage, FromBus, ToBus
from lofar.messaging import create_queue, create_binding from lofar.messaging import create_queue, create_binding
from lofar.messaging.Service import Service, MessageHandlerInterface from lofar.messaging.Service import Service, MessageHandlerInterface
from lofar.common.util import convertIntKeysToString, humanreadablesize from lofar.common.util import humanreadablesize
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
import os import os
...@@ -961,7 +961,7 @@ class IngestJobManager: ...@@ -961,7 +961,7 @@ class IngestJobManager:
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
return convertIntKeysToString(result) return result
def setExportJobPriority(self, export_id, priority): def setExportJobPriority(self, export_id, priority):
priority = max(0, min(9, int(priority))) priority = max(0, min(9, int(priority)))
...@@ -1133,7 +1133,9 @@ Total Files: %(total)i ...@@ -1133,7 +1133,9 @@ Total Files: %(total)i
# and add these to the extra_mail_addresses # and add these to the extra_mail_addresses
done_group_mom_jobs = [job for job in done_group_jobs if job.get('Type', '').lower() == 'mom'] done_group_mom_jobs = [job for job in done_group_jobs if job.get('Type', '').lower() == 'mom']
mom_export_ids = set([int(job['JobId'].split('_')[1]) for job in done_group_mom_jobs if 'JobId' in job]) mom_export_ids = set([int(job['JobId'].split('_')[1]) for job in done_group_mom_jobs if 'JobId' in job])
project_mom2ids = set([self.__momrpc.getObjectDetails(mom_export_id).get(mom_export_id).get('project_mom2id') for mom_export_id in mom_export_ids]) for i in range(10):
mom_objects_details = self.__momrpc.getObjectDetails(mom_export_ids)
project_mom2ids = set(obj_details.get('project_mom2id') for obj_details in mom_objects_details.values())
project_mom2ids = [x for x in project_mom2ids if x is not None] project_mom2ids = [x for x in project_mom2ids if x is not None]
for project_mom2id in project_mom2ids: for project_mom2id in project_mom2ids:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment