diff --git a/RTCP/IONProc/src/ION_main.cc b/RTCP/IONProc/src/ION_main.cc index 7cc2e5e925d9a97ffe83fab8348a126d7c6bfe8a..e65951af5a2c839dab12b826238303792a9aa23e 100644 --- a/RTCP/IONProc/src/ION_main.cc +++ b/RTCP/IONProc/src/ION_main.cc @@ -277,12 +277,18 @@ template <typename SAMPLE_TYPE> class Job : public JobParent void createCNstreams(); void configureCNs(), unconfigureCNs(); + void jobThread(); + static void *jobThreadStub(void *); + void toCNthread(); static void *toCNthreadStub(void *); void fromCNthread(); static void *fromCNthreadStub(void *); + void allocateResources(); + void deallocateResources(); + void attachToInputSection(); void detachFromInputSection(); @@ -292,7 +298,7 @@ template <typename SAMPLE_TYPE> class Job : public JobParent const Parset *itsParset; std::vector<Stream *> itsCNstreams; unsigned itsNrRuns; - pthread_t itsToCNthreadID, itsFromCNthreadID; + pthread_t itsJobID, itsToCNthreadID, itsFromCNthreadID; bool itsHasInputSection, itsHasOutputSection; Semaphore itsToCNthreadAcquiredCNaccess; unsigned itsObservationID; @@ -321,20 +327,7 @@ template <typename SAMPLE_TYPE> Job<SAMPLE_TYPE>::Job(const Parset *parset) itsHasOutputSection = parset->outputPsetIndex(myPsetNumber) >= 0; if (itsHasInputSection || itsHasOutputSection) { - createCNstreams(); - - itsNrRuns = static_cast<unsigned>(ceil((parset->stopTime() - parset->startTime()) / parset->CNintegrationTime())); - LOG_DEBUG_STR("itsNrRuns = " << itsNrRuns); - - if (itsHasInputSection) - attachToInputSection(); - - if (pthread_create(&itsToCNthreadID, 0, toCNthreadStub, static_cast<void *>(this)) != 0) { - perror("pthread_create"); - exit(1); - } - - if (itsHasOutputSection && pthread_create(&itsFromCNthreadID, 0, fromCNthreadStub, static_cast<void *>(this)) != 0) { + if (pthread_create(&itsJobID, 0, jobThreadStub, static_cast<void *>(this)) != 0) { perror("pthread_create"); exit(1); } @@ -345,27 +338,98 @@ template <typename SAMPLE_TYPE> Job<SAMPLE_TYPE>::Job(const Parset *parset) template <typename SAMPLE_TYPE> Job<SAMPLE_TYPE>::~Job() { if (itsHasInputSection || itsHasOutputSection) { - if (pthread_join(itsToCNthreadID, 0) != 0) { + if (pthread_join(itsJobID, 0) != 0) { perror("pthread join"); exit(1); } + } - LOG_DEBUG("lofar__fini: to_CN section joined"); + delete itsParset; // FIXME: not here +} - if (itsHasInputSection) - detachFromInputSection(); - if (itsHasOutputSection) { - if (pthread_join(itsFromCNthreadID, 0) != 0) { - perror("pthread join"); - exit(1); - } +template <typename SAMPLE_TYPE> void Job<SAMPLE_TYPE>::allocateResources() +{ + createCNstreams(); + + itsNrRuns = static_cast<unsigned>(ceil((itsParset->stopTime() - itsParset->startTime()) / itsParset->CNintegrationTime())); + LOG_DEBUG_STR("itsNrRuns = " << itsNrRuns); + + if (itsHasInputSection) + attachToInputSection(); + + if (pthread_create(&itsToCNthreadID, 0, toCNthreadStub, static_cast<void *>(this)) != 0) { + perror("pthread_create"); + exit(1); + } + + if (itsHasOutputSection && pthread_create(&itsFromCNthreadID, 0, fromCNthreadStub, static_cast<void *>(this)) != 0) { + perror("pthread_create"); + exit(1); + } +} + + +template <typename SAMPLE_TYPE> void Job<SAMPLE_TYPE>::deallocateResources() +{ + if (pthread_join(itsToCNthreadID, 0) != 0) { + perror("pthread join"); + exit(1); + } + + LOG_DEBUG("lofar__fini: to_CN section joined"); + + if (itsHasInputSection) + detachFromInputSection(); - LOG_DEBUG("lofar__fini: output section joined"); + if (itsHasOutputSection) { + if (pthread_join(itsFromCNthreadID, 0) != 0) { + perror("pthread join"); + exit(1); } + + LOG_DEBUG("lofar__fini: output section joined"); } +} - delete itsParset; // FIXME: not here + +template <typename SAMPLE_TYPE> void Job<SAMPLE_TYPE>::jobThread() +{ + if (itsParset->realTime()) { + // claim resources two seconds before observation start + WallClockTime wallClock; + TimeStamp closeToStart(static_cast<int64>((itsParset->startTime() - 2) * itsParset->sampleRate())); + + wallClock.waitUntil(closeToStart); + } + + LOG_DEBUG("claiming resources for observation " << itsObservationID); + allocateResources(); + + // do observation + + deallocateResources(); + LOG_DEBUG("resources of job " << itsObservationID << " deallocated"); +} + + +template <typename SAMPLE_TYPE> void *Job<SAMPLE_TYPE>::jobThreadStub(void *job) +{ +#if defined CATCH_EXCEPTIONS + try { +#endif + static_cast<Job<SAMPLE_TYPE> *>(job)->jobThread(); +#if defined CATCH_EXCEPTIONS + } catch (Exception &ex) { + LOG_FATAL_STR("to_CN section caught Exception: " << ex); + } catch (std::exception &ex) { + LOG_FATAL_STR("to_CN section caught std::exception: " << ex.what()); + } catch (...) { + LOG_FATAL("to_CN section caught non-std::exception: "); + } +#endif + + return 0; }