Skip to content
Snippets Groups Projects
Commit 80a55f98 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-699: adaptations to new rpc.py module

parent 95f1c455
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
import logging
from lofar.messaging.rpc_service import RPC, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging.rpc import RPCClient, RPCClientContextManagerMixin, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT
from lofar.lta.ingest.server.config import DEFAULT_INGEST_SERVICENAME
logger = logging.getLogger(__name__)
class IngestRPC():
def __init__(self, service_name=DEFAULT_INGEST_SERVICENAME, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
self.rpc = RPC(service_name=service_name, exchange=exchange, broker=broker, timeout=5*60)
class IngestRPC(RPCClientContextManagerMixin):
def __init__(self, rpc_client: RPCClient = None):
"""Create an instance of the IngestRPC using the given RPCClient,
or if None given, to a default RPCClient connecting to the DEFAULT_INGEST_SERVICENAME service"""
super().__init__()
self._rpc_client = rpc_client or RPCClient(service_name=DEFAULT_INGEST_SERVICENAME)
def open(self):
self.rpc.open()
def close(self):
self.rpc.close()
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@staticmethod
def create(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
"""Create a IngestRPC connecting to the given exchange/broker on the default DEFAULT_INGEST_SERVICENAME service"""
return IngestRPC(RPCClient(service_name=DEFAULT_INGEST_SERVICENAME,
exchange=exchange, broker=broker, timeout=DEFAULT_RPC_TIMEOUT))
def removeExportJob(self, export_group_id):
return self.rpc.execute('RemoveExportJob', export_group_id=export_group_id)
return self._rpc_client.execute('RemoveExportJob', export_group_id=export_group_id)
def setExportJobPriority(self, export_group_id, priority):
return self.rpc.execute('SetExportJobPriority', export_id=export_group_id, priority=priority)
return self._rpc_client.execute('SetExportJobPriority', export_id=export_group_id, priority=priority)
def getStatusReport(self):
return self.rpc.execute('GetStatusReport')
return self._rpc_client.execute('GetStatusReport')
def getJobStatus(self, job_id):
return self.rpc.execute('GetJobStatus', job_id=job_id)[job_id]
return self._rpc_client.execute('GetJobStatus', job_id=job_id)[job_id]
def getReport(self, export_group_id):
return self.rpc.execute('GetReport', job_group_id=export_group_id)
return self._rpc_client.execute('GetReport', job_group_id=export_group_id)
def getExportIds(self):
return self.rpc.execute('GetExportIds')
return self._rpc_client.execute('GetExportIds')
if __name__ == '__main__':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment