Skip to content
Snippets Groups Projects
Commit 78b6df18 authored by Jan David Mol's avatar Jan David Mol
Browse files

Send feedback over RabbitMQ. Use 'now' as a default timestamp

parent 512cd1eb
Branches
No related tags found
1 merge request!39Resolve TMSS-826 "Add rabbitmq messages"
......@@ -23,6 +23,7 @@
#include "RabbitMQ.h"
#include <Common/LofarLogger.h>
#include <CoInterface/Exceptions.h>
#include <CoInterface/TimeFuncs.h>
#ifdef HAVE_RABBITMQC
#include <rabbitmq-c/amqp.h>
......@@ -37,6 +38,12 @@ using boost::format;
namespace LOFAR {
namespace Cobalt {
// Return a string representing the current time for use in RabbitMQ messages
static std::string now() {
const double now(TimeSpec::toDouble(TimeSpec::now()));
return TimeDouble::toString(now, false);
}
#ifdef HAVE_RABBITMQC
static void throw_on_amqp_error(amqp_rpc_reply_t reply, const std::string &context) {
// See also https://github.com/alanxz/rabbitmq-c/blob/master/examples/util.c
......@@ -137,16 +144,22 @@ namespace LOFAR {
}
void RabbitMQ::sendEventObservationStarted(int obsid, const std::string &startTime) const {
const std::string message = str(format("{ 'subtask_id': %d, 'start_time': '%s' }") % obsid % startTime);
const std::string message = str(format("{ 'subtask_id': %d, 'start_time': '%s' }") % obsid % (startTime == "" ? now() : startTime));
send_message("lofar", "COBALT.Event.ObservationStarted", message);
}
void RabbitMQ::sendEventObservationFinishing(int obsid, const std::string &stopTime) const {
const std::string message = str(format("{ 'subtask_id': %d, 'stop_time': '%s' }") % obsid % stopTime);
const std::string message = str(format("{ 'subtask_id': %d, 'stop_time': '%s' }") % obsid % (stopTime == "" ? now() : stopTime));
send_message("lofar", "COBALT.Event.ObservationFinishing", message);
}
void RabbitMQ::sendFeedback(int obsid, const std::string &feedback) const {
const std::string message = str(format("{ 'subtask_id': %d, 'feedback': '%s' }") % obsid % feedback);
send_message("lofar", "COBALT.Event.Feedback", message);
}
#endif
}
}
......@@ -40,8 +40,10 @@ namespace LOFAR {
{
}
void sendEventObservationStarted(int obsid, const std::string &startTime) const;
void sendEventObservationFinishing(int obsid, const std::string &stopTime) const;
void sendEventObservationStarted(int obsid, const std::string &startTime = "") const;
void sendEventObservationFinishing(int obsid, const std::string &stopTime = "") const;
void sendFeedback(int obsid, const std::string &feedback) const;
protected:
const Credentials credentials;
......
......@@ -96,9 +96,6 @@ const time_t defaultOutputProcTimeout = 60;
// has passed.
const time_t defaultRtcpTimeout = 5 * 60;
// Credentials to contact RabbitMQ
RabbitMQ::Credentials RabbitMQ_credentials;
static void usage(const char *argv0)
{
cout << "RTCP: Real-Time Central Processing of the LOFAR radio telescope." << endl;
......@@ -122,18 +119,6 @@ void ignore_sigpipe()
THROW_SYSCALL("sigaction(SIGPIPE, <SIG_IGN>)");
}
void set_state_finishing()
{
#ifdef HAVE_RABBITMQC
LOG_INFO("Sending FINISHING state update to RabbitMQ bus");
RabbitMQ rmq(RabbitMQ_credentials);
double now(TimeSpec::toDouble(TimeSpec::now()));
rmq.sendEventObservationFinishing(
ps.settings.observationID,
TimeDouble::toString(now, false));
#endif
}
string GPUProc_PVSSPrefix(const Parset &ps, int cpuNr)
......@@ -166,6 +151,9 @@ int main(int argc, char **argv)
* Parse command-line options
*/
// Credentials to contact RabbitMQ
RabbitMQ::Credentials RabbitMQ_credentials;
int opt;
while ((opt = getopt(argc, argv, "phR:")) != -1) {
stringstream ss(optarg ? optarg : "");
......@@ -600,7 +588,10 @@ int main(int argc, char **argv)
*/
LOG_INFO("===== FINALISE =====");
set_state_finishing();
#ifdef HAVE_RABBITMQC
LOG_INFO("Sending FINISHING state update to RabbitMQ bus");
RabbitMQ(RabbitMQ_credentials).sendEventObservationFinishing(ps.settings.observationID);
#endif
if (storageProcesses) {
LOG_INFO("----- Processing final metadata (broken antenna information)");
......@@ -617,10 +608,15 @@ int main(int argc, char **argv)
storageProcesses->stop(time(0) + outputProcTimeout);
// send processing feedback
ToBus bus("otdb.task.feedback.processing", broker_feedback());
LTAFeedback fb(ps.settings);
const ParameterSet feedback = fb.processingFeedback(nr_blocks);
std::string feedback_str;
feedback.writeBuffer(feedback_str);
// Post on QPID
ToBus bus("otdb.task.feedback.processing", broker_feedback());
Protocols::TaskFeedbackProcessing msg(
"Cobalt/GPUProc/rtcp",
"",
......@@ -631,6 +627,13 @@ int main(int argc, char **argv)
bus.send(msg);
#ifdef HAVE_RABBITMQC
// Post on RabbitMQ
LOG_INFO("Sending feedback to RabbitMQ bus");
RabbitMQ(RabbitMQ_credentials).sendFeedback(ps.settings.observationID, feedback_str);
#endif
// final cleanup
storageProcesses = NULL;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment