Skip to content
Snippets Groups Projects
Commit 09c8993b authored by Ruud Beukema's avatar Ruud Beukema
Browse files

Task #10368: Reintegrated branch LOFAR-RT-Story10147 (Expose interface to MoM...

Task #10368: Reintegrated branch LOFAR-RT-Story10147 (Expose interface to MoM authorization model) into the trunk
parents df7cf34f 6ef1125e
No related branches found
No related tags found
No related merge requests found
Showing
with 1789 additions and 99 deletions
...@@ -4524,8 +4524,14 @@ MAC/Navigator2/scripts/readStationConfigs.ctl -text ...@@ -4524,8 +4524,14 @@ MAC/Navigator2/scripts/readStationConfigs.ctl -text
MAC/Navigator2/scripts/readStationConnections.ctl -text MAC/Navigator2/scripts/readStationConnections.ctl -text
MAC/Navigator2/scripts/setSumAlerts.ctl -text MAC/Navigator2/scripts/setSumAlerts.ctl -text
MAC/Navigator2/scripts/transferMPs.ctl -text MAC/Navigator2/scripts/transferMPs.ctl -text
MAC/Services/src/ObservationControl2.py -text
MAC/Services/src/config.py -text
MAC/Services/src/observationcontrol2 -text
MAC/Services/src/observationcontrol2.ini -text
MAC/Services/src/pipelinecontrol -text MAC/Services/src/pipelinecontrol -text
MAC/Services/src/pipelinecontrol.ini -text MAC/Services/src/pipelinecontrol.ini -text
MAC/Services/test/tObservationControl2.py -text
MAC/Services/test/tObservationControl2.sh -text
MAC/Services/test/tPipelineControl.sh eol=lf MAC/Services/test/tPipelineControl.sh eol=lf
MAC/Test/APL/PVSSproject/colorDB/Lofar[!!-~]colors -text svneol=native#application/octet-stream MAC/Test/APL/PVSSproject/colorDB/Lofar[!!-~]colors -text svneol=native#application/octet-stream
MAC/Test/APL/PVSSproject/colorDB/colorDB_de -text svneol=native#application/octet-stream MAC/Test/APL/PVSSproject/colorDB/colorDB_de -text svneol=native#application/octet-stream
......
...@@ -65,6 +65,9 @@ class Credentials: ...@@ -65,6 +65,9 @@ class Credentials:
# Database selection # Database selection
self.database = "" self.database = ""
# All key-value pairs found in the config
self.config = {}
def __str__(self): def __str__(self):
return "type={type} addr={host}:{port} auth={user}:{password} db={database}".format(**self.__dict__) return "type={type} addr={host}:{port} auth={user}:{password} db={database}".format(**self.__dict__)
...@@ -122,6 +125,8 @@ class Credentials: ...@@ -122,6 +125,8 @@ class Credentials:
return options return options
class DBCredentials: class DBCredentials:
NoSectionError = NoSectionError
def __init__(self, filepatterns=None): def __init__(self, filepatterns=None):
""" """
Read database credentials from all configuration files matched by any of the patterns. Read database credentials from all configuration files matched by any of the patterns.
...@@ -172,11 +177,11 @@ class DBCredentials: ...@@ -172,11 +177,11 @@ class DBCredentials:
# create default credentials # create default credentials
creds = Credentials() creds = Credentials()
# read configuration # read configuration (can throw NoSectionError)
try: d = dict(self.config.items(self._section(database)))
d = dict(self.config.items(self._section(database)))
except NoSectionError: # save the full config to support custom fields
return creds creds.config = d
# parse and convert config information # parse and convert config information
if "host" in d: creds.host = d["host"] if "host" in d: creds.host = d["host"]
......
...@@ -50,6 +50,12 @@ class TestDBCredentials(unittest.TestCase): ...@@ -50,6 +50,12 @@ class TestDBCredentials(unittest.TestCase):
self.assertEqual(str(c_out), str(c_in)) self.assertEqual(str(c_out), str(c_in))
def test_get_non_existing(self):
dbc = DBCredentials(filepatterns=[])
with self.assertRaises(DBCredentials.NoSectionError):
dbc.get("UNKNOWN")
def test_list(self): def test_list(self):
dbc = DBCredentials(filepatterns=[]) dbc = DBCredentials(filepatterns=[])
...@@ -93,6 +99,22 @@ database = mydb ...@@ -93,6 +99,22 @@ database = mydb
self.assertEqual(str(c_out), str(c_in)) self.assertEqual(str(c_out), str(c_in))
def test_freeform_config_option(self):
f = tempfile.NamedTemporaryFile()
f.write("""
[database:DATABASE]
foo = bar
test = word word
""")
f.flush() # don't close since that will delete the TemporaryFile
# extract our config
dbc = DBCredentials(filepatterns=[f.name])
c_out = dbc.get("DATABASE")
# test if the free-form config options got through
self.assertEqual(c_out.config["foo"], "bar")
self.assertEqual(c_out.config["test"], "word word")
def main(argv): def main(argv):
unittest.main() unittest.main()
......
# $Id$ # $Id$
include(FindPythonModule)
find_python_module(fabric REQUIRED)
lofar_add_bin_scripts( lofar_add_bin_scripts(
pipelinecontrol pipelinecontrol
observationcontrol2
) )
python_install( python_install(
PipelineControl.py PipelineControl.py
ObservationControl2.py
config.py
DESTINATION lofar/mac DESTINATION lofar/mac
) )
# supervisord config files # supervisord config files
install(FILES install(FILES
pipelinecontrol.ini pipelinecontrol.ini
observationcontrol2.ini
DESTINATION etc/supervisord.d) DESTINATION etc/supervisord.d)
#!/usr/bin/python
#
# Copyright (C) 2016
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
from fabric import tasks
from optparse import OptionParser
from fabric.api import env, run
from lofar.messaging import Service
from lofar.messaging import setQpidLogLevel
from lofar.common.util import waitForInterrupt
from lofar.messaging.Service import MessageHandlerInterface
import lofar.mac.config as config
logger = logging.getLogger(__name__)
class ObservationControlHandler(MessageHandlerInterface):
def __init__(self, **kwargs):
super(ObservationControlHandler, self).__init__(**kwargs)
self.service2MethodMap = {
'AbortObservation': self.abort_observation
}
env.hosts = ["localhost"]
if os.environ.has_key("LOFARENV"):
lofar_environment = os.environ['LOFARENV']
if lofar_environment == "PRODUCTION":
env.hosts = [config.PRODUCTION_OBSERVATION_CONTROL_HOST]
elif lofar_environment == "TEST":
env.hosts = [config.TEST_OBSERVATION_CONTROL_HOST]
def abort_observation_task(self, sas_id):
logger.info("trying to abort ObservationControl for SAS ID: %s", sas_id)
killed = False
pid_line = run('pidof ObservationControl')
pids = pid_line.split(' ')
for pid in pids:
pid_sas_id = run("ps -p %s --no-heading -o command | awk -F[{}] '{ print $2; }'" % pid)
if pid_sas_id == sas_id:
logger.info("Killing ObservationControl with PID: %s for SAS ID: %s", pid, sas_id)
run('kill -SIGINT %s' % pid)
killed = True
return killed
def abort_observation(self, sas_id):
""" aborts an observation for a single sas_id """
result = tasks.execute(self.abort_observation_task, sas_id)
aborted = True in result.values()
return {'aborted': aborted}
def handle_message(self, msg):
pass
def create_service(bus_name=config.DEFAULT_OBSERVATION_CONTROL_BUS_NAME,
service_name=config.DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME,
broker=None, verbose=False):
return Service(service_name,
ObservationControlHandler,
busname=bus_name,
broker=broker,
use_service_methods=True,
numthreads=1,
handler_args={},
verbose=verbose)
def main():
# make sure we run in UTC timezone
import os
os.environ['TZ'] = 'UTC'
# Check the invocation arguments
parser = OptionParser("%prog [options]",
description='runs the observationcontrol service')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option("-b", "--busname", dest="busname", type="string", default=config.DEFAULT_OBSERVATION_CONTROL_BUS_NAME, help="Name of the bus exchange on the qpid broker, default: %s" % config.DEFAULT_OBSERVATION_CONTROL_BUS_NAME)
parser.add_option("-s", "--servicename", dest="servicename", type="string", default=config.DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, help="Name for this service, default: %s" % config.DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME)
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args()
setQpidLogLevel(logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
with create_service(bus_name=options.busname,
service_name=options.servicename,
broker=options.broker,
verbose=options.verbose):
waitForInterrupt()
if __name__ == '__main__':
main()
#!/usr/bin/python
#
# Copyright (C) 2016
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
from lofar.messaging import adaptNameToEnvironment
DEFAULT_OBSERVATION_CONTROL_BUS_NAME = adaptNameToEnvironment('lofar.mac.command')
DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME = 'ObservationControl2'
PRODUCTION_OBSERVATION_CONTROL_HOST = "mcu001.control.lofar"
TEST_OBSERVATION_CONTROL_HOST = "mcu099.control.lofar"
#!/usr/bin/python
#
# Copyright (C) 2016
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
'''
runs the ObservationControl service
'''
from lofar.mac import ObservationControl2
if __name__ == '__main__':
ObservationControl2.main()
[program:observationcontrol2]
command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec observationcontrol2'
user=lofarsys
stopsignal=INT ; KeyboardInterrupt
stopasgroup=true ; bash does not propagate signals
stdout_logfile=%(program_name)s.log
redirect_stderr=true
stderr_logfile=NONE
# $Id$ # $Id$
include(LofarCTest) include(LofarCTest)
include(FindPythonModule)
lofar_find_package(Python REQUIRED) lofar_find_package(Python REQUIRED)
find_python_module(mock REQUIRED)
lofar_add_test(tPipelineControl) lofar_add_test(tPipelineControl)
import unittest
import uuid
import mock
import os
import time
from lofar.mac.ObservationControl2 import ObservationControlHandler, create_service
from qpid.messaging.message import Message as QpidMessage
import lofar.mac.config as config
class TestObservationControlHandler(unittest.TestCase):
pid1 = "1000"
pid2 = "2000"
sas_id = "100"
def _run_side_effect(self, cmd):
if cmd.startswith("ps -p %s" % self.pid1):
return self.sas_id
elif cmd.startswith("ps -p %s" % self.pid2):
return self.sas_id + "10"
elif cmd.startswith("pidof"):
return "%s %s" % (self.pid1, self.pid2)
elif cmd.startswith("kill"):
return ""
def setUp(self):
fabric_run_pathcher = mock.patch('lofar.mac.ObservationControl2.run')
self.addCleanup(fabric_run_pathcher.stop)
self.fabric_run_mock = fabric_run_pathcher.start()
self.fabric_run_mock.side_effect = self._run_side_effect
fabric_tasks_pathcher = mock.patch('lofar.mac.ObservationControl2.tasks')
self.addCleanup(fabric_tasks_pathcher.stop)
self.fabric_tasks_mock = fabric_tasks_pathcher.start()
fabric_env_pathcher = mock.patch('lofar.mac.ObservationControl2.env')
self.addCleanup(fabric_env_pathcher.stop)
self.fabric_env_mock = fabric_env_pathcher.start()
logger_patcher = mock.patch('lofar.mac.ObservationControl2.logger')
self.addCleanup(logger_patcher.stop)
self.logger_mock = logger_patcher.start()
self.observation_control_handler = ObservationControlHandler()
def test_abort_observation_task_should_run_pidof_ObservationControl(self):
self.observation_control_handler.abort_observation_task(self.sas_id)
self.fabric_run_mock.assert_any_call('pidof ObservationControl')
def test_abort_observation_tasks_should_run_ps_to_find_sas_id_on_command(self):
self.observation_control_handler.abort_observation_task(self.sas_id)
self.fabric_run_mock.assert_any_call(
"ps -p %s --no-heading -o command | awk -F[{}] '{ print $2; }'" % self.pid1)
self.fabric_run_mock.assert_any_call(
"ps -p %s --no-heading -o command | awk -F[{}] '{ print $2; }'" % self.pid2)
def test_abort_observation_task_should_run_kill_when_sas_id_matches(self):
self.observation_control_handler.abort_observation_task(self.sas_id)
self.fabric_run_mock.assert_any_call('kill -SIGINT %s' % self.pid1)
@mock.patch.dict(os.environ, {'LOFARENV': 'TEST'})
def test_observation_control_should_select_test_host_if_lofar_environment_is_test(self):
ObservationControlHandler()
self.assertEqual(self.fabric_env_mock.hosts, [config.TEST_OBSERVATION_CONTROL_HOST])
@mock.patch.dict(os.environ, {'LOFARENV': 'PRODUCTION'})
def test_observation_control_should_select_production_host_if_lofar_environment_is_production(self):
ObservationControlHandler()
self.assertEqual(self.fabric_env_mock.hosts, [config.PRODUCTION_OBSERVATION_CONTROL_HOST])
def test_observation_control_should_select_local_host_if_no_lofar_environment_is_set(self):
ObservationControlHandler()
self.assertEqual(self.fabric_env_mock.hosts, ["localhost"])
def test_abort_observation_should_execute_abort_observation_task_on_localhost(self):
self.observation_control_handler.abort_observation(self.sas_id)
self.fabric_tasks_mock.execute.assert_any_call(self.observation_control_handler.abort_observation_task,
self.sas_id)
def test_abort_observation_task_should_return_false_on_unknown_sas_id(self):
self.assertFalse(self.observation_control_handler.abort_observation_task("unknown"))
def test_abort_observation_task_should_return_true_on_known_sas_id(self):
self.assertTrue(self.observation_control_handler.abort_observation_task(self.sas_id))
def test_abort_observation_task_should_log_call(self):
self.observation_control_handler.abort_observation_task(self.sas_id)
self.logger_mock.info.assert_any_call("trying to abort ObservationControl for SAS ID: %s", self.sas_id)
def test_abort_observation_taks_should_log_the_kill(self):
self.observation_control_handler.abort_observation_task(self.sas_id)
self.logger_mock.info.assert_any_call(
"Killing ObservationControl with PID: %s for SAS ID: %s", self.pid1, self.sas_id)
def test_abort_observation_should_return_aborted_true_if_execute_returns_true(self):
self.fabric_tasks_mock.execute.return_value = {'localhost': True}
result = self.observation_control_handler.abort_observation(self.sas_id)
self.assertTrue(result['aborted'])
def test_abort_observation_should_return_aborted_false_if_execute_returns_false(self):
self.fabric_tasks_mock.execute.return_value = {'localhost': False}
result = self.observation_control_handler.abort_observation(self.sas_id)
self.assertFalse(result['aborted'])
@mock.patch('lofar.mac.ObservationControl2.ObservationControlHandler.abort_observation')
@mock.patch('lofar.messaging.messagebus.qpid.messaging')
def test_service_runs_abort_observation_task_when_called_with_AbortObservation_command(self, qpid_mock, abort_observation_mock):
message_id = str(uuid.uuid1())
test_id = 200
qpid_message = QpidMessage([{'sas_id': 100}],
reply_to="localhost",
properties={
"has_args": True,
"has_kwargs": True,
"SystemName": "LOFAR",
"MessageType": "RequestMessage",
"MessageId": message_id,
"status": "OK"
},
subject=".AbortObservation")
sender_mock = mock.MagicMock()
receiver_mock = mock.MagicMock()
receiver_mock.fetch.return_value = qpid_message
abort_observation_mock.return_value = {"aborted": True}
qpid_mock.Message = QpidMessage
qpid_mock.Connection().session().senders = [sender_mock]
qpid_mock.Connection().session().next_receiver.return_value = receiver_mock
with create_service() as service:
while receiver_mock.fetch.calles == 0:
pass
self.assertTrue(abort_observation_mock.called)
if __name__ == "__main__":
unittest.main()
\ No newline at end of file
#!/bin/sh
./runctest.sh tObservationControl2
\ No newline at end of file
...@@ -21,6 +21,77 @@ class MoMQueryRPC(RPCWrapper): ...@@ -21,6 +21,77 @@ class MoMQueryRPC(RPCWrapper):
timeout=120): timeout=120):
super(MoMQueryRPC, self).__init__(busname, servicename, broker, timeout=timeout) super(MoMQueryRPC, self).__init__(busname, servicename, broker, timeout=timeout)
def add_trigger(self, user_name, host_name, project_name, meta_data):
logger.info("Requestion AddTrigger for user_name: %s, host_name: %s, project_name: %s and meta_data: %s",
user_name, host_name, project_name, meta_data)
row_id = self.rpc('AddTrigger',
user_name=user_name, host_name=host_name, project_name=project_name, meta_data=meta_data)
logger.info("Received AddTrigger for user_name (%s), host_name(%s), project_name(%s) and meta_data(%s): %s",
user_name, host_name, project_name, meta_data, row_id)
return row_id
def get_project_priority(self, project_name):
logger.info("Requestion GetProjectPriority for project_name: %s", project_name)
priority = self.rpc('GetProjectPriority', project_name=project_name)
logger.info("Received GetProjectPriority for project_name (%s): %s", project_name, priority)
return priority
def allows_triggers(self, project_name):
"""returns whether a project is allowed to submit triggers
:param project_name:
:return: Boolean
"""
logger.info("Requesting AllowsTriggers for project_name: %s", project_name)
result = self.rpc('AllowsTriggers', project_name=project_name)
logger.info("Received AllowsTriggers for project_name (%s): %s", project_name, result)
return result
def authorized_add_with_status(self, user_name, project_name, job_type, status):
"""returns whether user is allowed in project to move a certain jobtype to a certain state
:param user_name:
:param project_name:
:param job_type:
:param status:
:return: Boolean
"""
logger.info("Requesting AutorizedAddWithStatus for user_name: %s project_name: %s job_type: %s status: %s",
user_name, project_name, job_type, status)
result = self.rpc('AutorizedAddWithStatus', user_name=user_name, project_name=project_name, job_type=job_type,
status=status)
logger.info(
"Received AutorizedAddWithStatus for user_name: %s project_name: %s job_type: %s status: %s result: %s",
user_name, project_name, job_type, status, result)
return result
def folderExists(self, folder):
"""returns true if folder exists
:param folder:
:return: Boolean
"""
logger.info("Requesting folder: %s exists", folder)
result = self.rpc('FolderExists', folder=folder)
logger.info("Received folder exists: %s", result)
return result
def isProjectActive(self, project_name):
"""returns true if project is available and active
:param project_name:
:return: Boolean
"""
logger.info("Requesting if project: %s is active", project_name)
result = self.rpc('IsProjectActive', project_name=project_name)
logger.info("Received Project is active: %s", result)
return result
def getProjectDetails(self, ids): def getProjectDetails(self, ids):
'''get the project details for one or more mom ids '''get the project details for one or more mom ids
:param ids single or list of mom ids :param ids single or list of mom ids
...@@ -30,7 +101,7 @@ class MoMQueryRPC(RPCWrapper): ...@@ -30,7 +101,7 @@ class MoMQueryRPC(RPCWrapper):
ids = [str(x) for x in ids] ids = [str(x) for x in ids]
ids_string = ', '.join(ids) ids_string = ', '.join(ids)
logger.info("Requesting details for mom objects: %s" % (str(ids_string))) logger.info("Requesting details for mom objects: %s", (str(ids_string)))
result = self.rpc('GetProjectDetails', mom_ids=ids_string) result = self.rpc('GetProjectDetails', mom_ids=ids_string)
result = convertStringDigitKeysToInt(result) result = convertStringDigitKeysToInt(result)
logger.info("Received details for %s mom objects" % (len(result))) logger.info("Received details for %s mom objects" % (len(result)))
...@@ -44,7 +115,7 @@ class MoMQueryRPC(RPCWrapper): ...@@ -44,7 +115,7 @@ class MoMQueryRPC(RPCWrapper):
for project in projects: for project in projects:
project['statustime'] = project['statustime'].datetime() project['statustime'] = project['statustime'].datetime()
logger.info("Received %s projects" % (len(projects))) logger.info("Received %s projects", (len(projects)))
return projects return projects
def getProject(self, project_mom2id): def getProject(self, project_mom2id):
......
This diff is collapsed.
# $Id: CMakeLists.txt 32679 2015-10-26 09:31:56Z schaap $ # $Id: CMakeLists.txt 32679 2015-10-26 09:31:56Z schaap $
include(LofarCTest) include(LofarCTest)
include(FindPythonModule)
find_python_module(mock REQUIRED)
lofar_add_test(test_momqueryservice) lofar_add_test(test_momqueryservice)
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment