diff --git a/RTCP/IONProc/src/Job.cc b/RTCP/IONProc/src/Job.cc
index d3a9f55270836b7b257c03dc373a2c0bea059c8f..1a47492f1edb85e712c765753d3b716750e3827e 100644
--- a/RTCP/IONProc/src/Job.cc
+++ b/RTCP/IONProc/src/Job.cc
@@ -785,6 +785,14 @@ template <typename SAMPLE_TYPE> void Job::doObservation()
 
 bool Job::checkParset() const
 {
+  // any error detected by the python environment, invalidating this parset
+  string pythonParsetError = itsParset.getString("OLAP.IONProc.parsetError","");
+
+  if (pythonParsetError != "" ) {
+    LOG_ERROR_STR(itsLogPrefix << "Early detected parset error: " << pythonParsetError );
+    return false;
+  }
+
   if (itsParset.nrCoresPerPset() > nrCNcoresInPset) {
     LOG_ERROR_STR(itsLogPrefix << "nrCoresPerPset (" << itsParset.nrCoresPerPset() << ") cannot exceed " << nrCNcoresInPset);
     return false;
@@ -806,7 +814,7 @@ bool Job::define()
 {
   LOG_DEBUG_STR( itsLogPrefix << "Job: define(): check parset" );
 
-  return true;
+  return checkParset();
 }
 
 
@@ -831,14 +839,24 @@ bool Job::run()
 
 bool Job::pause( const double &when )
 {
-  LOG_DEBUG_STR( itsLogPrefix << "Job: pause(): pause observation at time " << static_cast<unsigned>(when) );
+  char buf[26];
+  time_t whenRounded = static_cast<time_t>(when);
+
+  // ctime_r() returns a null pointer on failure; do not log an unset buffer then
+  if (ctime_r(&whenRounded, buf))
+    buf[24] = '\0'; // cut off the trailing newline
+  else
+    buf[0] = '\0';
+
+  LOG_DEBUG_STR( itsLogPrefix << "Job: pause(): pause observation at " << buf );
 
   // make sure we don't interfere with queue dynamics
   ScopedLock scopedLock(jobQueue.itsMutex);
 
-  if (when == 0 || when <= itsParset.startTime()) { // yes we can compare a double with 0
+  if (when == 0 || when <= itsParset.startTime()) { // yes we can compare a double to 0
     // make sure we also stop waiting for the job to start
-    cancel();
+    if (!itsDoCancel)
+      cancel();
   } else {
     itsRequestedStopTime = when;
   }
@@ -851,6 +869,15 @@ bool Job::quit()
 {
   LOG_DEBUG_STR( itsLogPrefix << "Job: quit(): end observation" );
 
+  // stop now -- test itsDoCancel under the queue lock, as pause() does, so that a
+  // concurrent cancel cannot slip in between the test and cancel()
+  {
+    ScopedLock scopedLock(jobQueue.itsMutex);
+
+    if (!itsDoCancel)
+      cancel();
+  }
+
   return true;
 }
 
diff --git a/RTCP/IONProc/src/PLCClient.cc b/RTCP/IONProc/src/PLCClient.cc
index 3153538961e58b8a091ab4f5b37318617428ac0d..8f9ce121379a8062026c801f497c65928a34a677 100644
--- a/RTCP/IONProc/src/PLCClient.cc
+++ b/RTCP/IONProc/src/PLCClient.cc
@@ -146,7 +146,7 @@ private:
     uint8 name_length;
   } __attribute__((packed)) header;
 
-  #define MAX_UINT8 (2 << 8 - 1)
+  #define MAX_UINT8 ((1 << 8) - 1)
   static const size_t maxNameLength = MAX_UINT8;
   char name[maxNameLength + 1]; // we zero-terminate, but Blob doesn't need it as the length is communicated in the header
 
@@ -263,6 +263,8 @@ PLCClient::PLCClient( Stream &s, PLCRunnable &job, const std::string &procID, un
   itsStream( s ),
   itsJob( job ),
   itsProcID( procID ),
+  itsStartTime( time(0L) ),
+  itsDefineCalled( false ),
   itsDone( false ),
   itsLogPrefix( str(format("[PLC] [obs %u] ") % observationID) ),
   itsThread( 0 )
@@ -272,10 +274,38 @@ PLCClient::PLCClient( Stream &s, PLCRunnable &job, const std::string &procID, un
 
 PLCClient::~PLCClient()
 {
-  itsDone = true;
+  // wait until ApplController called define(), so that invalid parsets are reported
+  // as such before the connection is terminated
+  struct timespec disconnectAt = { itsStartTime + defineWaitTimeout, 0 };
 
-  itsThread->abort();
+  {
+    ScopedLock lock(itsMutex);
+
+    while (!itsDefineCalled && !itsDone) {
+      LOG_DEBUG_STR( itsLogPrefix << "Waiting for ApplController to call define()" );
+
+      if (!itsCondition.wait(itsMutex, disconnectAt)) {
+        // timeout
+        LOG_WARN_STR( itsLogPrefix << "ApplController did not ask whether parset was ok (define())" );
+        break;
+      }
+    }
+
+    itsDone = true;
+  }
+
+  if (itsThread) {
+    // thread might have been deleted in waitForDone()
+
+    itsThread->abort();
+    delete itsThread;
+  }
+}
+
+void PLCClient::waitForDone()
+{
   delete itsThread;
+  itsThread = 0;
 }
 
 void PLCClient::sendCmd( PCCmd cmd, const string &options )
@@ -325,9 +355,19 @@ void PLCClient::mainLoop() {
 
   // make sure we set itsDone in case of exceptions
   struct D {
-    ~D() { done = true; }
+    ~D() {
+      ScopedLock lock(mutex);
+
+      done = true;
+
+      // signal our destructor that we're done
+      condition.broadcast();
+    }
+
     bool &done;
-  } onDestruct = { itsDone };
+    Mutex &mutex;
+    Condition &condition;
+  } onDestruct = { itsDone, itsMutex, itsCondition };
   (void)onDestruct;
 
   // register: send BOOT command
@@ -361,6 +401,17 @@ void PLCClient::mainLoop() {
         LOG_DEBUG_STR( itsLogPrefix << "define()" );
 
         result = itsJob.define();
+
+        {
+          // itsDefineCalled is read by ~PLCClient() with itsMutex held, so write it
+          // under the same lock.
+          // NOTE(review): ~PLCClient() is not woken up here; an
+          // itsCondition.broadcast() belongs after the reply to define() has been
+          // sent, which is outside this hunk -- TODO confirm and add it there
+          ScopedLock lock(itsMutex);
+
+          itsDefineCalled = true;
+        }
         break;
 
       case PCCmdInit:
@@ -409,9 +460,7 @@ void PLCClient::mainLoop() {
         break;
 
       case PCCmdRelease:
-        LOG_DEBUG_STR( itsLogPrefix << "release()" );
-
-        supported = false;
+        LOG_DEBUG_STR( itsLogPrefix << "release() -- silent ignore" );
         break;
 
       case PCCmdQuit:
@@ -424,19 +473,19 @@ void PLCClient::mainLoop() {
         break;
 
       case PCCmdSnapshot:
-        LOG_DEBUG_STR( itsLogPrefix << "snapshot( " << options << " )" );
+        LOG_ERROR_STR( itsLogPrefix << "snapshot( " << options << " ) -- not supported" );
 
         supported = false;
         break;
 
       case PCCmdRecover:
-        LOG_DEBUG_STR( itsLogPrefix << "recover( " << options << " )" );
+        LOG_ERROR_STR( itsLogPrefix << "recover( " << options << " ) -- not supported" );
 
         supported = false;
         break;
 
       case PCCmdReinit:
-        LOG_DEBUG_STR( itsLogPrefix << "reinit( " << options << " )" );
+        LOG_ERROR_STR( itsLogPrefix << "reinit( " << options << " ) -- not supported" );
 
         supported = false;
         break;
@@ -449,7 +498,7 @@ void PLCClient::mainLoop() {
        pause mode, but this code is only triggered when commands are processed */
     if (running && pausing && !itsJob.observationRunning()) {
       // we paused -- ack the pause command that triggered it
-      LOG_DEBUG_STR( itsLogPrefix << "sending ack for pause()" );
+      LOG_DEBUG_STR( itsLogPrefix << "Sending ack for pause()" );
       sendResult( PCCmdPause, "", PCCmdMaskOk );
       running = false;
       pausing = false;
diff --git a/RTCP/IONProc/src/PLCClient.h b/RTCP/IONProc/src/PLCClient.h
index c5dc53651306510a5368655bde2a8ba9f306c497..90ebb43a4154d2e4a637e0cc9061248c87ef1c83 100644
--- a/RTCP/IONProc/src/PLCClient.h
+++ b/RTCP/IONProc/src/PLCClient.h
@@ -28,8 +28,11 @@
 //# Includes
 #include <Stream/Stream.h>
 #include <Thread/Thread.h>
+#include <Thread/Condition.h>
+#include <Thread/Mutex.h>
 #include <Common/LofarTypes.h>
 #include <string>
+#include <time.h>
 
 namespace LOFAR {
 namespace RTCP {
@@ -83,19 +86,22 @@ public:
     return itsDone;
   }
 
-  void waitForDone() {
-    delete itsThread;
-    itsThread = 0;
-  }
+  void waitForDone();
 
 private:
   Stream &itsStream;
   PLCRunnable &itsJob;
   const std::string itsProcID;
+  time_t itsStartTime;
+  bool itsDefineCalled;
   bool itsDone;
   const std::string itsLogPrefix;
+  Mutex itsMutex;
+  Condition itsCondition;
   InterruptibleThread *itsThread;
 
+  static const unsigned defineWaitTimeout = 300; // #seconds for ApplController to call define() before we disconnect
+
   void mainLoop();
 
 protected: