diff --git a/.gitattributes b/.gitattributes index e6ed31fde7d1e3a104223d81e9b6daeac2ad58cf..b321e61a9713bfda220634b09a4cc1aa76e0d9f6 100644 --- a/.gitattributes +++ b/.gitattributes @@ -4524,8 +4524,14 @@ MAC/Navigator2/scripts/readStationConfigs.ctl -text MAC/Navigator2/scripts/readStationConnections.ctl -text MAC/Navigator2/scripts/setSumAlerts.ctl -text MAC/Navigator2/scripts/transferMPs.ctl -text +MAC/Services/src/ObservationControl2.py -text +MAC/Services/src/config.py -text +MAC/Services/src/observationcontrol2 -text +MAC/Services/src/observationcontrol2.ini -text MAC/Services/src/pipelinecontrol -text MAC/Services/src/pipelinecontrol.ini -text +MAC/Services/test/tObservationControl2.py -text +MAC/Services/test/tObservationControl2.sh -text MAC/Services/test/tPipelineControl.sh eol=lf MAC/Test/APL/PVSSproject/colorDB/Lofar[!!-~]colors -text svneol=native#application/octet-stream MAC/Test/APL/PVSSproject/colorDB/colorDB_de -text svneol=native#application/octet-stream diff --git a/LCS/PyCommon/dbcredentials.py b/LCS/PyCommon/dbcredentials.py index 7767790fd233831202059a7a42ca232c60ae57e1..4f3c0483b803690b5a8a365977dd292870003ef0 100644 --- a/LCS/PyCommon/dbcredentials.py +++ b/LCS/PyCommon/dbcredentials.py @@ -65,6 +65,9 @@ class Credentials: # Database selection self.database = "" + # All key-value pairs found in the config + self.config = {} + def __str__(self): return "type={type} addr={host}:{port} auth={user}:{password} db={database}".format(**self.__dict__) @@ -122,6 +125,8 @@ class Credentials: return options class DBCredentials: + NoSectionError = NoSectionError + def __init__(self, filepatterns=None): """ Read database credentials from all configuration files matched by any of the patterns. @@ -172,11 +177,11 @@ class DBCredentials: # create default credentials creds = Credentials() - # read configuration - try: - d = dict(self.config.items(self._section(database))) - except NoSectionError: - return creds + # read configuration (can throw NoSectionError) + d = dict(self.config.items(self._section(database))) + + # save the full config to support custom fields + creds.config = d # parse and convert config information if "host" in d: creds.host = d["host"] diff --git a/LCS/PyCommon/test/t_dbcredentials.py b/LCS/PyCommon/test/t_dbcredentials.py index 8c2d131d12773e506bf1e27d6601b899706966ef..17b303f9b5b53af7d7b80cd98c3188047db8511b 100644 --- a/LCS/PyCommon/test/t_dbcredentials.py +++ b/LCS/PyCommon/test/t_dbcredentials.py @@ -50,6 +50,12 @@ class TestDBCredentials(unittest.TestCase): self.assertEqual(str(c_out), str(c_in)) + def test_get_non_existing(self): + dbc = DBCredentials(filepatterns=[]) + + with self.assertRaises(DBCredentials.NoSectionError): + dbc.get("UNKNOWN") + def test_list(self): dbc = DBCredentials(filepatterns=[]) @@ -93,6 +99,22 @@ database = mydb self.assertEqual(str(c_out), str(c_in)) + def test_freeform_config_option(self): + f = tempfile.NamedTemporaryFile() + f.write(""" +[database:DATABASE] +foo = bar +test = word word +""") + f.flush() # don't close since that will delete the TemporaryFile + + # extract our config + dbc = DBCredentials(filepatterns=[f.name]) + c_out = dbc.get("DATABASE") + + # test if the free-form config options got through + self.assertEqual(c_out.config["foo"], "bar") + self.assertEqual(c_out.config["test"], "word word") def main(argv): unittest.main() diff --git a/MAC/Services/src/CMakeLists.txt b/MAC/Services/src/CMakeLists.txt index 692b7b5ce4ab157ee88c3d60c29c3c350723c0ab..286bde15ebb4af8fc1e9d443a55f8a9c8c2b1b00 100644 --- a/MAC/Services/src/CMakeLists.txt +++ b/MAC/Services/src/CMakeLists.txt @@ -1,15 +1,23 @@ # $Id$ +include(FindPythonModule) + +find_python_module(fabric REQUIRED) + lofar_add_bin_scripts( pipelinecontrol + observationcontrol2 ) python_install( PipelineControl.py + ObservationControl2.py + config.py DESTINATION lofar/mac ) # supervisord config files install(FILES pipelinecontrol.ini + observationcontrol2.ini DESTINATION etc/supervisord.d) diff --git a/MAC/Services/src/ObservationControl2.py b/MAC/Services/src/ObservationControl2.py new file mode 100644 index 0000000000000000000000000000000000000000..8868790e5bc1b466a88bd693ba6a0caa79dd2983 --- /dev/null +++ b/MAC/Services/src/ObservationControl2.py @@ -0,0 +1,119 @@ +#!/usr/bin/python +# +# Copyright (C) 2016 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +import os +import logging +from fabric import tasks +from optparse import OptionParser + +from fabric.api import env, run +from lofar.messaging import Service +from lofar.messaging import setQpidLogLevel +from lofar.common.util import waitForInterrupt +from lofar.messaging.Service import MessageHandlerInterface +import lofar.mac.config as config + +logger = logging.getLogger(__name__) + + +class ObservationControlHandler(MessageHandlerInterface): + def __init__(self, **kwargs): + super(ObservationControlHandler, self).__init__(**kwargs) + + self.service2MethodMap = { + 'AbortObservation': self.abort_observation + } + + env.hosts = ["localhost"] + + if os.environ.has_key("LOFARENV"): + lofar_environment = os.environ['LOFARENV'] + + if lofar_environment == "PRODUCTION": + env.hosts = [config.PRODUCTION_OBSERVATION_CONTROL_HOST] + elif lofar_environment == "TEST": + env.hosts = [config.TEST_OBSERVATION_CONTROL_HOST] + + def abort_observation_task(self, sas_id): + logger.info("trying to abort ObservationControl for SAS ID: %s", sas_id) + + killed = False + + pid_line = run('pidof ObservationControl') + pids = pid_line.split(' ') + + for pid in pids: + pid_sas_id = run("ps -p %s --no-heading -o command | awk -F[{}] '{ print $2; }'" % pid) + if pid_sas_id == sas_id: + logger.info("Killing ObservationControl with PID: %s for SAS ID: %s", pid, sas_id) + run('kill -SIGINT %s' % pid) + killed = True + + return killed + + def abort_observation(self, sas_id): + """ aborts an observation for a single sas_id """ + result = tasks.execute(self.abort_observation_task, sas_id) + aborted = True in result.values() + return {'aborted': aborted} + + def handle_message(self, msg): + pass + + +def create_service(bus_name=config.DEFAULT_OBSERVATION_CONTROL_BUS_NAME, + service_name=config.DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, + broker=None, verbose=False): + return Service(service_name, + ObservationControlHandler, + busname=bus_name, + broker=broker, + use_service_methods=True, + numthreads=1, + handler_args={}, + verbose=verbose) + + +def main(): + # make sure we run in UTC timezone + import os + os.environ['TZ'] = 'UTC' + + # Check the invocation arguments + parser = OptionParser("%prog [options]", + description='runs the observationcontrol service') + parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') + parser.add_option("-b", "--busname", dest="busname", type="string", default=config.DEFAULT_OBSERVATION_CONTROL_BUS_NAME, help="Name of the bus exchange on the qpid broker, default: %s" % config.DEFAULT_OBSERVATION_CONTROL_BUS_NAME) + parser.add_option("-s", "--servicename", dest="servicename", type="string", default=config.DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, help="Name for this service, default: %s" % config.DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME) + parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') + (options, args) = parser.parse_args() + + setQpidLogLevel(logging.INFO) + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', + level=logging.DEBUG if options.verbose else logging.INFO) + + with create_service(bus_name=options.busname, + service_name=options.servicename, + broker=options.broker, + verbose=options.verbose): + waitForInterrupt() + + +if __name__ == '__main__': + main() diff --git a/MAC/Services/src/config.py b/MAC/Services/src/config.py new file mode 100644 index 0000000000000000000000000000000000000000..a4976653ef12279087991940956c1981292e2ad7 --- /dev/null +++ b/MAC/Services/src/config.py @@ -0,0 +1,26 @@ +#!/usr/bin/python +# +# Copyright (C) 2016 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +from lofar.messaging import adaptNameToEnvironment + +DEFAULT_OBSERVATION_CONTROL_BUS_NAME = adaptNameToEnvironment('lofar.mac.command') +DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME = 'ObservationControl2' +PRODUCTION_OBSERVATION_CONTROL_HOST = "mcu001.control.lofar" +TEST_OBSERVATION_CONTROL_HOST = "mcu099.control.lofar" diff --git a/MAC/Services/src/observationcontrol2 b/MAC/Services/src/observationcontrol2 new file mode 100644 index 0000000000000000000000000000000000000000..7c08d7da4f548b53e289169f35a6a875f89c0ff7 --- /dev/null +++ b/MAC/Services/src/observationcontrol2 @@ -0,0 +1,27 @@ +#!/usr/bin/python +# +# Copyright (C) 2016 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +''' +runs the ObservationControl service +''' +from lofar.mac import ObservationControl2 + +if __name__ == '__main__': + ObservationControl2.main() diff --git a/MAC/Services/src/observationcontrol2.ini b/MAC/Services/src/observationcontrol2.ini new file mode 100644 index 0000000000000000000000000000000000000000..ba538bf84860db403624c00d2fc4891b9b6f3df5 --- /dev/null +++ b/MAC/Services/src/observationcontrol2.ini @@ -0,0 +1,8 @@ +[program:observationcontrol2] +command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec observationcontrol2' +user=lofarsys +stopsignal=INT ; KeyboardInterrupt +stopasgroup=true ; bash does not propagate signals +stdout_logfile=%(program_name)s.log +redirect_stderr=true +stderr_logfile=NONE diff --git a/MAC/Services/test/CMakeLists.txt b/MAC/Services/test/CMakeLists.txt index b1f0434a8f1707e19fd427b3127dd71721809014..54988403fb1651e60c3f33cbcb02c9ba713d740d 100644 --- a/MAC/Services/test/CMakeLists.txt +++ b/MAC/Services/test/CMakeLists.txt @@ -1,8 +1,11 @@ # $Id$ include(LofarCTest) +include(FindPythonModule) lofar_find_package(Python REQUIRED) +find_python_module(mock REQUIRED) + lofar_add_test(tPipelineControl) diff --git a/MAC/Services/test/tObservationControl2.py b/MAC/Services/test/tObservationControl2.py new file mode 100644 index 0000000000000000000000000000000000000000..f3e127b7ec6162d55b9096c27b15f65863085a1c --- /dev/null +++ b/MAC/Services/test/tObservationControl2.py @@ -0,0 +1,157 @@ +import unittest +import uuid + +import mock +import os + +import time +from lofar.mac.ObservationControl2 import ObservationControlHandler, create_service +from qpid.messaging.message import Message as QpidMessage +import lofar.mac.config as config + +class TestObservationControlHandler(unittest.TestCase): + pid1 = "1000" + pid2 = "2000" + + sas_id = "100" + + def _run_side_effect(self, cmd): + if cmd.startswith("ps -p %s" % self.pid1): + return self.sas_id + elif cmd.startswith("ps -p %s" % self.pid2): + return self.sas_id + "10" + elif cmd.startswith("pidof"): + return "%s %s" % (self.pid1, self.pid2) + elif cmd.startswith("kill"): + return "" + + def setUp(self): + fabric_run_pathcher = mock.patch('lofar.mac.ObservationControl2.run') + self.addCleanup(fabric_run_pathcher.stop) + self.fabric_run_mock = fabric_run_pathcher.start() + + self.fabric_run_mock.side_effect = self._run_side_effect + + fabric_tasks_pathcher = mock.patch('lofar.mac.ObservationControl2.tasks') + self.addCleanup(fabric_tasks_pathcher.stop) + self.fabric_tasks_mock = fabric_tasks_pathcher.start() + + fabric_env_pathcher = mock.patch('lofar.mac.ObservationControl2.env') + self.addCleanup(fabric_env_pathcher.stop) + self.fabric_env_mock = fabric_env_pathcher.start() + + logger_patcher = mock.patch('lofar.mac.ObservationControl2.logger') + self.addCleanup(logger_patcher.stop) + self.logger_mock = logger_patcher.start() + + self.observation_control_handler = ObservationControlHandler() + + def test_abort_observation_task_should_run_pidof_ObservationControl(self): + self.observation_control_handler.abort_observation_task(self.sas_id) + + self.fabric_run_mock.assert_any_call('pidof ObservationControl') + + def test_abort_observation_tasks_should_run_ps_to_find_sas_id_on_command(self): + self.observation_control_handler.abort_observation_task(self.sas_id) + + self.fabric_run_mock.assert_any_call( + "ps -p %s --no-heading -o command | awk -F[{}] '{ print $2; }'" % self.pid1) + self.fabric_run_mock.assert_any_call( + "ps -p %s --no-heading -o command | awk -F[{}] '{ print $2; }'" % self.pid2) + + def test_abort_observation_task_should_run_kill_when_sas_id_matches(self): + self.observation_control_handler.abort_observation_task(self.sas_id) + + self.fabric_run_mock.assert_any_call('kill -SIGINT %s' % self.pid1) + + @mock.patch.dict(os.environ, {'LOFARENV': 'TEST'}) + def test_observation_control_should_select_test_host_if_lofar_environment_is_test(self): + ObservationControlHandler() + + self.assertEqual(self.fabric_env_mock.hosts, [config.TEST_OBSERVATION_CONTROL_HOST]) + + @mock.patch.dict(os.environ, {'LOFARENV': 'PRODUCTION'}) + def test_observation_control_should_select_production_host_if_lofar_environment_is_production(self): + ObservationControlHandler() + + self.assertEqual(self.fabric_env_mock.hosts, [config.PRODUCTION_OBSERVATION_CONTROL_HOST]) + + def test_observation_control_should_select_local_host_if_no_lofar_environment_is_set(self): + ObservationControlHandler() + + self.assertEqual(self.fabric_env_mock.hosts, ["localhost"]) + + def test_abort_observation_should_execute_abort_observation_task_on_localhost(self): + self.observation_control_handler.abort_observation(self.sas_id) + + self.fabric_tasks_mock.execute.assert_any_call(self.observation_control_handler.abort_observation_task, + self.sas_id) + + def test_abort_observation_task_should_return_false_on_unknown_sas_id(self): + self.assertFalse(self.observation_control_handler.abort_observation_task("unknown")) + + def test_abort_observation_task_should_return_true_on_known_sas_id(self): + self.assertTrue(self.observation_control_handler.abort_observation_task(self.sas_id)) + + def test_abort_observation_task_should_log_call(self): + self.observation_control_handler.abort_observation_task(self.sas_id) + + self.logger_mock.info.assert_any_call("trying to abort ObservationControl for SAS ID: %s", self.sas_id) + + def test_abort_observation_taks_should_log_the_kill(self): + self.observation_control_handler.abort_observation_task(self.sas_id) + + self.logger_mock.info.assert_any_call( + "Killing ObservationControl with PID: %s for SAS ID: %s", self.pid1, self.sas_id) + + def test_abort_observation_should_return_aborted_true_if_execute_returns_true(self): + self.fabric_tasks_mock.execute.return_value = {'localhost': True} + + result = self.observation_control_handler.abort_observation(self.sas_id) + + self.assertTrue(result['aborted']) + + def test_abort_observation_should_return_aborted_false_if_execute_returns_false(self): + self.fabric_tasks_mock.execute.return_value = {'localhost': False} + + result = self.observation_control_handler.abort_observation(self.sas_id) + + self.assertFalse(result['aborted']) + + @mock.patch('lofar.mac.ObservationControl2.ObservationControlHandler.abort_observation') + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_service_runs_abort_observation_task_when_called_with_AbortObservation_command(self, qpid_mock, abort_observation_mock): + message_id = str(uuid.uuid1()) + test_id = 200 + + qpid_message = QpidMessage([{'sas_id': 100}], + reply_to="localhost", + properties={ + "has_args": True, + "has_kwargs": True, + "SystemName": "LOFAR", + "MessageType": "RequestMessage", + "MessageId": message_id, + "status": "OK" + }, + subject=".AbortObservation") + + sender_mock = mock.MagicMock() + receiver_mock = mock.MagicMock() + + receiver_mock.fetch.return_value = qpid_message + abort_observation_mock.return_value = {"aborted": True} + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = receiver_mock + + with create_service() as service: + while receiver_mock.fetch.calles == 0: + pass + + self.assertTrue(abort_observation_mock.called) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/MAC/Services/test/tObservationControl2.sh b/MAC/Services/test/tObservationControl2.sh new file mode 100644 index 0000000000000000000000000000000000000000..6634f8db752024d72592c6d3c690b367522ee263 --- /dev/null +++ b/MAC/Services/test/tObservationControl2.sh @@ -0,0 +1,2 @@ +#!/bin/sh +./runctest.sh tObservationControl2 \ No newline at end of file diff --git a/SAS/MoM/MoMQueryService/momqueryrpc.py b/SAS/MoM/MoMQueryService/momqueryrpc.py index 62927291e9c2e68d911b936c44586eb93e0beb67..c5868c8665f74d7e90b218f4e6dbce29fd761a50 100644 --- a/SAS/MoM/MoMQueryService/momqueryrpc.py +++ b/SAS/MoM/MoMQueryService/momqueryrpc.py @@ -21,6 +21,77 @@ class MoMQueryRPC(RPCWrapper): timeout=120): super(MoMQueryRPC, self).__init__(busname, servicename, broker, timeout=timeout) + def add_trigger(self, user_name, host_name, project_name, meta_data): + logger.info("Requestion AddTrigger for user_name: %s, host_name: %s, project_name: %s and meta_data: %s", + user_name, host_name, project_name, meta_data) + + row_id = self.rpc('AddTrigger', + user_name=user_name, host_name=host_name, project_name=project_name, meta_data=meta_data) + + logger.info("Received AddTrigger for user_name (%s), host_name(%s), project_name(%s) and meta_data(%s): %s", + user_name, host_name, project_name, meta_data, row_id) + + return row_id + + def get_project_priority(self, project_name): + logger.info("Requestion GetProjectPriority for project_name: %s", project_name) + + priority = self.rpc('GetProjectPriority', project_name=project_name) + + logger.info("Received GetProjectPriority for project_name (%s): %s", project_name, priority) + + return priority + + def allows_triggers(self, project_name): + """returns whether a project is allowed to submit triggers + :param project_name: + :return: Boolean + """ + logger.info("Requesting AllowsTriggers for project_name: %s", project_name) + + result = self.rpc('AllowsTriggers', project_name=project_name) + + logger.info("Received AllowsTriggers for project_name (%s): %s", project_name, result) + + return result + + def authorized_add_with_status(self, user_name, project_name, job_type, status): + """returns whether user is allowed in project to move a certain jobtype to a certain state + :param user_name: + :param project_name: + :param job_type: + :param status: + :return: Boolean + """ + logger.info("Requesting AutorizedAddWithStatus for user_name: %s project_name: %s job_type: %s status: %s", + user_name, project_name, job_type, status) + result = self.rpc('AutorizedAddWithStatus', user_name=user_name, project_name=project_name, job_type=job_type, + status=status) + logger.info( + "Received AutorizedAddWithStatus for user_name: %s project_name: %s job_type: %s status: %s result: %s", + user_name, project_name, job_type, status, result) + return result + + def folderExists(self, folder): + """returns true if folder exists + :param folder: + :return: Boolean + """ + logger.info("Requesting folder: %s exists", folder) + result = self.rpc('FolderExists', folder=folder) + logger.info("Received folder exists: %s", result) + return result + + def isProjectActive(self, project_name): + """returns true if project is available and active + :param project_name: + :return: Boolean + """ + logger.info("Requesting if project: %s is active", project_name) + result = self.rpc('IsProjectActive', project_name=project_name) + logger.info("Received Project is active: %s", result) + return result + def getProjectDetails(self, ids): '''get the project details for one or more mom ids :param ids single or list of mom ids @@ -30,7 +101,7 @@ class MoMQueryRPC(RPCWrapper): ids = [str(x) for x in ids] ids_string = ', '.join(ids) - logger.info("Requesting details for mom objects: %s" % (str(ids_string))) + logger.info("Requesting details for mom objects: %s", (str(ids_string))) result = self.rpc('GetProjectDetails', mom_ids=ids_string) result = convertStringDigitKeysToInt(result) logger.info("Received details for %s mom objects" % (len(result))) @@ -44,7 +115,7 @@ class MoMQueryRPC(RPCWrapper): for project in projects: project['statustime'] = project['statustime'].datetime() - logger.info("Received %s projects" % (len(projects))) + logger.info("Received %s projects", (len(projects))) return projects def getProject(self, project_mom2id): diff --git a/SAS/MoM/MoMQueryService/momqueryservice.py b/SAS/MoM/MoMQueryService/momqueryservice.py index bc440f5fa085b497bec9fd4b48e9ad10e695a651..e64ffe1650e60dc55dca076b8b4ffba854de1cd4 100755 --- a/SAS/MoM/MoMQueryService/momqueryservice.py +++ b/SAS/MoM/MoMQueryService/momqueryservice.py @@ -18,13 +18,16 @@ with RPC(busname, 'GetProjectDetails') as getProjectDetails: res, status = getProjectDetails(ids_string) ''' -from os import stat -import sys +#import os +#from os import stat +#import sys import logging -import time +#import time from optparse import OptionParser from mysql import connector from mysql.connector.errors import OperationalError + +#from django.db.models import query from lofar.messaging import Service from lofar.messaging.Service import MessageHandlerInterface from lofar.common.util import waitForInterrupt @@ -34,6 +37,7 @@ from lofar.common.util import convertIntKeysToString logger=logging.getLogger(__file__) + def _idsFromString(id_string): if not isinstance(id_string, basestring): raise ValueError('Expected a string, got a ' + str(type(id_string))) @@ -43,6 +47,7 @@ def _idsFromString(id_string): ids = [int(y) for y in [x.strip() for x in id_string.split(',')] if y.isdigit()] return ids + def _isListOfInts(items): if not items: return False @@ -70,12 +75,16 @@ def _toIdsString(ids): ids_str = ','.join([str(id) for id in ids_list]) return ids_str + class MoMDatabaseWrapper: - '''handler class for details query in mom db''' + """handler class for details query in mom db""" def __init__(self, dbcreds): self.dbcreds = dbcreds self.conn = None + self.useradministration_db = dbcreds.config["useradministration_database"] + self.momprivilidge_db = dbcreds.config["momprivilege_database"] + def _connect(self): if self.conn: self.conn.close() @@ -83,27 +92,252 @@ class MoMDatabaseWrapper: connect_options = self.dbcreds.mysql_connect_options() connect_options['connection_timeout'] = 5 try: - logger.info("Connecting to %s" % self.dbcreds.stringWithHiddenPassword()) + logger.info("Connecting to %s", self.dbcreds.stringWithHiddenPassword()) self.conn = connector.connect(**connect_options) - logger.debug("Connected to %s" % self.dbcreds.stringWithHiddenPassword()) + logger.debug("Connected to %s", self.dbcreds.stringWithHiddenPassword()) except Exception as e: logger.error(str(e)) self.conn = None - def _executeQuery(self, query): + def _executeSelectQuery(self, query, data=None): # try to execute query on flaky lofar mysql connection - # max of 3 tries, on succes return result + # max of 3 tries, on success return result # use new connection for every query, # because on the flaky lofar network a connection may appear functional but returns improper results. for i in range(3): try: self._connect() cursor = self.conn.cursor(dictionary=True) - cursor.execute(query) + cursor.execute(query, data) return cursor.fetchall() except (OperationalError, AttributeError) as e: logger.error(str(e)) + def _executeInsertQuery(self, query, data=None): + # try to execute query on flaky lofar mysql connection + # max of 3 tries, on success return result + # use new connection for every query, + # because on the flaky lofar network a connection may appear functional but returns improper results. + for i in range(3): + try: + self._connect() + cursor = self.conn.cursor(dictionary=True) + cursor.execute(query, data) + self.conn.commit() + return cursor.lastrowid + except (OperationalError, AttributeError) as e: + logger.error(str(e)) + + def add_trigger(self, user_name, host_name, project_name, meta_data): + logger.info("add_trigger for user_name: %s, host_name: %s, project_name: %s, meta_data: %s", + user_name, host_name, project_name, meta_data) + + query = """insert into lofar_trigger (username, hostname, projectname, metadata) +values (%s, %s, %s, %s)""" + parameters = (user_name, host_name, project_name, meta_data) + + row_id = self._executeInsertQuery(query, parameters) + + logger.info("add_trigger for user_name(%s), host_name(%s), project_name(%s), meta_data(%s): %s", + user_name, host_name, project_name, meta_data, row_id) + + return row_id + + def get_project_priority(self, project_name): + logger.info("get_project_priority for project_name: %s", project_name) + + query = """SELECT priority FROM project +join mom2object on project.mom2objectid=mom2object.id +where mom2object.name = %s""" + parameters = (project_name, ) + + rows = self._executeSelectQuery(query, parameters) + + if not rows: + raise ValueError("project name (%s) not found in MoM database" % project_name) + + priority = rows[0]['priority'] + + logger.info("get_project_priority for project_name (%s): %s", project_name, priority) + + return priority + + def allows_triggers(self, project_name): + """returns whether a project is allowed to submit triggers + :param project_name: + :return: Boolean + """ + logger.info("allows_triggers for project_name: %s", project_name) + + query = """SELECT allowtriggers FROM project +join mom2object on project.mom2objectid=mom2object.id +where mom2object.name = %s""" + parameters = (project_name, ) + + rows = self._executeSelectQuery(query, parameters) + + if not rows: + raise ValueError("project name (%s) not found in MoM database" % project_name) + + allows = rows[0]['allowtriggers'] + + logger.info("allows_triggers for project_name (%s) result: %s", project_name, allows) + + return allows + + def authorized_add_with_status(self, user_name, project_name, job_type, status): + """returns whether user is allowed in project to move a certain jobtype to a certain state + :param user_name: + :param project_name: + :param job_type: should be either 'observation', 'ingest' or 'pipeline' + :param status: status should be either 'opened' or 'approved' + :return: Boolean + """ + if status not in ['opened', 'approved']: + raise ValueError("status should be either 'opened' or 'approved'") + + if job_type not in ['observation', 'ingest', 'pipeline']: + raise ValueError("job_type should be either 'observation', 'ingest' or 'pipeline'") + + logger.info("authorized_add_with_status for user_name: %s project_name: %s job_type: %s status: %s", + user_name, project_name, job_type, status) + + status_type = { + 'observation': 'OBSERVATION', + 'ingest': 'EXPORT', + 'pipeline': 'POSTPROCESS' + } + + # query have opened status hardcoded because this is domain knowledge and works for the current requirements. + # If more status transitions are needed this query will be more complex + # The or on the status will then not be valid anymore. + query_system_rights = """SELECT 1 FROM """ + self.useradministration_db + """.useraccount as useraccount + join """ + self.useradministration_db + """.useraccountsystemrole as system_role on useraccount.userid=system_role.useraccountid + join """ + self.momprivilidge_db + """.statustransitionrole as transition_role on system_role.systemroleid=transition_role.roleid + join """ + self.momprivilidge_db + """.statustransition as transition on transition_role.statustransitionid=transition.id + join status as open_status on open_status.code='opened' + join status as status on status.id=transition.newstatusid and (transition.oldstatusid=0 or transition.oldstatusid=open_status.id) + where status.code=%s and + status.type='""" + status_type[job_type] + """' and + open_status.type='""" + status_type[job_type] + """' and + transition_role.roletype="nl.astron.useradministration.data.entities.SystemRole" and + useraccount.username=%s""" + parameters = (status, user_name) + + rows_system_rights = self._executeSelectQuery(query_system_rights, parameters) + + # query have opened status hardcoded because this is domain knowledge and works for the current requirements. + # If more status transitions are needed this query will be more complex. + # The or on the status will then not be valid anymore. + query_project_rights = """SELECT 1 FROM mom2object as project + join member as member on member.projectid=project.id + join registeredmember as registered_member on registered_member.memberid=member.id + join """ + self.useradministration_db + """.useraccount as useraccount on registered_member.userid=useraccount.id + join memberprojectrole as member_project_role on member_project_role.memberid=member.id + join projectrole as project_role on project_role.id=member_project_role.projectroleid + join """ + self.momprivilidge_db + """.statustransitionrole as transition_role on project_role.id=transition_role.roleid + join """ + self.momprivilidge_db + """.statustransition as transition on transition_role.statustransitionid=transition.id + join status as open_status on open_status.code='opened' + join status as status on status.id=transition.newstatusid and (transition.oldstatusid=0 or transition.oldstatusid=open_status.id) + where status.code=%s and + status.type='""" + status_type[job_type] + """' and + open_status.type='""" + status_type[job_type] + """' and + transition_role.roletype="nl.astron.mom2.data.entities.ProjectRole" and + useraccount.username=%s and + project.name=%s""" + parameters = (status, user_name, project_name) + + rows_project_rights = self._executeSelectQuery(query_project_rights, parameters) + + authorized = len(rows_system_rights) != 0 or len(rows_project_rights) != 0 + + logger.info("authorized_add_with_status for user_name: %s project_name: %s job_type: %s status: %s result: %s", + user_name, project_name, job_type, status, authorized) + + return authorized + + def folder_exists(self, folder_path): + """ returns true if folder exists + :param folder_path: + :return: Boolean + """ + try: + logger.info("folder_exists for folder: %s", folder_path) + + project_name, folders = self._get_project_name_and_folders(folder_path) + + query = self._build_folder_exists_query(len(folders)) + parameters = tuple(folders) + (project_name, ) + + rows = self._executeSelectQuery(query, parameters) + + exists = len(rows) != 0 + + logger.info("folder_exists for folder (%s): %s", folder_path, exists) + + return exists + except ValueError as exception: + logger.error("Folder path is incorrect: %s", exception.message) + return False + + def _get_project_name_and_folders(self, folder_path): + if not folder_path.startswith('/'): + raise ValueError("Folder path (%s) does not start with a /" % folder_path) + + path_parts = folder_path.split('/') + + project_name = path_parts[1] + + if project_name == "": + raise ValueError("Folder path (%s) should minimally have a project") + + if len(path_parts) > 2: + if path_parts[-1] == "": + folders = path_parts[2:-1] + else: + folders = path_parts[2:] + else: + folders = [] + + return project_name, folders + + def _build_folder_exists_query(self, folder_count): + query = """SELECT 1\nFROM mom2object as project """ + parent_id = "project" + + for index in xrange(folder_count): + folder_alias = "folder%s" % index + query += """\njoin mom2object as """ + folder_alias + """ on + """ + folder_alias + """.parentid=""" + parent_id + """.id and + """ + folder_alias + """.mom2objecttype="FOLDER" and + """ + folder_alias + """.name=%s""" + parent_id = folder_alias + + query += """\nwhere project.mom2objecttype="PROJECT" and project.name=%s""" + + return query + + def is_project_active(self, project_name): + """ returns true if project is available and active + :param project_name: + :return: Boolean + """ + logger.info("is_project_active for project name: %s", project_name) + + query = """SELECT 1 + FROM mom2object as project + left join mom2objectstatus as status on project.currentstatusid = status.id + where project.mom2objecttype='PROJECT' and status.statusid = 7 and project.name = %s;""" + parameters = (project_name, ) + + rows = self._executeSelectQuery(query, parameters) + + is_active = len(rows) != 0 + + logger.info("is_project_active for project (%s): %s", project_name, is_active) + + return is_active + def getProjectDetails(self, mom_ids): ''' get the project details (project_mom2id, project_name, project_description, object_mom2id, object_name, object_description, @@ -116,7 +350,7 @@ class MoMDatabaseWrapper: ids_str = _toIdsString(mom_ids) - logger.info("getProjectDetails for mom ids: %s" % ids_str) + logger.info("getProjectDetails for mom ids: %s", ids_str) # TODO: make a view for this query in momdb! query = '''SELECT project.mom2id as project_mom2id, project.id as project_mom2objectid, project.name as project_name, project.description as project_description, @@ -131,11 +365,11 @@ class MoMDatabaseWrapper: left join mom2object as parent_grp on parent_grp.id = grp.parentid where object.mom2id in (%s) order by project_mom2id - ''' % (ids_str,) - rows = self._executeQuery(query) + ''' + parameters = (ids_str, ) + rows = self._executeSelectQuery(query, parameters) - logger.info("Found %d results for mom id(s): %s" % - (len(rows) if rows else 0, ids_str)) + logger.info("Found %d results for mom id(s): %s", (len(rows) if rows else 0, ids_str)) result = {} for row in rows: @@ -162,9 +396,9 @@ class MoMDatabaseWrapper: where project.mom2objecttype='PROJECT' order by mom2id; ''' - result = self._executeQuery(query) + result = self._executeSelectQuery(query) - logger.info("Found %d projects" % (len(result), )) + logger.info("Found %d projects", (len(result), )) return result @@ -173,7 +407,7 @@ class MoMDatabaseWrapper: project_description, status_name, status_id, last_user_id, last_user_name, statustime) ''' - ids_str = _toIdsString(mom_ids) + ids_str = _toIdsString(project_mom2id) # TODO: make a view for this query in momdb! query = '''SELECT project.mom2id as mom2id, project.name as name, project.description as description, @@ -184,8 +418,10 @@ class MoMDatabaseWrapper: left join status as statustype on status.statusid=statustype.id where project.mom2objecttype='PROJECT' and project.mom2id = %s order by mom2id; - ''' % (ids_str) - result = self._executeQuery(query) + ''' + parameters = (ids_str, ) + + result = self._executeSelectQuery(query, parameters) return result @@ -195,14 +431,15 @@ class MoMDatabaseWrapper: ids_str = _toIdsString(project_mom2id) - logger.info("getProjectTaskIds for project_mom2id: %s" % ids_str) + logger.info("getProjectTaskIds for project_mom2id: %s", ids_str) query = '''SELECT tasks.mom2id FROM mom2object tasks inner join mom2object project on project.id = tasks.ownerprojectid where project.mom2id = %s and - (tasks.mom2objecttype = 'LOFAR_OBSERVATION' or tasks.mom2objecttype like \'%%PIPELINE%%\');''' % ids_str + (tasks.mom2objecttype = 'LOFAR_OBSERVATION' or tasks.mom2objecttype like \'%%PIPELINE%%\');''' + parameters = (ids_str, ) - rows = self._executeQuery(query) + rows = self._executeSelectQuery(query, parameters) result = { 'project_mom2id': project_mom2id, 'task_mom2ids': [r['mom2id'] for r in rows]} @@ -216,14 +453,16 @@ class MoMDatabaseWrapper: ids_str = _toIdsString(mom_ids) - logger.info("getPredecessorIds for mom ids: %s" % ids_str) + logger.info("getPredecessorIds for mom ids: %s", ids_str) query = '''SELECT mom2id, predecessor FROM mom2object where mom2id in (%s) order by mom2id; - ''' % (ids_str,) - rows = self._executeQuery(query) + ''' + parameters = (ids_str, ) + + rows = self._executeSelectQuery(query, parameters) result = {} for row in rows: @@ -247,7 +486,7 @@ class MoMDatabaseWrapper: ids_str = _toIdsString(mom_ids) - logger.info("getSuccessorIds for mom ids: %s" % ids_str) + logger.info("getSuccessorIds for mom ids: %s", ids_str) condition = ' OR '.join(['predecessor LIKE \'%%M%s%%\'' % x for x in ids_str.split(',')]) @@ -256,8 +495,9 @@ class MoMDatabaseWrapper: FROM mom2object where %s order by mom2id; - ''' % (condition,) - rows = self._executeQuery(query) + ''' + parameters = (condition, ) + rows = self._executeSelectQuery(query, parameters) result = {} for mom2id in ids_str.split(','): @@ -336,13 +576,14 @@ class MoMDatabaseWrapper: ids_str = _toIdsString(mom_group_ids) - logger.info("getTaskIdsInGroup for mom group ids: %s" % ids_str) + logger.info("getTaskIdsInGroup for mom group ids: %s", ids_str) query = '''SELECT mom2id, group_id FROM mom2object where group_id in (%s) - and (mom2objecttype = 'LOFAR_OBSERVATION' or mom2objecttype like \'%%PIPELINE%%\')''' % ids_str + and (mom2objecttype = 'LOFAR_OBSERVATION' or mom2objecttype like \'%%PIPELINE%%\')''' + parameters = (ids_str, ) - rows = self._executeQuery(query) + rows = self._executeSelectQuery(query, parameters) result = {} for group_id in ids_str.split(','): @@ -363,16 +604,17 @@ class MoMDatabaseWrapper: ids_str = _toIdsString(mom_parent_group_ids) - logger.debug("getGroupsInParentGroup for mom parent group ids: %s" % ids_str) + logger.debug("getGroupsInParentGroup for mom parent group ids: %s", ids_str) query = '''SELECT parent.id as parent_mom2object_id, parent.mom2id as parent_mom2id, grp.mom2id as group_mom2id, grp.id as group_mom2object_id, grp.name as group_name, grp.description as group_description from mom2object parent inner join mom2object grp on parent.id = grp.parentid where parent.mom2id in (%s) - and grp.group_id = grp.mom2id''' % ids_str + and grp.group_id = grp.mom2id''' + parameters = (ids_str, ) - rows = self._executeQuery(query) + rows = self._executeSelectQuery(query, parameters) result = {} for parent_group_id in ids_str.split(','): @@ -392,7 +634,7 @@ class MoMDatabaseWrapper: ids_str = _toIdsString(mom_parent_group_ids) - logger.debug("getTaskIdsInParentGroup for mom parent group ids: %s" % ids_str) + logger.debug("getTaskIdsInParentGroup for mom parent group ids: %s", ids_str) groups_result = self.getGroupsInParentGroup(ids_str) @@ -499,7 +741,7 @@ class MoMDatabaseWrapper: ids_str = _toIdsString(mom_ids) - logger.info("getDataProducts for mom ids: %s" % ids_str) + logger.info("getDataProducts for mom ids: %s", ids_str) query = '''SELECT mo.id as momobject_id, mo.mom2id as mom2id, mop.id as parent_momobject_id, mop.mom2id as parent_mom2id, @@ -512,7 +754,7 @@ class MoMDatabaseWrapper: or mo.parentid in (SELECT mo_parent.id FROM mom2object mo_parent where mo_parent.mom2id in (%s))) ''' % (ids_str, ids_str) - rows = self._executeQuery(query) + rows = self._executeSelectQuery(query) result = {} for mom2id in ids_str.split(','): @@ -532,6 +774,9 @@ class MoMDatabaseWrapper: return result + pass + + class ProjectDetailsQueryHandler(MessageHandlerInterface): '''handler class for details query in mom db :param MoMDatabaseWrapper momdb inject database access via wrapper @@ -541,6 +786,12 @@ class ProjectDetailsQueryHandler(MessageHandlerInterface): self.dbcreds = kwargs.pop("dbcreds", None) self.service2MethodMap = { + 'AddTrigger': self.add_trigger, + 'GetProjectPriority': self.get_project_priority, + 'AllowsTriggers': self.allows_triggers, + 'AutorizedAddWithStatus': self.authorized_add_with_status, + 'FolderExists': self.folder_exists, + 'IsProjectActive': self.is_project_active, 'GetProjects': self.getProjects, 'GetProject': self.getProject, 'GetProjectDetails': self.getProjectDetails, @@ -558,6 +809,30 @@ class ProjectDetailsQueryHandler(MessageHandlerInterface): def prepare_loop(self): self.momdb = MoMDatabaseWrapper(self.dbcreds) + def add_trigger(self, user_name, host_name, project_name, meta_data): + row_id = self.momdb.add_trigger(user_name, host_name, project_name, meta_data) + return {"row_id": row_id} + + def get_project_priority(self, project_name): + priority = self.momdb.get_project_priority(project_name) + return {"priority": priority} + + def allows_triggers(self, project_name): + allows = self.momdb.allows_triggers(project_name) + return {"allows": allows} + + def authorized_add_with_status(self, user, project, jobtype, status): + authorized = self.momdb.authorized_add_with_status(user, project, jobtype, status) + return {"authorized": authorized} + + def folder_exists(self, folder): + exists = self.momdb.folder_exists(folder) + return {"exists": exists} + + def is_project_active(self, project_name): + is_active = self.momdb.is_project_active(project_name) + return {"active": is_active} + def getProjectDetails(self, mom_ids): return convertIntKeysToString(self.momdb.getProjectDetails(mom_ids)) @@ -634,7 +909,7 @@ def main(): dbcreds = dbcredentials.parse_options(options) - logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword()) + logger.info("Using dbcreds: %s", dbcreds.stringWithHiddenPassword()) # start the service and listen. with createService(busname=options.busname, diff --git a/SAS/MoM/MoMQueryService/test/CMakeLists.txt b/SAS/MoM/MoMQueryService/test/CMakeLists.txt index b337f99844032cb115fcabde61d087f445e8683a..d48ea94dc154e53878f76bf290aa1ef04506b85c 100644 --- a/SAS/MoM/MoMQueryService/test/CMakeLists.txt +++ b/SAS/MoM/MoMQueryService/test/CMakeLists.txt @@ -1,5 +1,8 @@ # $Id: CMakeLists.txt 32679 2015-10-26 09:31:56Z schaap $ include(LofarCTest) +include(FindPythonModule) + +find_python_module(mock REQUIRED) lofar_add_test(test_momqueryservice) diff --git a/SAS/MoM/MoMQueryService/test/test_momqueryservice.py b/SAS/MoM/MoMQueryService/test/test_momqueryservice.py index bb4043471966618a082133fbb17404f828a69705..96bda5156996d911f49ce26a0246548747103849 100755 --- a/SAS/MoM/MoMQueryService/test/test_momqueryservice.py +++ b/SAS/MoM/MoMQueryService/test/test_momqueryservice.py @@ -18,61 +18,1019 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # $Id: $ - import unittest +import mock +import testing.mysqld import uuid -import logging -from lofar.mom.momqueryservice.momqueryservice import createService -from lofar.mom.momqueryservice.momqueryservice import ProjectDetailsQueryHandler +from mysql import connector + +from lofar.common.dbcredentials import Credentials from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_SERVICENAME -from qpid.messaging import Connection -from qpidtoollibs import BrokerAgent - -try: - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - logger = logging.getLogger() - - # setup broker connection - connection = Connection.establish('127.0.0.1') - broker = BrokerAgent(connection) - - # add test service busname - busname = "momqueryservice-test-%s" % (uuid.uuid1()) - broker.addExchange('topic', busname) - - testid = 1234 - - # create a mock for the MoMDatabaseWrapper - # so we don't need the actual momdb for this test - # and we don't need the momdb passwd - class MockMoMDatabaseWrapper: - def getProjectDetails(self, mom_ids_str): - return { testid: {'project_mom2id': '4567', 'project_name': 'foo', 'project_description': 'bar', 'object_mom2id': testid}} - - class MockProjectDetailsQueryHandler(ProjectDetailsQueryHandler): - def prepare_loop(self): - self.momdb = MockMoMDatabaseWrapper() - - # inject the mock into the service - with createService(busname, handler=MockProjectDetailsQueryHandler), \ - MoMQueryRPC(busname, DEFAULT_MOMQUERY_SERVICENAME) as momrpc: - - class TestLTAStorageDb(unittest.TestCase): - def testProjectDetailsQuery(self): - result = momrpc.getProjectDetails(testid) - self.assertEquals(1, len(result.keys())) - self.assertEquals(testid, result.keys()[0]) - self.assertTrue('project_mom2id' in result[testid]) - self.assertTrue('project_name' in result[testid]) - self.assertTrue('project_description' in result[testid]) - - unittest.main() - -finally: - # cleanup test bus and exit - if broker: - broker.delExchange(busname) - - if connection: - connection.close() +from lofar.mom.momqueryservice.momqueryservice import MoMDatabaseWrapper, ProjectDetailsQueryHandler +from qpid.messaging.message import Message as QpidMessage + + +class TestProjectDetailsQueryHandler(unittest.TestCase): + database_credentials = Credentials() + database_credentials.host = "localhost" + database_credentials.user = "root" + database_credentials.database = "testdb" + database_credentials.password = None + database_credentials.config = {"useradministration_database": "useradministration", + "momprivilege_database": "momprivilege"} + + project_name = "project name" + folder = "/project/folder" + + def setUp(self): + mom_database_wrapper_patcher = mock.patch('lofar.mom.momqueryservice.momqueryservice.MoMDatabaseWrapper') + self.addCleanup(mom_database_wrapper_patcher.stop) + self.mom_database_wrapper_mock = mom_database_wrapper_patcher.start() + + + self.project_details_query_handler = ProjectDetailsQueryHandler(dbcreds=self.database_credentials) + self.project_details_query_handler.prepare_loop() + + def test_IsProjectActive_returns_active_true_when_mom_wrapper_returns_true(self): + self.mom_database_wrapper_mock().is_project_active.return_value = True + + return_value = self.project_details_query_handler.is_project_active(self.project_name) + + self.assertTrue(return_value['active']) + + def test_IsProjectActive_returns_active_flase_when_mom_wrapper_returns_false(self): + self.mom_database_wrapper_mock().is_project_active.return_value = False + + return_value = self.project_details_query_handler.is_project_active(self.project_name) + + self.assertFalse(return_value['active']) + + def test_FolderExists_return_exists_true_when_mom_wrapper_returns_true(self): + self.mom_database_wrapper_mock().folder_exists.return_value = True + + return_value = self.project_details_query_handler.folder_exists(self.folder) + + self.assertTrue(return_value['exists']) + + def test_FolderExists_return_exists_false_when_mom_wrapper_returns_false(self): + self.mom_database_wrapper_mock().folder_exists.return_value = False + + return_value = self.project_details_query_handler.folder_exists(self.folder) + + self.assertFalse(return_value['exists']) + + def test_authorized_add_with_status_returns_autorized_false_when_mom_wrapper_returns_false(self): + user_name = "user" + project_name = "project" + job_type = "observation" + status = "approved" + + self.mom_database_wrapper_mock().authorized_add_with_status.return_value = False + + return_value = self.project_details_query_handler.authorized_add_with_status(user_name, project_name, job_type, + status) + + self.assertFalse(return_value['authorized']) + + def test_allows_triggers_returns_allows_true_when_mom_wrapper_returns_true(self): + project_name = "project" + + self.mom_database_wrapper_mock().allows_triggers.return_value = True + + return_value = self.project_details_query_handler.allows_triggers(project_name) + + self.assertTrue(return_value['allows']) + + def test_allows_triggers_returns_allows_false_when_mom_wrapper_returns_false(self): + project_name = "project" + + self.mom_database_wrapper_mock().allows_triggers.return_value = False + + return_value = self.project_details_query_handler.allows_triggers(project_name) + + self.assertFalse(return_value['allows']) + + def test_get_project_priority_returns_priority_that_the_mom_wrapper_returs(self): + project_name = "project" + + self.mom_database_wrapper_mock().get_project_priority.return_value = 1000 + + return_value = self.project_details_query_handler.get_project_priority(project_name) + + self.assertEqual(return_value['priority'], 1000) + + def test_add_trigger_returns_row_id_that_the_mom_wrapper_returns(self): + project_name = "project" + host_name = "host name" + user_name = "user name" + meta_data = "meta data" + + row_id = 55 + + self.mom_database_wrapper_mock().add_trigger.return_value = row_id + + return_value = self.project_details_query_handler.add_trigger(user_name, host_name, project_name, meta_data) + + self.assertEqual(return_value['row_id'], row_id) + + +class TestMomQueryRPC(unittest.TestCase): + test_id = 1234 + message_id = str(uuid.uuid4()) + folder = "/project/folder" + user_name = "user name" + project_name = "project name" + meta_data = "meta data" + host_name = "host name" + job_type = "observation" + status = "opened" + + qpid_message = QpidMessage({ + str(test_id): { + 'project_mom2id': '4567', + 'project_name': 'foo', + 'project_description': 'bar', + 'object_mom2id': str(test_id) + } + }, + properties={ + "SystemName": "LOFAR", + "MessageType": "ReplyMessage", + "MessageId": message_id, + "status": "OK" + }) + + qpid_message_is_project_active_true = QpidMessage({"active": True}, + properties={ + "SystemName": "LOFAR", + "MessageType": "ReplyMessage", + "MessageId": message_id, + "status": "OK" + }) + + qpid_message_project_exists_true = QpidMessage({"exists": True}, + properties={ + "SystemName": "LOFAR", + "MessageType": "ReplyMessage", + "MessageId": message_id, + "status": "OK" + }) + qpid_message_authorized_true = QpidMessage({"authorized": True}, + properties={ + "SystemName": "LOFAR", + "MessageType": "ReplyMessage", + "MessageId": message_id, + "status": "OK" + }) + + qpid_message_allows_true = QpidMessage({"allows": True}, + properties={ + "SystemName": "LOFAR", + "MessageType": "ReplyMessage", + "MessageId": message_id, + "status": "OK" + }) + + qpid_message_priority_1000 = QpidMessage({"priority": 1000}, + properties={ + "SystemName": "LOFAR", + "MessageType": "ReplyMessage", + "MessageId": message_id, + "status": "OK" + }) + + qpid_message_add_trigger_row_id = 33 + qpid_message_add_trigger = QpidMessage({"row_id": qpid_message_add_trigger_row_id}, + properties={ + "SystemName": "LOFAR", + "MessageType": "ReplyMessage", + "MessageId": message_id, + "status": "OK" + }) + + def setUp(self): + # the mock library had difficulty to mock ToBus and FromBus probably to some weir naming issue. + # so mocking is done on QPID messaging level. + + self.momrpc = MoMQueryRPC('busname', DEFAULT_MOMQUERY_SERVICENAME) + self.sender_mock = mock.MagicMock() + + self.receiver_mock = mock.MagicMock() + + logger_patcher = mock.patch('lofar.mom.momqueryservice.momqueryrpc.logger') + self.addCleanup(logger_patcher.stop) + self.logger_mock = logger_patcher.start() + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_project_details_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.getProjectDetails(self.test_id) + + self.assertEquals(1, len(result.keys())) + self.assertEquals(self.test_id, result.keys()[0]) + self.assertTrue('project_mom2id' in result[self.test_id]) + self.assertTrue('project_name' in result[self.test_id]) + self.assertTrue('project_description' in result[self.test_id]) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_is_project_active_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_is_project_active_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.isProjectActive(self.project_name) + + self.assertTrue(result['active']) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_is_project_active_logs_before_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_is_project_active_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + self.momrpc.isProjectActive(self.project_name) + + self.logger_mock.info.assert_any_call("Requesting if project: %s is active", self.project_name) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_is_project_active_logs_after_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_is_project_active_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.isProjectActive(self.project_name) + + self.logger_mock.info.assert_any_call("Received Project is active: %s", result) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_folder_exists_active_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_project_exists_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.folderExists(self.folder) + + self.assertTrue(result['exists']) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_is_project_logs_before_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_project_exists_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + self.momrpc.folderExists(self.folder) + + self.logger_mock.info.assert_any_call("Requesting folder: %s exists", self.folder) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_is_project_logs_after_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_project_exists_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.folderExists(self.folder) + + self.logger_mock.info.assert_any_call("Received folder exists: %s", result) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_authorized_add_with_status_logs_before_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_authorized_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + self.momrpc.authorized_add_with_status(self.user_name, self.project_name, self.job_type, self.status) + + self.logger_mock.info.assert_any_call( + "Requesting AutorizedAddWithStatus for user_name: %s project_name: %s job_type: %s status: %s", + self.user_name, self.project_name, self.job_type, self.status) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_authorized_add_with_status_logs_after_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_authorized_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.authorized_add_with_status(self.user_name, self.project_name, self.job_type, self.status) + + self.logger_mock.info.assert_any_call( + "Received AutorizedAddWithStatus for user_name: %s project_name: %s job_type: %s status: %s result: %s", + self.user_name, self.project_name, self.job_type, self.status, result) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_authorized_add_with_status_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_authorized_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.authorized_add_with_status(self.user_name, self.project_name, self.job_type, self.status) + + self.assertTrue(result['authorized']) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_allows_triggers_logs_before_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_allows_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + self.momrpc.allows_triggers(self.project_name) + + self.logger_mock.info.assert_any_call("Requesting AllowsTriggers for project_name: %s", self.project_name) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_allows_triggers_logs_after_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_allows_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.allows_triggers(self.project_name) + + self.logger_mock.info.assert_any_call( + "Received AllowsTriggers for project_name (%s): %s", self.project_name, result) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_allows_triggers_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_allows_true + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.allows_triggers(self.project_name) + + self.assertTrue(result['allows']) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_get_project_priority_logs_before_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_priority_1000 + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + self.momrpc.get_project_priority(self.project_name) + + self.logger_mock.info.assert_any_call("Requestion GetProjectPriority for project_name: %s", self.project_name) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_get_project_priority_logs_after_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_priority_1000 + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.get_project_priority(self.project_name) + + self.logger_mock.info.assert_any_call( + "Received GetProjectPriority for project_name (%s): %s", self.project_name, result) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_get_project_priority_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_priority_1000 + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.get_project_priority(self.project_name) + + self.assertEqual(result['priority'], 1000) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_add_trigger_logs_before_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_add_trigger + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + self.momrpc.add_trigger(self.user_name, self.host_name, self.project_name, self.meta_data) + + self.logger_mock.info.assert_any_call( + "Requestion AddTrigger for user_name: %s, host_name: %s, project_name: %s and meta_data: %s", + self.user_name, self.host_name, self.project_name, self.meta_data) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_add_trigger_logs_after_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_add_trigger + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.add_trigger(self.user_name, self.host_name, self.project_name, self.meta_data) + + self.logger_mock.info.assert_any_call( + "Received AddTrigger for user_name (%s), host_name(%s), project_name(%s) and meta_data(%s): %s", + self.user_name, self.host_name, self.project_name, self.meta_data, result) + + @mock.patch('lofar.messaging.messagebus.qpid.messaging') + def test_add_trigger_query(self, qpid_mock): + self.receiver_mock.fetch.return_value = self.qpid_message_add_trigger + + qpid_mock.Message = QpidMessage + qpid_mock.Connection().session().senders = [self.sender_mock] + qpid_mock.Connection().session().next_receiver.return_value = self.receiver_mock + + result = self.momrpc.add_trigger(self.user_name, self.host_name, self.project_name, self.meta_data) + + self.assertEqual(result['row_id'], self.qpid_message_add_trigger_row_id) + + +class TestMoMDatabaseWrapper(unittest.TestCase): + database_credentials = Credentials() + database_credentials.host = "localhost" + database_credentials.user = "root" + database_credentials.database = "testdb" + database_credentials.password = None + database_credentials.config = {"useradministration_database": "useradministration", + "momprivilege_database": "momprivilege"} + + project_name = "project name" + folder = "/project/folder1/folder2" + + user_name = "user name" + meta_data = "meta data" + host_name = "host name" + job_type = "observation" + status = "opened" + + def setUp(self): + logger_patcher = mock.patch('lofar.mom.momqueryservice.momqueryservice.logger') + self.addCleanup(logger_patcher.stop) + self.logger_mock = logger_patcher.start() + + mysql_patcher = mock.patch('lofar.mom.momqueryservice.momqueryservice.connector') + self.addCleanup(mysql_patcher.stop) + self.mysql_mock = mysql_patcher.start() + + self.mom_database_wrapper = MoMDatabaseWrapper(self.database_credentials) + + def test_is_project_active_logs_start_of_query(self): + self.mom_database_wrapper.is_project_active(self.project_name) + + self.logger_mock.info.assert_any_call("is_project_active for project name: %s", self.project_name) + + def test_is_project_active_logs_end_of_query(self): + is_active = False + + self.mom_database_wrapper.is_project_active(self.project_name) + + self.logger_mock.info.assert_any_call("is_project_active for project (%s): %s", self.project_name, is_active) + + def test_is_project_active_return_true_when_query_returns_rows(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [{u'1': 1}] + + return_value = self.mom_database_wrapper.is_project_active(self.project_name) + + self.assertTrue(return_value) + + def test_is_project_active_return_false_when_query_returns_no_rows(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [] + + return_value = self.mom_database_wrapper.is_project_active(self.project_name) + + self.assertFalse(return_value) + + def test_folder_exists_logs_start_of_query(self): + self.mom_database_wrapper.folder_exists(self.folder) + + self.logger_mock.info.assert_any_call("folder_exists for folder: %s", self.folder) + + def test_folder_exists_logs_stop_of_query(self): + exists = False + + self.mom_database_wrapper.folder_exists(self.folder) + + self.logger_mock.info.assert_any_call("folder_exists for folder (%s): %s", self.folder, exists) + + def test_folder_exists_returns_true_when_query_returns_rows(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [{u'1': 1}] + + return_value = self.mom_database_wrapper.folder_exists(self.folder) + + self.assertTrue(return_value) + + def test_folder_exists_returns_false_when_query_returns_no_rows(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [] + + return_value = self.mom_database_wrapper.folder_exists(self.folder) + + self.assertFalse(return_value) + + def test_folder_exists_returns_false_on_empty_folder_path(self): + empty_path = "" + + self.assertFalse(self.mom_database_wrapper.folder_exists(empty_path)) + + def test_folder_exists_logs_error_on_empty_folder_path(self): + empty_path = "" + + self.mom_database_wrapper.folder_exists(empty_path) + + self.logger_mock.error.assert_any_call( + "Folder path is incorrect: %s", "Folder path () does not start with a /") + + def test_folder_exists_raises_ValueError_on_folder_path_with_no_parent(self): + no_parent_path = "/" + + self.assertFalse(self.mom_database_wrapper.folder_exists(no_parent_path)) + + def test_folder_exists_logs_error_on_folder_path_with_no_parent(self): + no_parent_path = "/" + + self.mom_database_wrapper.folder_exists(no_parent_path) + + self.logger_mock.error.assert_any_call( + "Folder path is incorrect: %s", "Folder path (%s) should minimally have a project") + + def test_authorized_add_with_status_logs_start_of_query(self): + self.mom_database_wrapper.authorized_add_with_status(self.user_name, self.project_name, self.job_type, + self.status) + + self.logger_mock.info.assert_any_call( + "authorized_add_with_status for user_name: %s project_name: %s job_type: %s status: %s", + self.user_name, self.project_name, self.job_type, self.status) + + def test_authorized_add_with_status_logs_stop_of_query(self): + authorized = False + + self.mom_database_wrapper.authorized_add_with_status(self.user_name, self.project_name, self.job_type, + self.status) + + self.logger_mock.info.assert_any_call( + "authorized_add_with_status for user_name: %s project_name: %s job_type: %s status: %s result: %s", + self.user_name, self.project_name, self.job_type, self.status, authorized) + + def test_authorized_add_with_status_returns_true_when_query_returns_rows(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [{u'1': 1}] + + return_value = self.mom_database_wrapper.authorized_add_with_status(self.user_name, self.project_name, + self.job_type, self.status) + self.assertTrue(return_value) + + def test_authorized_add_with_status_returns_false_when_query_returns_no_rows(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [] + + return_value = self.mom_database_wrapper.authorized_add_with_status(self.user_name, self.project_name, + self.job_type, self.status) + + self.assertFalse(return_value) + + def test_authorized_add_with_status_throws_ValueError_when_status_is_not_approved_or_opened(self): + with self.assertRaises(ValueError) as exception: + self.mom_database_wrapper.authorized_add_with_status(self.user_name, self.project_name, + self.job_type, "aborted") + + self.assertEqual(exception.exception.message, "status should be either 'opened' or 'approved'") + + def test_authorized_add_with_status_throws_ValueError_when_job_type_is_not_observation_or_pipeline_ingest(self): + with self.assertRaises(ValueError) as exception: + self.mom_database_wrapper.authorized_add_with_status(self.user_name, self.project_name, + "measurment", self.status) + + self.assertEqual(exception.exception.message, "job_type should be either 'observation', 'ingest' or 'pipeline'") + + def test_allows_triggers_logs_start_of_query(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [{u'allowtriggers': True}] + + self.mom_database_wrapper.allows_triggers(self.project_name) + + self.logger_mock.info.assert_any_call("allows_triggers for project_name: %s", self.project_name) + + def test_allows_triggers_logs_end_of_query(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [{u'allowtriggers': True}] + + result = self.mom_database_wrapper.allows_triggers(self.project_name) + + self.logger_mock.info.assert_any_call( + "allows_triggers for project_name (%s) result: %s", self.project_name, result) + + def test_allows_triggers_returns_throws_exception_when_query_returns_no_rows(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [] + + with self.assertRaises(ValueError) as exception: + self.mom_database_wrapper.allows_triggers(self.project_name) + + self.assertEqual(exception.exception.message, "project name (%s) not found in MoM database" % self.project_name) + + def test_allows_triggers_returns_true_when_query_returns_rows(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [{u'allowtriggers': True}] + + return_value = self.mom_database_wrapper.allows_triggers(self.project_name) + + self.assertTrue(return_value) + + def test_get_project_priority_logs_start_of_query(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [{u'priority': 1000}] + + self.mom_database_wrapper.get_project_priority(self.project_name) + + self.logger_mock.info.assert_any_call("get_project_priority for project_name: %s", self.project_name) + + def test_get_project_priority_logs_end_of_query(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [{u'priority': 1000}] + + return_value = self.mom_database_wrapper.get_project_priority(self.project_name) + + self.logger_mock.info.assert_any_call( + "get_project_priority for project_name (%s): %s", self.project_name, return_value) + + def test_get_project_priority_returns_priority_when_query_returns_a_row(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [{u'priority': 1000}] + + return_value = self.mom_database_wrapper.get_project_priority(self.project_name) + + self.assertEqual(return_value, 1000) + + def test_get_project_priority_throws_exception_when_query_returns_no_row(self): + self.mysql_mock.connect().cursor().fetchall.return_value = [] + + with self.assertRaises(ValueError) as exception: + self.mom_database_wrapper.get_project_priority(self.project_name) + + self.assertEqual(exception.exception.message, "project name (%s) not found in MoM database" % self.project_name) + + def test_add_trigger_logs_start_of_query(self): + self.mysql_mock.connect().cursor().lastrowid = 34 + + self.mom_database_wrapper.add_trigger(self.user_name, self.host_name, self.project_name, self.meta_data) + + self.logger_mock.info.assert_any_call( + "add_trigger for user_name: %s, host_name: %s, project_name: %s, meta_data: %s", + self.user_name, self.host_name, self.project_name, self.meta_data) + + def test_add_trigger_logs_end_of_query(self): + self.mysql_mock.connect().cursor().lastrowid = 34 + + result = self.mom_database_wrapper.add_trigger( + self.user_name, self.host_name, self.project_name, self.meta_data) + + self.logger_mock.info.assert_any_call( + "add_trigger for user_name(%s), host_name(%s), project_name(%s), meta_data(%s): %s", + self.user_name, self.host_name, self.project_name, self.meta_data, result) + + def test_add_trigger_returns_row_id_from_query(self): + self.mysql_mock.connect().cursor().lastrowid = 34 + + result = self.mom_database_wrapper.add_trigger( + self.user_name, self.host_name, self.project_name, self.meta_data) + + self.assertEqual(result, 34) + + +@unittest.skip("Skipping integration test") +class IntegrationTestMoMDatabaseWrapper(unittest.TestCase): + database_credentials = Credentials() + database_credentials.host = "localhost" + database_credentials.user = "root" + database_credentials.database = "mom" + database_credentials.password = None + database_credentials.config = {"useradministration_database": "useradministration", + "momprivilege_database": "momprivilege"} + + project_name = "project name" + folder = "/project/folder1/folder2" + + user_name = "lofar" + job_type = "observation" + status = "opened" + + def setUp(self): + self.mysqld = testing.mysqld.Mysqld() + + self.database_credentials.port = self.mysqld.my_cnf['port'] + self.connection = connector.connect(**self.mysqld.dsn()) + + cursor = self.connection.cursor(dictionary=True) + # useradmin db + cursor.execute("CREATE DATABASE useradministration") + cursor.execute("CREATE TABLE useradministration.useraccount ( " + "id int(11) NOT NULL AUTO_INCREMENT, " + "userid int(11) NOT NULL DEFAULT '0', " + "username varchar(20) NOT NULL DEFAULT '', " + "password varchar(32) NOT NULL DEFAULT '', " + "publickey varchar(32) DEFAULT NULL, " + "PRIMARY KEY (id), " + "UNIQUE KEY useraccount_UNIQ (username) " + ") ENGINE=InnoDB AUTO_INCREMENT=1787 DEFAULT CHARSET=latin1") + cursor.execute("CREATE TABLE useradministration.useraccountsystemrole (" + "id int(11) NOT NULL AUTO_INCREMENT, " + "useraccountid int(11) NOT NULL DEFAULT '0', " + "systemroleid int(11) NOT NULL DEFAULT '0', " + "indexid int(11) NOT NULL DEFAULT '0', " + "PRIMARY KEY (id), " + "KEY useraccount_useraccountsystemrole_IND (useraccountid), " + "KEY systemrole_useraccountsystemrole_IND (systemroleid), " + "KEY useraccount_index_useraccountsystemrole_IND (indexid) " + ") ENGINE=InnoDB AUTO_INCREMENT=3413 DEFAULT CHARSET=latin1") + # mom database + cursor.execute("CREATE DATABASE mom") + cursor.execute("USE mom") + cursor.execute("CREATE TABLE mom2objectstatus ( " + "id int(11) NOT NULL AUTO_INCREMENT, " + "name varchar(255) DEFAULT NULL, " + "roles varchar(512) DEFAULT NULL, " + "userid int(11) DEFAULT NULL, " + "statusid int(11) DEFAULT NULL, " + "mom2objectid int(11) DEFAULT NULL, " + "indexid int(11) DEFAULT NULL, " + "statustime datetime NOT NULL DEFAULT '1000-01-01 00:00:00.000000', " + "pending tinyint(1) DEFAULT 0, " + "PRIMARY KEY (id) " + ") ENGINE=InnoDB AUTO_INCREMENT=1725902 DEFAULT CHARSET=latin1") + cursor.execute("CREATE TABLE mom2object (" + "id int(11) NOT NULL AUTO_INCREMENT, " + "parentid int(11) DEFAULT NULL, " + "indexid int(11) DEFAULT NULL, " + "mom2id int(11) NOT NULL DEFAULT 0, " + "mom2objecttype char(25) NOT NULL, " + "name varchar(100) NOT NULL DEFAULT '', " + "description varchar(255) DEFAULT NULL, " + "ownerprojectid int(11) DEFAULT NULL, " + "currentstatusid int(11) DEFAULT NULL, " + "topology varchar(100) DEFAULT NULL, " + "predecessor varchar(512) DEFAULT NULL, " + "topology_parent tinyint(1) DEFAULT 0, " + "group_id int(11) DEFAULT 0, " + "datasize bigint(20) DEFAULT 0, " + "PRIMARY KEY (id), " + "UNIQUE KEY mom2object_UNIQ (mom2id) " + ") ENGINE=InnoDB AUTO_INCREMENT=331855 DEFAULT CHARSET=latin1") + cursor.execute("CREATE TABLE status (" + "id int(11) NOT NULL AUTO_INCREMENT, " + "code char(15) NOT NULL DEFAULT '', " + "type char(20) NOT NULL, " + "description varchar(100) DEFAULT NULL, " + "PRIMARY KEY (id), " + "UNIQUE KEY status_UNIQ (code,type) " + ") ENGINE=InnoDB AUTO_INCREMENT=712 DEFAULT CHARSET=latin1") + cursor.execute("CREATE TABLE member ( " + "id int(11) NOT NULL AUTO_INCREMENT, " + "projectid int(11) DEFAULT NULL, " + "indexid int(11) DEFAULT NULL, " + "PRIMARY KEY (id), " + "KEY mom2object_member_FK (projectid), " + "KEY indexid_IND (indexid) " + ") ENGINE=InnoDB AUTO_INCREMENT=1010 DEFAULT CHARSET=latin1") + cursor.execute("CREATE TABLE registeredmember ( " + "id int(11) NOT NULL AUTO_INCREMENT, " + "memberid int(11) DEFAULT NULL, " + "userid int(11) DEFAULT NULL, " + "PRIMARY KEY (id), " + "KEY member_registeredmember_FK (memberid), " + "KEY userid_IND (userid) " + ") ENGINE=InnoDB AUTO_INCREMENT=768 DEFAULT CHARSET=latin1") + cursor.execute("CREATE TABLE memberprojectrole ( " + "id int(11) NOT NULL AUTO_INCREMENT, " + "memberid int(11) DEFAULT NULL, " + "indexid int(11) DEFAULT NULL, " + "projectroleid int(11) DEFAULT NULL, " + "PRIMARY KEY (id), " + "KEY member_memberprojectrole_FK (memberid), " + "KEY projectrole_memberprojectrole_FK (projectroleid), " + "KEY indexid_IND (indexid) " + ") ENGINE=InnoDB AUTO_INCREMENT=1167 DEFAULT CHARSET=latin1") + cursor.execute("CREATE TABLE projectrole ( " + "id int(11) NOT NULL AUTO_INCREMENT, " + "name char(15) NOT NULL DEFAULT '', " + "description varchar(100) DEFAULT NULL, " + "PRIMARY KEY (id), " + "UNIQUE KEY projectrole_UNIQ (name) " + ") ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1") + cursor.execute("CREATE TABLE project ( " + "id int(11) NOT NULL AUTO_INCREMENT, " + "mom2objectid int(11) DEFAULT NULL, " + "releasedate date DEFAULT NULL, " + "PRIMARY KEY (id), " + "KEY mom2object_IND (mom2objectid) " + ") ENGINE=InnoDB AUTO_INCREMENT=149 DEFAULT CHARSET=latin1") + cursor.execute("ALTER TABLE project " + "ADD allowtriggers BOOLEAN NOT NULL DEFAULT FALSE AFTER releasedate, " + "ADD priority int(11) NOT NULL DEFAULT 1000 AFTER allowtriggers") + cursor.execute("CREATE TABLE lofar_trigger ( " + "id int(11) NOT NULL AUTO_INCREMENT, " + "username varchar(120) NOT NULL DEFAULT '', " + "hostname varchar(128) NOT NULL DEFAULT '', " + "arrivaltime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, " + "projectname varchar(100) NOT NULL DEFAULT '', " + "metadata TEXT NOT NULL, " + "PRIMARY KEY (id), " + "FOREIGN KEY (username) REFERENCES useradministration.useraccount(username)" + ") ") + # mom privilege + cursor.execute("CREATE DATABASE momprivilege") + cursor.execute("CREATE TABLE momprivilege.statustransitionrole ( " + "id int(11) NOT NULL AUTO_INCREMENT, " + "statustransitionid int(11) DEFAULT NULL, " + "roleid int(11) NOT NULL, " + "roletype char(100) NOT NULL, " + "PRIMARY KEY (id), " + "KEY roletype_IND (roleid,roletype), " + "KEY statustransition_statustransitionrole_FK (statustransitionid) " + ") ENGINE=InnoDB AUTO_INCREMENT=8572 DEFAULT CHARSET=latin1") + cursor.execute("CREATE TABLE momprivilege.statustransition (" + "id int(11) NOT NULL AUTO_INCREMENT, " + "oldstatusid int(11) NOT NULL, " + "newstatusid int(11) NOT NULL, " + "PRIMARY KEY (id), " + "KEY oldstatus_IND (oldstatusid), " + "KEY newstatus_IND (oldstatusid) " + ") ENGINE=InnoDB AUTO_INCREMENT=1272 DEFAULT CHARSET=latin1") + + self.mom_database_wrapper = MoMDatabaseWrapper(self.database_credentials) + + def tearDown(self): + self.mysqld.stop() + + def execute(self, query): + cursor = self.connection.cursor(dictionary=True) + cursor.execute(query) + self.connection.commit() + cursor.close() + + def test_is_project_active_returns_false_on_empty_mom2object_table(self): + self.assertFalse(self.mom_database_wrapper.is_project_active("project_name")) + + def test_is_project_active_returns_true_when_project_with_correct_name_and_status_is_available(self): + self.execute("insert into mom2object values(169900, NULL, NULL, 183526, 'PROJECT', 'LC0_011', " + "'Pulsar timing with LOFAR', NULL, 966855, NULL, NULL, 0, 0, 0)") + self.execute("insert into mom2objectstatus values(966855, 'Pizzo, Dr. Roberto Francesco', " + "'Administrative, LTA User, manager, Operator, Prospective, Review Manager, Reviewer, Scientist, " + "System Scientist, Telescope Astronomer', 531, 7, 169900, 0, '2012-12-18 09:47:50', 0)") + + self.assertTrue(self.mom_database_wrapper.is_project_active("LC0_011")) + + def test_folder_exists_returns_false_on_empty_table(self): + self.assertFalse(self.mom_database_wrapper.folder_exists("/project/folder1/folder2")) + + def test_folder_exists_returns_true_when_folder_exists(self): + self.execute("insert into mom2object values(1, NULL, NULL, 11, 'PROJECT', 'project', " + "'Pulsar timing with LOFAR', NULL, 966855, NULL, NULL, 0, 0, 0)") + self.execute("insert into mom2object values(2, 1, NULL, 22, 'FOLDER', 'folder1', " + "'Pulsar timing with LOFAR', NULL, 966855, NULL, NULL, 0, 0, 0)") + self.execute("insert into mom2object values(3, 2, NULL, 33, 'FOLDER', 'folder2', " + "'Pulsar timing with LOFAR', NULL, 966855, NULL, NULL, 0, 0, 0)") + + self.assertTrue(self.mom_database_wrapper.folder_exists(self.folder)) + + def test_folder_exists_returns_true_when_folder_exists_and_path_ends_on_forward_slash(self): + self.execute("insert into mom2object values(1, NULL, NULL, 11, 'PROJECT', 'project', " + "'Pulsar timing with LOFAR', NULL, 966855, NULL, NULL, 0, 0, 0)") + self.execute("insert into mom2object values(2, 1, NULL, 22, 'FOLDER', 'folder1', " + "'Pulsar timing with LOFAR', NULL, 966855, NULL, NULL, 0, 0, 0)") + self.execute("insert into mom2object values(3, 2, NULL, 33, 'FOLDER', 'folder2', " + "'Pulsar timing with LOFAR', NULL, 966855, NULL, NULL, 0, 0, 0)") + + self.assertTrue(self.mom_database_wrapper.folder_exists(self.folder)) + + def test_authorized_add_with_status_returns_false_on_empty_db(self): + self.assertFalse(self.mom_database_wrapper.authorized_add_with_status(self.user_name, self.project_name, + self.job_type, self.status)) + + def test_authorized_add_with_status_returns_true_on_when_rights_are_on_system_role(self): + # insert user + self.execute("insert into useradministration.useraccount " + "values(1, 1, '%s', '26dcf77e2de89027e8895baea8e45057', 'sNgmwwN7fk')" + % self.user_name) + # setup status + self.execute("insert into status values(101, 'opened', 'OBSERVATION', '')") + self.execute("insert into status values(104, 'approved', 'OBSERVATION', " + "'The specification is in accordance with wishes of the PI.')") + # setup status transitions + self.execute("insert into momprivilege.statustransition values(1003, 0, 101)") + self.execute("insert into momprivilege.statustransition values(1059, 101, 104)") + # setup transition role + self.execute("insert into momprivilege.statustransitionrole " + "values(1, 1003, 9, 'nl.astron.useradministration.data.entities.SystemRole')") + self.execute("insert into momprivilege.statustransitionrole " + "values(2, 1059, 9, 'nl.astron.useradministration.data.entities.SystemRole')") + # user account system role + self.execute("insert into useradministration.useraccountsystemrole values(533, 1, 9, 0)") + + self.assertTrue(self.mom_database_wrapper.authorized_add_with_status(self.user_name, self.project_name, + 'observation', "approved")) + self.assertTrue(self.mom_database_wrapper.authorized_add_with_status(self.user_name, self.project_name, + 'observation', "opened")) + + def test_authorized_add_with_status_returns_true_on_when_rights_are_on_project_role(self): + # insert user + self.execute("insert into useradministration.useraccount " + "values(1, 1, '%s', '26dcf77e2de89027e8895baea8e45057', 'sNgmwwN7fk')" + % self.user_name) + # setup status + self.execute("insert into status values(101, 'opened', 'OBSERVATION', '')") + self.execute("insert into status values(104, 'approved', 'OBSERVATION', " + "'The specification is in accordance with wishes of the PI.')") + # setup status transitions + self.execute("insert into momprivilege.statustransition values(1003, 0, 101)") + self.execute("insert into momprivilege.statustransition values(1059, 101, 104)") + # setup transition role + self.execute("insert into momprivilege.statustransitionrole " + "values(1, 1003, 1, 'nl.astron.mom2.data.entities.ProjectRole')") + self.execute("insert into momprivilege.statustransitionrole " + "values(2, 1059, 1, 'nl.astron.mom2.data.entities.ProjectRole')") + # setup project role + self.execute("insert into projectrole values(1, 'Pi', NULL)") + # setup member project role + self.execute("insert into memberprojectrole values(1, 1, 0, 1)") + # setup registered member + self.execute("insert into registeredmember values(1, 1, 1)") + # setup member + self.execute("insert into member values(1, 1, 0)") + # setup project + self.execute("insert into mom2object values(1, NULL, NULL, 2, 'PROJECT', '%(project_name)s', 'test-lofar', " + "NULL, 1704653, NULL, NULL, 0, 0, 0)" % {"project_name": self.project_name}) + + self.assertTrue(self.mom_database_wrapper.authorized_add_with_status(self.user_name, self.project_name, + 'observation', 'approved')) + self.assertTrue(self.mom_database_wrapper.authorized_add_with_status(self.user_name, self.project_name, + 'observation', 'opened')) + + def test_allows_triggers_returns_raises_exception_on_empty_db(self): + with self.assertRaises(ValueError) as exception: + self.assertFalse(self.mom_database_wrapper.allows_triggers(self.project_name)) + + self.assertEqual(exception.exception.message, "project name (%s) not found in MoM database" % self.project_name) + + def test_allows_triggers_returns_true_when_project_allows_triggers(self): + self.execute("insert into mom2object values(1, NULL, NULL, 2, 'PROJECT', '%(project_name)s', 'test-lofar', " + "NULL, 1704653, NULL, NULL, 0, 0, 0)" % {"project_name": self.project_name}) + self.execute("insert into project values(1, 1, '2012-09-14', TRUE, 1000)") + + self.assertTrue(self.mom_database_wrapper.allows_triggers(self.project_name)) + + def test_allows_triggers_returns_false_when_project_does_not_allow_triggers(self): + self.execute("insert into mom2object values(1, NULL, NULL, 2, 'PROJECT', '%(project_name)s', 'test-lofar', " + "NULL, 1704653, NULL, NULL, 0, 0, 0)" % {"project_name": self.project_name}) + self.execute("insert into project values(1, 1, '2012-09-14', FALSE, 1000)") + + self.assertFalse(self.mom_database_wrapper.allows_triggers(self.project_name)) + + def test_get_project_priority_raises_exception_on_empty_database(self): + with self.assertRaises(ValueError) as exception: + self.mom_database_wrapper.get_project_priority(self.project_name) + + self.assertEqual(exception.exception.message, "project name (%s) not found in MoM database" % self.project_name) + + def test_get_project_priority_returns_priority_of_project(self): + self.execute("insert into mom2object values(1, NULL, NULL, 2, 'PROJECT', '%(project_name)s', 'test-lofar', " + "NULL, 1704653, NULL, NULL, 0, 0, 0)" % {"project_name": self.project_name}) + self.execute("insert into project values(1, 1, '2012-09-14', FALSE, 5000)") + + priority = self.mom_database_wrapper.get_project_priority(self.project_name) + + self.assertEqual(priority, 5000) + + def test_add_trigger_returns_row_id_1_on_empty_table(self): + self.execute("insert into useradministration.useraccount " + "values(1, 1, '%s', '26dcf77e2de89027e8895baea8e45057', 'sNgmwwN7fk')" + % self.user_name) + result = self.mom_database_wrapper.add_trigger(self.user_name, "host name", "project name", "meta data") + + self.assertEqual(result, 1) + + def test_add_trigger_returns_row_id_2_on_insert_delete_insert_on_empty_database(self): + # It is (maybe) not likely that triggers will be deleted but at least the code can handle it + self.execute("insert into useradministration.useraccount " + "values(1, 1, '%s', '26dcf77e2de89027e8895baea8e45057', 'sNgmwwN7fk')" + % self.user_name) + + self.mom_database_wrapper.add_trigger(self.user_name, "host name", "project name", "meta data") + self.execute("delete from lofar_trigger " + "where id = 1") + result = self.mom_database_wrapper.add_trigger(self.user_name, "host name", "project name", "meta data") + + self.assertEqual(result, 2) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file