Skip to content
Snippets Groups Projects
Commit cb4302f6 authored by Ruud Overeem's avatar Ruud Overeem
Browse files

Task #7342: Far from complete but it works.

parent 6b4d1541
No related branches found
No related tags found
No related merge requests found
......@@ -2423,6 +2423,9 @@ LCS/MSLofar/test/tBeamTables.in_during_filled -text
LCS/MSLofar/test/tBeamTables.in_hd/CS001-iHBADeltas.conf -text
LCS/MSLofar/test/tBeamTables.in_hd/DE601-iHBADeltas.conf -text
LCS/MSLofar/test/tBeamTables.in_hd/RS106-iHBADeltas.conf -text
LCS/MessageBus/src/LofarMsgTemplate.txt -text
LCS/MessageBus/src/Message.cc -text
LCS/MessageBus/test/tMessage.cc -text
LCS/MessageBus/test/tMsgBus.cc -text
LCS/Stream/include/Stream/SocketStream.tcc -text
LCS/Tools/src/checkcomp.py -text
......
......@@ -32,14 +32,15 @@
namespace LOFAR {
class Message
{
// Name of the system sending these messages
static const std::string system = "LOFAR";
// Name of the system sending these messages
static const std::string system = "LOFAR";
// Version of the header we write
static const std::string headerVersion = "1.0.0";
// Version of the header we write
static const std::string headerVersion = "1.0.0";
class Message
{
public:
// Construct a message
Message(
// Name of the service or process producing this message
......@@ -59,36 +60,37 @@ class Message
);
// Parse a message
Message(const qpid::message::Message &qpidMsg);
Message(const qpid::messaging::Message &qpidMsg);
// Read a message from disk (header + payload)
Message(const std::string &rawContent);
// Set the payload, supporting various types
void setXMLPayload(const std::string &payload);
void setTXTPayload(const std::string &payload);
void setMapPayload(const qpid::Variant::Map &payload);
void setListPayload(const qpid::Variant::List &payload);
void setXMLPayload (const std::string &payload);
void setTXTPayload (const std::string &payload);
void setMapPayload (const qpid::types::Variant::Map &payload);
void setListPayload(const qpid::types::Variant::List &payload);
virtual ~Message();
// Return properties of the constructed or received message
std::string getSystem() const;
std::string getHeaderVersion() const;
std::string getPayload() const;
std::string getFrom() const;
std::string getForUser() const;
std::string getSummary() const;
std::string getToService() const;
std::string getToVersion() const;
std::string system() const;
std::string headerVersion() const;
std::string payload() const;
std::string from() const;
std::string forUser() const;
std::string summary() const;
std::string toService() const;
std::string toVersion() const;
// Construct the given fields as a QPID message
qpid::messaging::Message getQpidMsg() const;
qpid::messaging::Message getQpidMsg() const { return (itsQpidMsg); }
// Return the raw message (header + payload)
std::string getRawContent() const;
std::string getRawContent() const { return (itsQpidMsg.getContent()); }
private:
// datamembers
qpid::messaging::Message itsQpidMsg;
};
......
......@@ -4,7 +4,8 @@ include(LofarPackageVersion)
set(messagebus_LIB_SRCS
Package__Version.cc
MsgBus.cc)
MsgBus.cc
Message.cc)
set(messagebus_PROGRAMS
)
......
<?xml version="1.0" encoding="UTF-8"?>
<message>
<header>
<system>LOFAR</system>
<version>1.0.0</version>
<service>
<name>%s</name>
<version>%s</version>
</service>
<request>
<source>%s</source>
<user>%s</user>
<uuid>%s</uuid>
<timestamp>%s</timestamp>
<summary>%s</summary>
</request>
</header>
<payload>
%s
</payload>
</message>
//# Message.cc: one_line_description
//#
//# Copyright (C) 2002-2004
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id: $
//# Always #include <lofar_config.h> first!
#include <lofar_config.h>
//# Includes
#include <Common/LofarLogger.h>
#include <Common/lofar_string.h>
#include <MessageBus/Message.h>
namespace LOFAR {
const string LOFAR_MSG_TEMPLATE = "\
<message>\n\
<header>\n\
<system>LOFAR</system>\n\
<version>1.0.0</version>\n\
<service>\n\
<name>%s</name>\n\
<version>%s</version>\n\
</service>\n\
<request>\n\
<source>%s</source>\n\
<user>%s</user>\n\
<uuid>%s</uuid>\n\
<timestamp>%s</timestamp>\n\
<summary>%s</summary>\n\
</request>\n\
</header>\n\
<payload>\n\
%s\n\
</payload>\n\
</message>";
Message::Message(const std::string &from,
const std::string &forUser,
const std::string &summary,
const std::string &toService,
const std::string &toVersion)
{
itsQpidMsg.setContent(formatString(LOFAR_MSG_TEMPLATE.c_str(), toService.c_str(), toVersion.c_str(),
from.c_str(), forUser.c_str(), "", "", summary.c_str(), "%s"));
cout << itsQpidMsg.getContent() << endl;
}
Message::Message(const qpid::messaging::Message &qpidMsg)
{
itsQpidMsg.setContent(formatString(LOFAR_MSG_TEMPLATE.c_str(), "", "",
"", "", "", "", "", qpidMsg.getContent().c_str()));
cout << itsQpidMsg.getContent() << endl;
}
// Read a message from disk (header + payload)
Message::Message(const std::string &rawContent)
{
itsQpidMsg.setContent(rawContent);
cout << itsQpidMsg.getContent() << endl;
}
Message::~Message()
{}
void Message::setXMLPayload (const std::string &payload)
{
}
void Message::setTXTPayload (const std::string &payload)
{
itsQpidMsg.setContent(formatString(itsQpidMsg.getContent().c_str(), payload.c_str()));
cout << itsQpidMsg.getContent() << endl;
}
void Message::setMapPayload (const qpid::types::Variant::Map &payload)
{
}
void Message::setListPayload(const qpid::types::Variant::List &payload)
{
}
// Return properties of the constructed or received message
std::string Message::system() const
{
return "???";
}
std::string Message::headerVersion() const
{
return "???";
}
std::string Message::payload() const
{
return "???";
}
std::string Message::from() const
{
return "???";
}
std::string Message::forUser() const
{
return "???";
}
std::string Message::summary() const
{
return "???";
}
std::string Message::toService() const
{
return "???";
}
std::string Message::toVersion() const
{
return "???";
}
} // namespace LOFAR
......@@ -3,3 +3,4 @@
include(LofarCTest)
lofar_add_test(tMsgBus tMsgBus.cc)
lofar_add_test(tMessage tMessage.cc)
//# Copyright (C) 2015
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id: $
//# Always #include <lofar_config.h> first!
#include <lofar_config.h>
//# Includes
#include <Common/LofarLogger.h>
#include <MessageBus/Message.h>
//using namespace qpid::messaging;
using namespace LOFAR;
#if 0
void showMessage(Message& msg)
{
cout << "Message ID : " << msg.getMessageId() << endl;
cout << "User ID : " << msg.getUserId() << endl;
cout << "Correlation ID: " << msg.getCorrelationId() << endl;
cout << "Subject : " << msg.getSubject() << endl;
cout << "Reply to : " << msg.getReplyTo() << endl;
cout << "Content type : " << msg.getContentType() << endl;
cout << "Priority : " << msg.getPriority() << endl;
// cout << "TTL : " << msg.getTtl() << endl;
cout << "Durable : " << (msg.getDurable() ? "Yes" : "No") << endl;
cout << "Redelivered : " << (msg.getRedelivered() ? "Yes" : "No") << endl;
cout << "Properties : " << msg.getProperties() << endl;
cout << "Content size : " << msg.getContentSize() << endl;
cout << "Content : " << msg.getContent() << endl;
}
void compareMessages(Message& lhm, Message& rhm)
{
ASSERTSTR(lhm.getMessageId() == rhm.getMessageId(), "messageIDs differ");
ASSERTSTR(lhm.getUserId() == rhm.getUserId(), "UserIDs differ");
ASSERTSTR(lhm.getCorrelationId() == rhm.getCorrelationId(), "CorrelationIDs differ");
ASSERTSTR(lhm.getSubject() == rhm.getSubject(), "Subjects differ");
ASSERTSTR(lhm.getReplyTo() == rhm.getReplyTo(), "ReplyTos differ");
ASSERTSTR(lhm.getContentType() == rhm.getContentType(), "ContentTypes differ");
ASSERTSTR(lhm.getPriority() == rhm.getPriority(), "Priorities differ");
ASSERTSTR(lhm.getTtl() == rhm.getTtl(), "TTLs differ");
ASSERTSTR(lhm.getDurable() == rhm.getDurable(), "Durability differs");
ASSERTSTR(lhm.getRedelivered() == rhm.getRedelivered(), "Redelivered differs");
// ASSERTSTR(lhm.getProperties() == rhm.getProperties(), "Properties differ");
ASSERTSTR(lhm.getContentSize() == rhm.getContentSize(), "ContentSize differs");
ASSERTSTR(lhm.getContent() == rhm.getContent(), "Content differs");
}
#endif
int main(int argc, char* argv[]) {
if (argc != 2) {
cout << "Syntax: " << argv[0] << " messagebus" << endl;
return (1);
}
Message msg1("mySubSystem", "user", "some test message", "lofar.observation.start", "1.0");
qpid::messaging::Message qpMsg("Qpid message");
Message msg2(qpMsg);
string KVmapje("abc=[aap,noot,mies]\nmyInteger=5\nmyDouble=3.14");
msg1.setTXTPayload(KVmapje);
return (0);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment