Skip to content
Snippets Groups Projects
Commit 441622f7 authored by Auke Klazema's avatar Auke Klazema
Browse files

SW-705: Convert TaskManagement to the new messaging system

parent 5b514cd1
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3 #!/usr/bin/env python3
from lofar.messaging.RPC import RPCWrapper from lofar.messaging.rpc_service import RPC
from lofar.messaging import DEFAULT_BUSNAME, DEFAULT_BROKER from lofar.messaging import DEFAULT_BUSNAME, DEFAULT_BROKER
from lofar.mac.services.taskmanagement.common.config import DEFAULT_SERVICENAME from lofar.mac.services.taskmanagement.common.config import DEFAULT_SERVICENAME
class TaskManagementRPC(RPCWrapper): class TaskManagementRPC():
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, timeout=120, verbose=False): def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, timeout=120):
super(TaskManagementRPC, self).__init__(busname=busname, servicename=DEFAULT_SERVICENAME, broker=broker, timeout=timeout, verbose=verbose) self.rpc = RPC(exchange=exchange, service_name=DEFAULT_SERVICENAME, broker=broker,
timeout=timeout)
def abort_task(self, otdb_id): def abort_task(self, otdb_id):
result = self.rpc('AbortTask', otdb_id=otdb_id) result = self.rpc.execute('AbortTask', otdb_id=otdb_id)
return result return result
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
# #
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#!/usr/bin/env python3
# $Id$ # $Id$
""" """
...@@ -27,7 +26,7 @@ import logging ...@@ -27,7 +26,7 @@ import logging
from lofar.mac.services.taskmanagement.common.config import DEFAULT_SERVICENAME from lofar.mac.services.taskmanagement.common.config import DEFAULT_SERVICENAME
from lofar.messaging import Service, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.messaging import Service, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging.Service import MessageHandlerInterface from lofar.messaging.rpc_service import ServiceMessageHandler
from lofar.sas.otdb.otdbrpc import OTDBRPC, OTDBPRCException from lofar.sas.otdb.otdbrpc import OTDBRPC, OTDBPRCException
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.mac.observation_control_rpc import ObservationControlRPCClient from lofar.mac.observation_control_rpc import ObservationControlRPCClient
...@@ -35,26 +34,18 @@ from lofar.mac.observation_control_rpc import ObservationControlRPCClient ...@@ -35,26 +34,18 @@ from lofar.mac.observation_control_rpc import ObservationControlRPCClient
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class TaskManagementHandler(MessageHandlerInterface): class TaskManagementHandler(ServiceMessageHandler):
def handle_message(self, msg): def handle_message(self, msg):
pass pass
def __init__(self, **kwargs): def __init__(self):
super(TaskManagementHandler, self).__init__(**kwargs) super(TaskManagementHandler, self).__init__()
self.service2MethodMap = {
'AbortTask': self.abort_task,
}
self.radb = RARPC() self.radb = RARPC()
self.otdb = OTDBRPC() self.otdb = OTDBRPC()
self.obs_ctrl = ObservationControlRPCClient() self.obs_ctrl = ObservationControlRPCClient()
def prepare_loop(self): def AbortTask(self, otdb_id):
""" Tread-local initialisation. """
pass
def abort_task(self, otdb_id):
"""aborts tasks based on otdb id """aborts tasks based on otdb id
:param otdb_id: :param otdb_id:
:return: dict with aborted key saying if aborting was succesful and otdb_id key :return: dict with aborted key saying if aborting was succesful and otdb_id key
...@@ -95,19 +86,16 @@ class TaskManagementHandler(MessageHandlerInterface): ...@@ -95,19 +86,16 @@ class TaskManagementHandler(MessageHandlerInterface):
return task_type, task_status return task_type, task_status
def createService(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, verbose=False): def createService(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
return Service(DEFAULT_SERVICENAME, return Service(service_name=DEFAULT_SERVICENAME,
TaskManagementHandler, handler_type=TaskManagementHandler,
busname=busname, exchange=exchange,
broker=broker, broker=broker,
use_service_methods=True, num_threads=1)
numthreads=1,
verbose=verbose)
def main(): def main():
from optparse import OptionParser from optparse import OptionParser
from lofar.messaging import setQpidLogLevel
from lofar.common.util import waitForInterrupt from lofar.common.util import waitForInterrupt
# make sure we run in UTC timezone # make sure we run in UTC timezone
...@@ -117,20 +105,18 @@ def main(): ...@@ -117,20 +105,18 @@ def main():
# Check the invocation arguments # Check the invocation arguments
parser = OptionParser("%prog [options]", parser = OptionParser("%prog [options]",
description='runs the resourceassignment database service') description='runs the resourceassignment database service')
parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
help='Address of the qpid broker, default: %default') help='Address of the qpid broker, default: %default')
parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, parser.add_option("-e", "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME,
help="Name of the bus exchange on the qpid broker, default: %default") help="Name of the bus exchange on the qpid broker, default: %default")
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') parser.add_option('-V', '--verbose', dest='verbose', action='store_true',
help='verbose logging')
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
setQpidLogLevel(logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO) level=logging.DEBUG if options.verbose else logging.INFO)
with createService(busname=options.busname, with createService(exchange=options.exchange, broker=options.broker):
broker=options.broker,
verbose=options.verbose):
waitForInterrupt() waitForInterrupt()
......
...@@ -22,7 +22,7 @@ import unittest ...@@ -22,7 +22,7 @@ import unittest
from lofar.mac.services.taskmanagement.server.taskmanagement import TaskManagementHandler from lofar.mac.services.taskmanagement.server.taskmanagement import TaskManagementHandler
from lofar.sas.otdb.otdbrpc import OTDBPRCException from lofar.sas.otdb.otdbrpc import OTDBPRCException
from unittest.mock import mock from unittest import mock
class TestServiceSkeletonHandler(unittest.TestCase): class TestServiceSkeletonHandler(unittest.TestCase):
...@@ -64,75 +64,75 @@ class TestServiceSkeletonHandler(unittest.TestCase): ...@@ -64,75 +64,75 @@ class TestServiceSkeletonHandler(unittest.TestCase):
self.logger_mock = logger_patcher.start() self.logger_mock = logger_patcher.start()
self.handler = TaskManagementHandler() self.handler = TaskManagementHandler()
self.handler.prepare_loop() self.handler.start_handling()
def test_abort_task_should_abort_non_running_or_scheduled_observation(self): def test_AbortTask_should_abort_non_running_or_scheduled_observation(self):
self.handler.abort_task(self.obs_otdb_id) self.handler.AbortTask(self.obs_otdb_id)
self.assertEqual(1, self.otdbrpc_mock().taskSetStatus.call_count) self.assertEqual(1, self.otdbrpc_mock().taskSetStatus.call_count)
def test_abort_task_should_abort_a_pipeline(self): def test_AbortTask_should_abort_a_pipeline(self):
self.handler.abort_task(self.pipeline_otdb_id) self.handler.AbortTask(self.pipeline_otdb_id)
self.assertEqual(1, self.otdbrpc_mock().taskSetStatus.call_count) self.assertEqual(1, self.otdbrpc_mock().taskSetStatus.call_count)
def test_abort_task_should_abort_a_reservation(self): def test_AbortTask_should_abort_a_reservation(self):
self.handler.abort_task(self.reservation_otdb_id) self.handler.AbortTask(self.reservation_otdb_id)
self.assertEqual(1, self.otdbrpc_mock().taskSetStatus.call_count) self.assertEqual(1, self.otdbrpc_mock().taskSetStatus.call_count)
def test_abort_task_should_not_abort_a_running_observation(self): def test_AbortTask_should_not_abort_a_running_observation(self):
self.handler.abort_task(self.running_obs_otdb_id) self.handler.AbortTask(self.running_obs_otdb_id)
self.assertEqual(0, self.otdbrpc_mock().taskSetStatus.call_count) self.assertEqual(0, self.otdbrpc_mock().taskSetStatus.call_count)
def test_abort_task_should_not_abort_a_queued_observation(self): def test_AbortTask_should_not_abort_a_queued_observation(self):
self.handler.abort_task(self.queued_obs_otdb_id) self.handler.AbortTask(self.queued_obs_otdb_id)
self.assertEqual(0, self.otdbrpc_mock().taskSetStatus.call_count) self.assertEqual(0, self.otdbrpc_mock().taskSetStatus.call_count)
def test_abort_task_should_abort_running_observation(self): def test_AbortTask_should_abort_running_observation(self):
self.handler.abort_task(self.running_obs_otdb_id) self.handler.AbortTask(self.running_obs_otdb_id)
self.obs_ctrl_rpc_mock().abort_observation.assert_called_with(self.running_obs_otdb_id) self.obs_ctrl_rpc_mock().abort_observation.assert_called_with(self.running_obs_otdb_id)
def test_abort_task_should_return_aborted_true_on_success_for_running_observations(self): def test_AbortTask_should_return_aborted_true_on_success_for_running_observations(self):
self.obs_ctrl_rpc_mock().abort_observation.return_value = {"aborted": True, self.obs_ctrl_rpc_mock().abort_observation.return_value = {"aborted": True,
"otdb_id": self.running_obs_otdb_id} "otdb_id": self.running_obs_otdb_id}
result = self.handler.abort_task(self.running_obs_otdb_id) result = self.handler.AbortTask(self.running_obs_otdb_id)
self.assertTrue(result["aborted"]) self.assertTrue(result["aborted"])
self.assertEqual(self.running_obs_otdb_id, result["otdb_id"]) self.assertEqual(self.running_obs_otdb_id, result["otdb_id"])
def test_abort_task_should_return_aborted_false_on_failure_for_running_observations(self): def test_AbortTask_should_return_aborted_false_on_failure_for_running_observations(self):
self.obs_ctrl_rpc_mock().abort_observation.return_value = {"aborted": False, self.obs_ctrl_rpc_mock().abort_observation.return_value = {"aborted": False,
"otdb_id": self.running_obs_otdb_id} "otdb_id": self.running_obs_otdb_id}
result = self.handler.abort_task(self.running_obs_otdb_id) result = self.handler.AbortTask(self.running_obs_otdb_id)
self.assertFalse(result["aborted"]) self.assertFalse(result["aborted"])
self.assertEqual(self.running_obs_otdb_id, result["otdb_id"]) self.assertEqual(self.running_obs_otdb_id, result["otdb_id"])
def test_abort_task_should_return_aborted_false_on_exception_setting_task_status(self): def test_AbortTask_should_return_aborted_false_on_exception_setting_task_status(self):
self.otdbrpc_mock().taskSetStatus.side_effect = OTDBPRCException("Not aborted") self.otdbrpc_mock().taskSetStatus.side_effect = OTDBPRCException("Not aborted")
result = self.handler.abort_task(self.pipeline_otdb_id) result = self.handler.AbortTask(self.pipeline_otdb_id)
self.assertFalse(result["aborted"]) self.assertFalse(result["aborted"])
def test_abort_task_should_return_aborted_true_on_setting_task_status_to_aborted(self): def test_AbortTask_should_return_aborted_true_on_setting_task_status_to_aborted(self):
result = self.handler.abort_task(self.pipeline_otdb_id) result = self.handler.AbortTask(self.pipeline_otdb_id)
self.assertTrue(result["aborted"]) self.assertTrue(result["aborted"])
def test_abort_task_should_log_aborting_of_active_task(self): def test_AbortTask_should_log_aborting_of_active_task(self):
self.handler.abort_task(self.running_obs_otdb_id) self.handler.AbortTask(self.running_obs_otdb_id)
self.logger_mock.info.assert_any_call("Aborting active task: %s", self.running_obs_otdb_id) self.logger_mock.info.assert_any_call("Aborting active task: %s", self.running_obs_otdb_id)
def test_abort_task_should_log_aborting_of_inactive_task(self): def test_AbortTask_should_log_aborting_of_inactive_task(self):
self.handler.abort_task(self.pipeline_otdb_id) self.handler.AbortTask(self.pipeline_otdb_id)
self.logger_mock.info.assert_any_call("Aborting inactive task: %s", self.pipeline_otdb_id) self.logger_mock.info.assert_any_call("Aborting inactive task: %s", self.pipeline_otdb_id)
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
import logging import logging
from lofar.messaging.RPC import RPCWrapper from lofar.messaging.rpc_service import RPC
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.mac.config import DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME from lofar.mac.config import DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME
...@@ -30,13 +30,13 @@ from lofar.mac.config import DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME ...@@ -30,13 +30,13 @@ from lofar.mac.config import DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ObservationControlRPCClient(RPCWrapper): class ObservationControlRPCClient():
def __init__(self, def __init__(self,
busname=DEFAULT_BUSNAME, exchange=DEFAULT_BUSNAME,
servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME,
broker=DEFAULT_BROKER, broker=DEFAULT_BROKER,
timeout=120): timeout=120):
super(ObservationControlRPCClient, self).__init__(busname, servicename, broker, timeout=timeout) self.rpc = RPC(service_name=servicename, exchange=exchange, broker=broker, timeout=timeout)
def abort_observation(self, sas_id): def abort_observation(self, sas_id):
return self.rpc('AbortObservation', sas_id=sas_id) return self.rpc.execute('AbortObservation', sas_id=sas_id)
#!/usr/bin/env python3 #!/usr/bin/env python3
import logging import logging
import datetime
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging.RPC import RPC, RPCException, RPCWrapper from lofar.messaging.rpc_service import RPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICENAME from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICENAME
''' Simple RPC client for Service lofarbus.*Z ''' Simple RPC client for Service lofarbus.*Z
...@@ -11,6 +10,7 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICENAME ...@@ -11,6 +10,7 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICENAME
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class OTDBPRCException(Exception): class OTDBPRCException(Exception):
def __init__(self, message): def __init__(self, message):
self.message = message self.message = message
...@@ -18,17 +18,18 @@ class OTDBPRCException(Exception): ...@@ -18,17 +18,18 @@ class OTDBPRCException(Exception):
def __str__(self): def __str__(self):
return "OTDBPRCException: " + str(self.message) return "OTDBPRCException: " + str(self.message)
class OTDBRPC(RPCWrapper):
class OTDBRPC():
def __init__(self, busname=DEFAULT_BUSNAME, def __init__(self, busname=DEFAULT_BUSNAME,
broker=DEFAULT_BROKER, broker=DEFAULT_BROKER,
timeout=120): timeout=120):
super(OTDBRPC, self).__init__(busname, DEFAULT_OTDB_SERVICENAME, broker, timeout=timeout) self.rpc = RPC(busname, DEFAULT_OTDB_SERVICENAME, broker, timeout=timeout)
def taskGetIDs(self, otdb_id=None, mom_id=None): def taskGetIDs(self, otdb_id=None, mom_id=None):
if otdb_id: if otdb_id:
answer = self.rpc('TaskGetIDs', OtdbID=otdb_id, return_tuple=False) answer = self.rpc.execute('TaskGetIDs', OtdbID=otdb_id, return_tuple=False)
elif mom_id: elif mom_id:
answer = self.rpc('TaskGetIDs', MomID=mom_id, return_tuple=False) answer = self.rpc.execute('TaskGetIDs', MomID=mom_id, return_tuple=False)
else: else:
raise OTDBPRCException("TaskGetIDs was called without OTDB or Mom ID") raise OTDBPRCException("TaskGetIDs was called without OTDB or Mom ID")
if not answer: if not answer:
...@@ -38,9 +39,9 @@ class OTDBRPC(RPCWrapper): ...@@ -38,9 +39,9 @@ class OTDBRPC(RPCWrapper):
def taskGetSpecification(self, otdb_id=None, mom_id=None): def taskGetSpecification(self, otdb_id=None, mom_id=None):
if otdb_id: if otdb_id:
answer = self.rpc('TaskGetSpecification', OtdbID=otdb_id) answer = self.rpc.execute('TaskGetSpecification', OtdbID=otdb_id)
elif mom_id: elif mom_id:
answer = self.rpc('TaskGetSpecification', MomID=mom_id) answer = self.rpc.execute('TaskGetSpecification', MomID=mom_id)
else: else:
raise OTDBPRCException("TaskGetSpecification was called without OTDB or Mom ID") raise OTDBPRCException("TaskGetSpecification was called without OTDB or Mom ID")
if not answer["TaskSpecification"]: if not answer["TaskSpecification"]:
...@@ -49,9 +50,9 @@ class OTDBRPC(RPCWrapper): ...@@ -49,9 +50,9 @@ class OTDBRPC(RPCWrapper):
def taskCreate(self, otdb_id=None, mom_id=None, template_name="", campaign_name="", specification={}): def taskCreate(self, otdb_id=None, mom_id=None, template_name="", campaign_name="", specification={}):
if otdb_id: ##Can this ever be called with a otdb_id? if otdb_id: ##Can this ever be called with a otdb_id?
answer = self.rpc('TaskCreate', OtdbID=otdb_id, TemplateName=template_name, CampaignName=campaign_name, Specification=specification) answer = self.rpc.execute('TaskCreate', OtdbID=otdb_id, TemplateName=template_name, CampaignName=campaign_name, Specification=specification)
elif mom_id: elif mom_id:
answer = self.rpc('TaskCreate', MomID=mom_id, TemplateName=template_name, CampaignName=campaign_name, Specification=specification) answer = self.rpc.execute('TaskCreate', MomID=mom_id, TemplateName=template_name, CampaignName=campaign_name, Specification=specification)
else: else:
raise OTDBPRCException("TaskCreate was called without OTDB or Mom ID") raise OTDBPRCException("TaskCreate was called without OTDB or Mom ID")
if not answer["Success"]: if not answer["Success"]:
...@@ -59,20 +60,20 @@ class OTDBRPC(RPCWrapper): ...@@ -59,20 +60,20 @@ class OTDBRPC(RPCWrapper):
return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]} return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]}
def taskGetTreeInfo(self, otdb_id): def taskGetTreeInfo(self, otdb_id):
info = self.rpc('TaskGetTreeInfo', otdb_id=otdb_id) info = self.rpc.execute('TaskGetTreeInfo', otdb_id=otdb_id)
return info return info
def taskGetStatus(self, otdb_id): def taskGetStatus(self, otdb_id):
return self.rpc('TaskGetStatus', otdb_id=otdb_id)['status'] return self.rpc.execute('TaskGetStatus', otdb_id=otdb_id)['status']
def taskSetStatus(self, otdb_id=None, new_status="", update_timestamps=True): def taskSetStatus(self, otdb_id=None, new_status="", update_timestamps=True):
answer = self.rpc('TaskSetStatus', OtdbID=otdb_id, NewStatus=new_status, UpdateTimestamps=update_timestamps) answer = self.rpc.execute('TaskSetStatus', OtdbID=otdb_id, NewStatus=new_status, UpdateTimestamps=update_timestamps)
if not answer["Success"]: if not answer["Success"]:
raise OTDBPRCException("TaskSetStatus failed for %i" % (otdb_id,)) raise OTDBPRCException("TaskSetStatus failed for %i" % (otdb_id,))
return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]} return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]}
def taskSetSpecification(self, otdb_id=None, specification={}): def taskSetSpecification(self, otdb_id=None, specification={}):
answer = self.rpc('TaskSetSpecification', OtdbID=otdb_id, Specification=specification) answer = self.rpc.execute('TaskSetSpecification', OtdbID=otdb_id, Specification=specification)
if "Errors" in answer: if "Errors" in answer:
for key, problem in answer["Errors"].items(): for key, problem in answer["Errors"].items():
logger.warning("TaskSetSpecification for %i failed to set key %s because of %s" % (otdb_id, key, problem)) logger.warning("TaskSetSpecification for %i failed to set key %s because of %s" % (otdb_id, key, problem))
...@@ -80,23 +81,23 @@ class OTDBRPC(RPCWrapper): ...@@ -80,23 +81,23 @@ class OTDBRPC(RPCWrapper):
return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]} return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]}
def taskPrepareForScheduling(self, otdb_id=None, starttime="", endtime=""): def taskPrepareForScheduling(self, otdb_id=None, starttime="", endtime=""):
answer = self.rpc('TaskPrepareForScheduling', OtdbID= otdb_id, StartTime=starttime, StopTime=endtime) answer = self.rpc.execute('TaskPrepareForScheduling', OtdbID= otdb_id, StartTime=starttime, StopTime=endtime)
return {"otdb_id": answer["OtdbID"]} return {"otdb_id": answer["OtdbID"]}
def taskDelete(self, otdb_id=None): def taskDelete(self, otdb_id=None):
answer = self.rpc('TaskDelete', OtdbID=otdb_id) answer = self.rpc.execute('TaskDelete', OtdbID=otdb_id)
if not answer["Success"]: if not answer["Success"]:
logger.warning("TaskDelete failed for %i" % (otdb_id,)) ##Probably was already deleted? logger.warning("TaskDelete failed for %i" % (otdb_id,)) ##Probably was already deleted?
return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]} return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]}
def getDefaultTemplates(self): def getDefaultTemplates(self):
answer = self.rpc('GetDefaultTemplates') answer = self.rpc.execute('GetDefaultTemplates')
if not answer["DefaultTemplates"]: if not answer["DefaultTemplates"]:
raise OTDBPRCException("GetDefaultTemplates returned an empty dict") raise OTDBPRCException("GetDefaultTemplates returned an empty dict")
return {"default_templates": answer["DefaultTemplates"]} return {"default_templates": answer["DefaultTemplates"]}
def getStations(self): def getStations(self):
answer = self.rpc('GetStations') answer = self.rpc.execute('GetStations')
if not answer["Stations"]: if not answer["Stations"]:
raise OTDBPRCException("GetStations returned an empty dict") raise OTDBPRCException("GetStations returned an empty dict")
return {"stations": answer["Stations"]} return {"stations": answer["Stations"]}
...@@ -104,7 +105,7 @@ class OTDBRPC(RPCWrapper): ...@@ -104,7 +105,7 @@ class OTDBRPC(RPCWrapper):
def setProject(self, name=None, title="", pi="", co_i="", contact=""): def setProject(self, name=None, title="", pi="", co_i="", contact=""):
if not name: if not name:
raise OTDBPRCException("SetProject was called with an empty project") raise OTDBPRCException("SetProject was called with an empty project")
answer = self.rpc('SetProject', name=name, pi=pi, co_i=co_i, contact=contact) answer = self.rpc.execute('SetProject', name=name, pi=pi, co_i=co_i, contact=contact)
if not answer["projectID"]: if not answer["projectID"]:
raise OTDBPRCException("SetProject failed for %s" % (name,)) raise OTDBPRCException("SetProject failed for %s" % (name,))
return {"project_id": answer["projectID"]} return {"project_id": answer["projectID"]}
...@@ -114,6 +115,7 @@ def do_tests(busname=DEFAULT_BUSNAME): ...@@ -114,6 +115,7 @@ def do_tests(busname=DEFAULT_BUSNAME):
with OTDBRPC(busname=busname, broker='10.149.96.6') as rpc: with OTDBRPC(busname=busname, broker='10.149.96.6') as rpc:
print(rpc.taskGetStatus(452728)) print(rpc.taskGetStatus(452728))
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
do_tests() do_tests()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment