diff --git a/LTA/LTAIngest/LTAIngestClient/lib/rpc.py b/LTA/LTAIngest/LTAIngestClient/lib/rpc.py index d9fd669a7a0a0024e840e802295fa373694c0a6f..73472caed3f951139ff13eaca084071a44c3252d 100644 --- a/LTA/LTAIngest/LTAIngestClient/lib/rpc.py +++ b/LTA/LTAIngest/LTAIngestClient/lib/rpc.py @@ -1,46 +1,42 @@ #!/usr/bin/env python3 import logging -from lofar.messaging.rpc_service import RPC, DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging.rpc import RPCClient, RPCClientContextManagerMixin, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT from lofar.lta.ingest.server.config import DEFAULT_INGEST_SERVICENAME logger = logging.getLogger(__name__) -class IngestRPC(): - def __init__(self, service_name=DEFAULT_INGEST_SERVICENAME, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): - self.rpc = RPC(service_name=service_name, exchange=exchange, broker=broker, timeout=5*60) +class IngestRPC(RPCClientContextManagerMixin): + def __init__(self, rpc_client: RPCClient = None): + """Create an instance of the IngestRPC using the given RPCClient, + or if None given, to a default RPCClient connecting to the DEFAULT_INGEST_SERVICENAME service""" + super().__init__() + self._rpc_client = rpc_client or RPCClient(service_name=DEFAULT_INGEST_SERVICENAME) - def open(self): - self.rpc.open() - - def close(self): - self.rpc.close() - - def __enter__(self): - self.open() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() + @staticmethod + def create(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + """Create a IngestRPC connecting to the given exchange/broker on the default DEFAULT_INGEST_SERVICENAME service""" + return IngestRPC(RPCClient(service_name=DEFAULT_INGEST_SERVICENAME, + exchange=exchange, broker=broker, timeout=DEFAULT_RPC_TIMEOUT)) def removeExportJob(self, export_group_id): - return self.rpc.execute('RemoveExportJob', export_group_id=export_group_id) + return self._rpc_client.execute('RemoveExportJob', export_group_id=export_group_id) def setExportJobPriority(self, export_group_id, priority): - return self.rpc.execute('SetExportJobPriority', export_id=export_group_id, priority=priority) + return self._rpc_client.execute('SetExportJobPriority', export_id=export_group_id, priority=priority) def getStatusReport(self): - return self.rpc.execute('GetStatusReport') + return self._rpc_client.execute('GetStatusReport') def getJobStatus(self, job_id): - return self.rpc.execute('GetJobStatus', job_id=job_id)[job_id] + return self._rpc_client.execute('GetJobStatus', job_id=job_id)[job_id] def getReport(self, export_group_id): - return self.rpc.execute('GetReport', job_group_id=export_group_id) + return self._rpc_client.execute('GetReport', job_group_id=export_group_id) def getExportIds(self): - return self.rpc.execute('GetExportIds') + return self._rpc_client.execute('GetExportIds') if __name__ == '__main__':