Skip to content
Snippets Groups Projects
Commit e6c92f8e authored by Jan David Mol's avatar Jan David Mol
Browse files

SW-462: Backported 3.2 fixes to trunk (remove CEP4/COBALT qpid infra: post to...

SW-462: Backported 3.2 fixes to trunk (remove CEP4/COBALT qpid infra: post to target queues directly, ignore invalid mom SSL certificate)
parents 7bac3347 48b402d9
1 merge request!6Import cobalt2 into lofar4
Showing
with 122 additions and 27 deletions
...@@ -1566,6 +1566,7 @@ LCS/MessageBus/qpid/local/sbin/qpidd_init.d_script_ubuntu -text ...@@ -1566,6 +1566,7 @@ LCS/MessageBus/qpid/local/sbin/qpidd_init.d_script_ubuntu -text
LCS/MessageBus/src/Message.cc -text LCS/MessageBus/src/Message.cc -text
LCS/MessageBus/src/Protocols/TaskFeedbackState.cc -text LCS/MessageBus/src/Protocols/TaskFeedbackState.cc -text
LCS/MessageBus/src/__init__.py -text LCS/MessageBus/src/__init__.py -text
LCS/MessageBus/src/environment.py -text
LCS/MessageBus/src/messagebus.py -text LCS/MessageBus/src/messagebus.py -text
LCS/MessageBus/test/tMessage.cc -text LCS/MessageBus/test/tMessage.cc -text
LCS/MessageBus/test/tMessageBus.cc -text LCS/MessageBus/test/tMessageBus.cc -text
......
...@@ -55,7 +55,7 @@ class control(StatefulRecipe): ...@@ -55,7 +55,7 @@ class control(StatefulRecipe):
""" """
if self.feedback_method == "messagebus": if self.feedback_method == "messagebus":
bus = messagebus.ToBus("lofar.task.feedback.processing") bus = messagebus.ToBus("otdb.task.feedback.processing", broker=messagebus.broker_feedback)
msg = TaskFeedbackProcessing( msg = TaskFeedbackProcessing(
"lofarpipe.support.control", "lofarpipe.support.control",
"", "",
...@@ -74,7 +74,7 @@ class control(StatefulRecipe): ...@@ -74,7 +74,7 @@ class control(StatefulRecipe):
""" """
if self.feedback_method == "messagebus": if self.feedback_method == "messagebus":
bus = messagebus.ToBus("lofar.task.feedback.dataproducts") bus = messagebus.ToBus("otdb.task.feedback.dataproducts", broker=messagebus.broker_feedback)
msg = TaskFeedbackDataproducts( msg = TaskFeedbackDataproducts(
"lofarpipe.support.control", "lofarpipe.support.control",
"", "",
...@@ -94,7 +94,7 @@ class control(StatefulRecipe): ...@@ -94,7 +94,7 @@ class control(StatefulRecipe):
""" """
if self.feedback_method == "messagebus" and self.feedback_send_status: if self.feedback_method == "messagebus" and self.feedback_send_status:
bus = messagebus.ToBus("lofar.task.feedback.state") bus = messagebus.ToBus("mac.task.feedback.state", broker=messagebus.broker_state)
msg = TaskFeedbackState( msg = TaskFeedbackState(
"lofarpipe.support.control", "lofarpipe.support.control",
"", "",
......
...@@ -13,6 +13,7 @@ install(FILES ...@@ -13,6 +13,7 @@ install(FILES
MessageBus.h MessageBus.h
NoQpidFallback.h NoQpidFallback.h
ToBus.h ToBus.h
Util.h
XMLDoc.h XMLDoc.h
DESTINATION include/${PACKAGE_NAME}) DESTINATION include/${PACKAGE_NAME})
......
...@@ -45,7 +45,7 @@ namespace LOFAR { ...@@ -45,7 +45,7 @@ namespace LOFAR {
class FromBus class FromBus
{ {
public: public:
FromBus(const std::string &address, const std::string &options="; {create: never}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; FromBus(const std::string &address, const std::string &broker = "amqp:tcp:127.0.0.1:5672", const std::string &options="; {create: never}");
~FromBus(void); ~FromBus(void);
bool addQueue(const std::string &address, const std::string &options="; {create: never}"); bool addQueue(const std::string &address, const std::string &options="; {create: never}");
......
...@@ -43,7 +43,7 @@ namespace LOFAR { ...@@ -43,7 +43,7 @@ namespace LOFAR {
class ToBus class ToBus
{ {
public: public:
ToBus(const std::string &address, const std::string &options="; {create: never}", const std::string &broker = "amqp:tcp:127.0.0.1:5672") ; ToBus(const std::string &address, const std::string &broker = "amqp:tcp:127.0.0.1:5672", const std::string &options="; {create: never}");
~ToBus(void); ~ToBus(void);
void send(LOFAR::Message &msg); void send(LOFAR::Message &msg);
......
...@@ -39,6 +39,12 @@ namespace LOFAR { ...@@ -39,6 +39,12 @@ namespace LOFAR {
// Return the prefix that has to be prepended to all queue names // Return the prefix that has to be prepended to all queue names
std::string queue_prefix(); std::string queue_prefix();
// Return the broker that should receive feedback
std::string broker_feedback();
// Return the broker that should receive state
std::string broker_state();
} // namespace LOFAR } // namespace LOFAR
#endif #endif
......
...@@ -26,6 +26,7 @@ endforeach(prog ${messagebus_PROGRAMS}) ...@@ -26,6 +26,7 @@ endforeach(prog ${messagebus_PROGRAMS})
python_install( python_install(
__init__.py __init__.py
environment.py
messagebus.py messagebus.py
message.py message.py
noqpidfallback.py noqpidfallback.py
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
#include <MessageBus/FromBus.h> #include <MessageBus/FromBus.h>
#include <MessageBus/Exceptions.h> #include <MessageBus/Exceptions.h>
#include "Util.h" #include <MessageBus/Util.h>
#include <Common/LofarLogger.h> #include <Common/LofarLogger.h>
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
using namespace qpid::messaging; using namespace qpid::messaging;
namespace LOFAR { namespace LOFAR {
FromBus::FromBus(const std::string &address, const std::string &options, const std::string &broker) FromBus::FromBus(const std::string &address, const std::string &broker, const std::string &options)
try: try:
itsConnection(broker,"{reconnect:true}"), itsConnection(broker,"{reconnect:true}"),
itsNrMissingACKs(0) itsNrMissingACKs(0)
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
#include <MessageBus/ToBus.h> #include <MessageBus/ToBus.h>
#include <MessageBus/Exceptions.h> #include <MessageBus/Exceptions.h>
#include "Util.h" #include <MessageBus/Util.h>
#include <Common/LofarLogger.h> #include <Common/LofarLogger.h>
...@@ -15,7 +15,7 @@ using namespace qpid::messaging; ...@@ -15,7 +15,7 @@ using namespace qpid::messaging;
namespace LOFAR { namespace LOFAR {
ToBus::ToBus(const std::string &address, const std::string &options, const std::string &broker) ToBus::ToBus(const std::string &address, const std::string &broker, const std::string &options)
try: try:
itsConnection(broker,"{reconnect:true}") itsConnection(broker,"{reconnect:true}")
{ {
......
#include "lofar_config.h" #include "lofar_config.h"
#include <Common/LofarTypes.h> #include <Common/LofarTypes.h>
#include "Util.h" #include <MessageBus/Util.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -38,5 +38,31 @@ namespace LOFAR { ...@@ -38,5 +38,31 @@ namespace LOFAR {
return queueprefix; return queueprefix;
} }
std::string broker_state()
{
string lofarenv = getenv_str("LOFARENV");
if (lofarenv == "PRODUCTION") {
return "ccu001.control.lofar";
} else if (lofarenv == "TEST") {
return "ccu199.control.lofar";
} else {
return "localhost";
}
}
std::string broker_feedback()
{
string lofarenv = getenv_str("LOFARENV");
if (lofarenv == "PRODUCTION") {
return "mcu001.control.lofar";
} else if (lofarenv == "TEST") {
return "mcu199.control.lofar";
} else {
return "localhost";
}
}
} // namespace LOFAR } // namespace LOFAR
# environment.py: Functions to determine the LOFAR environment,
# to avoid a dependency on PyCommon.
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: __init__.py 1568 2015-09-18 15:21:11Z loose $
import os
def isProductionEnvironment():
'''check if the program is running in a lofar producution environment'''
return os.environ.get('LOFARENV', '') == 'PRODUCTION'
def isTestEnvironment():
'''check if the program is running in a lofar test environment'''
return os.environ.get('LOFARENV', '') == 'TEST'
def isDevelopmentEnvironment():
'''check if the program is running in a lofar development (not production or test) environment'''
return not (isProductionEnvironment() or isTestEnvironment())
...@@ -29,12 +29,24 @@ except ImportError: ...@@ -29,12 +29,24 @@ except ImportError:
import os import os
import logging import logging
import lofar.messagebus.message as message import lofar.messagebus.message as message
from lofar.messagebus.environment import isProductionEnvironment, isTestEnvironment
import atexit import atexit
# Candidate for a config file # Candidate for a config file
broker="127.0.0.1" broker="127.0.0.1"
options="create:never" options="create:never"
# which brokers to use to avoid routing
if isProductionEnvironment():
broker_feedback="mcu001.control.lofar"
broker_state="ccu001.control.lofar"
elif isTestEnvironment():
broker_feedback="mcu199.control.lofar"
broker_state="ccu199.control.lofar"
else:
broker_feedback="localhost"
broker_state="localhost"
logger=logging.getLogger("MessageBus") logger=logging.getLogger("MessageBus")
#TODO: replace this version of the messagebus by the version in LCS/Messaging/python/messaging #TODO: replace this version of the messagebus by the version in LCS/Messaging/python/messaging
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#lofar.task.specification.system: mom.task.specification.system #lofar.task.specification.system: mom.task.specification.system
# #
# Disabled some queues to MoM, see http://www.lofar.org/operations/lib/exe/fetch.php?cache=&media=mom3:qpid_mom3_feedback_production.png # Disabled some queues to MoM, see http://www.lofar.org/operations/lib/exe/fetch.php?cache=&media=mom3:qpid_mom3_feedback_production.png
lofar.task.feedback.dataproducts: otdb.task.feedback.dataproducts #lofar.task.feedback.dataproducts: otdb.task.feedback.dataproducts
lofar.task.feedback.processing: mom.task.feedback.processing, otdb.task.feedback.processing #lofar.task.feedback.processing: mom.task.feedback.processing, otdb.task.feedback.processing
lofar.task.feedback.state: mac.task.feedback.state #lofar.task.feedback.state: mac.task.feedback.state
lofar.task.specification.system: mom.task.specification.system lofar.task.specification.system: mom.task.specification.system
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#lofar.task.specification.system: mom.task.specification.system #lofar.task.specification.system: mom.task.specification.system
# #
# Disabled some queues to MoM, see http://www.lofar.org/operations/lib/exe/fetch.php?cache=&media=mom3:qpid_mom3_feedback_production.png # Disabled some queues to MoM, see http://www.lofar.org/operations/lib/exe/fetch.php?cache=&media=mom3:qpid_mom3_feedback_production.png
lofar.task.feedback.dataproducts: otdb.task.feedback.dataproducts #lofar.task.feedback.dataproducts: otdb.task.feedback.dataproducts
lofar.task.feedback.processing: mom.task.feedback.processing, otdb.task.feedback.processing #lofar.task.feedback.processing: mom.task.feedback.processing, otdb.task.feedback.processing
lofar.task.feedback.state: mac.task.feedback.state #lofar.task.feedback.state: mac.task.feedback.state
lofar.task.specification.system: mom.task.specification.system lofar.task.specification.system: mom.task.specification.system
...@@ -55,8 +55,8 @@ namespace LOFAR ...@@ -55,8 +55,8 @@ namespace LOFAR
// @param broker valid Qpid broker URL. // @param broker valid Qpid broker URL.
// @note Please consult the Qpid documentation for more details. // @note Please consult the Qpid documentation for more details.
FromBus(const std::string& address, FromBus(const std::string& address,
const std::string& options = defaultAddressOptions, const std::string& broker = defaultBroker,
const std::string& broker = defaultBroker); const std::string& options = defaultAddressOptions);
// Destructor. Report the number of missing acknowledgements if non-zero. // Destructor. Report the number of missing acknowledgements if non-zero.
~FromBus(); ~FromBus();
......
...@@ -53,8 +53,8 @@ namespace LOFAR ...@@ -53,8 +53,8 @@ namespace LOFAR
// @param broker valid Qpid broker URL. // @param broker valid Qpid broker URL.
// @note Please consult the Qpid documentation for more details. // @note Please consult the Qpid documentation for more details.
ToBus(const std::string& address, ToBus(const std::string& address,
const std::string& options = defaultAddressOptions, const std::string& broker = defaultBroker,
const std::string& broker = defaultBroker); const std::string& options = defaultAddressOptions);
// Destructor. // Destructor.
~ToBus(); ~ToBus();
......
...@@ -45,8 +45,8 @@ namespace LOFAR ...@@ -45,8 +45,8 @@ namespace LOFAR
} }
FromBus::FromBus(const std::string& address, FromBus::FromBus(const std::string& address,
const std::string& options, const std::string& broker,
const std::string& broker) const std::string& options)
// We need to use a function try-block here, because we want to catch // We need to use a function try-block here, because we want to catch
// exceptions that may be thrown during member initialization as well. // exceptions that may be thrown during member initialization as well.
try : try :
......
...@@ -37,8 +37,8 @@ namespace LOFAR ...@@ -37,8 +37,8 @@ namespace LOFAR
using namespace std; using namespace std;
ToBus::ToBus(const string& address, ToBus::ToBus(const string& address,
const string& options, const string& broker,
const string& broker) const string& options)
// We need to use a function try-block here, because we want to catch // We need to use a function try-block here, because we want to catch
// exceptions that may be thrown during member initialization as well. // exceptions that may be thrown during member initialization as well.
try : try :
......
...@@ -12,7 +12,17 @@ from lofar.common.util import humanreadablesize ...@@ -12,7 +12,17 @@ from lofar.common.util import humanreadablesize
logger = logging.getLogger() logger = logging.getLogger()
import mechanize # ignore ssl errors from expired/selfsigned mom ssl certificate
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
try:
import mechanize
except ImportError as e:
print e
print "please install python 'mechanize' package: sudo pip install mechanize"
print
exit(1)
class MoMClient: class MoMClient:
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include <Common/LofarLocators.h> #include <Common/LofarLocators.h>
#include <Messaging/Message.h> #include <Messaging/Message.h>
#include <Messaging/FromBus.h> #include <Messaging/FromBus.h>
#include <Messaging/DefaultSettings.h>
int main() int main()
{ {
...@@ -46,7 +47,7 @@ int main() ...@@ -46,7 +47,7 @@ int main()
return 1; return 1;
} }
LOFAR::Messaging::FromBus fromBus{queueName, "{create: always, delete: always}"}; LOFAR::Messaging::FromBus fromBus{queueName, LOFAR::Messaging:defaultBroker, "{create: always, delete: always}"};
while (1) while (1)
{ {
...@@ -69,4 +70,4 @@ int main() ...@@ -69,4 +70,4 @@ int main()
} }
return 0; return 0;
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment