From 8fe8c0c3c6e27b7e27fee31fdfbcf85c6e13d438 Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Tue, 28 Feb 2012 20:26:55 +0000 Subject: [PATCH] Task #3049: Merged release-0.91 branch into trunk --- .../include/ApplCommon/Observation.h | 6 +- LCS/ApplCommon/src/Observation.cc | 111 ++++++++++++------ LCS/ApplCommon/test/tObservation.cc | 20 ++-- MAC/APL/APLCommon/src/ChildControl.cc | 7 +- .../MainCU/src/MACScheduler/MACScheduler.cc | 2 +- MAC/APL/MainCU/src/MACScheduler/ObsClaimer.cc | 11 +- .../ObservationControl/ObservationControl.cc | 55 +++++++-- .../ObservationControl.conf | 12 +- .../ObservationControl/ObservationControl.h | 16 ++- MAC/APL/PAC/ITRFBeamServer/src/BeamServer.cc | 3 +- .../StationCU/src/StationControl/ActiveObs.cc | 3 +- .../StationCU/src/StationControl/ActiveObs.h | 7 +- .../src/StationControl/StationControl.cc | 4 +- MAC/GCF/TM/src/GCF_Scheduler.cc | 10 +- RTCP/CNProc/src/CN_Processing.cc | 72 ++++++------ .../IONProc/src/BeamletBufferToComputeNode.cc | 3 +- RTCP/IONProc/src/OutputSection.cc | 2 +- RTCP/Storage/src/Storage_main.cc | 2 +- 18 files changed, 220 insertions(+), 126 deletions(-) diff --git a/LCS/ApplCommon/include/ApplCommon/Observation.h b/LCS/ApplCommon/include/ApplCommon/Observation.h index aa3161f8195..dba8bb4b268 100644 --- a/LCS/ApplCommon/include/ApplCommon/Observation.h +++ b/LCS/ApplCommon/include/ApplCommon/Observation.h @@ -52,7 +52,8 @@ class Observation public: Observation(); ~Observation(); - explicit Observation (const ParameterSet* aParSet, bool hasDualHBA = false); +// explicit Observation (const ParameterSet* aParSet, bool hasDualHBA = false); + explicit Observation (const ParameterSet* aParSet, bool hasDualHBA); // global function for converting filtername to nyquist zone static uint nyquistzoneFromFilter(const string& filterName); @@ -204,6 +205,9 @@ public: string storageNodeList; private: + bool _isStationName(const string& hostname) const; + bool _hasDataSlots (const ParameterSet* aPS) const; + RCUset_t RCUset; // set with participating receivers, use getRCUbitset to get this value. // many(!) vectors for dataslot allocation diff --git a/LCS/ApplCommon/src/Observation.cc b/LCS/ApplCommon/src/Observation.cc index 99ae3fcff75..d2635ca6e95 100644 --- a/LCS/ApplCommon/src/Observation.cc +++ b/LCS/ApplCommon/src/Observation.cc @@ -113,7 +113,7 @@ Observation::Observation(const ParameterSet* aParSet, // auto select the right antennaArray when antennaSet variable is used. if (!antennaSet.empty()) { - antennaArray = antennaSet.substr(0,3); + antennaArray = antennaSet.substr(0,3); // LBA or HBA } splitterOn = ((antennaSet == "HBA_ZERO") || (antennaSet == "HBA_ONE") || (antennaSet == "HBA_DUAL")); dualMode = (antennaSet == "HBA_DUAL"); @@ -144,7 +144,7 @@ Observation::Observation(const ParameterSet* aParSet, } // determine if DataslotLists are available in this parset - itsHasDataslots = !stations.empty() && aParSet->isDefined(prefix+str(format("Dataslots.%s%s.DataslotList") % stations[0] % getAntennaFieldName(itsStnHasDualHBA))); + itsHasDataslots = _hasDataSlots(aParSet); if (itsHasDataslots) { itsDataslotParset = aParSet->makeSubset(prefix+"Dataslots."); // save subset for later } @@ -232,26 +232,28 @@ Observation::Observation(const ParameterSet* aParSet, } // finally update vector with beamnumbers - int nrSubbands = newBeam.subbands.size(); - if (!itsHasDataslots) { // old situation - BeamBeamlets = aParSet->getInt32Vector(beamPrefix+"beamletList", vector<int32>(), true); // true:expandable - int nrBeamlets = BeamBeamlets.size(); - ASSERTSTR(nrBeamlets == nrSubbands, "Number of beamlets(" << nrBeamlets << ") != nr of subbands(" << nrSubbands << ") for Beam " << beamIdx); - for (int i = 0; i < nrBeamlets; ++i) { - if (beamlet2beams[BeamBeamlets[i]] != -1) { - stringstream os; - os << "beamlet2beams : "; writeVector(os, beamlet2beams, ",", "[", "]"); os << endl; - LOG_ERROR_STR(os.str()); - THROW (Exception, "beamlet " << i << "(" << BeamBeamlets[i] << ") of beam " << beamIdx << " clashes with beamlet of other beam"); - } - beamlet2beams[BeamBeamlets[i]] = beamIdx; - } // for all beamlets - } - else { // new situation - for (int i = 0; i < nrSubbands; ++i) { // Note nrBeamlets=nrSubbands - itsBeamSlotList.push_back(beamIdx); + if (_isStationName(myHostname(false))) { + int nrSubbands = newBeam.subbands.size(); + if (!itsHasDataslots) { // old situation + BeamBeamlets = aParSet->getInt32Vector(beamPrefix+"beamletList", vector<int32>(), true); // true:expandable + int nrBeamlets = BeamBeamlets.size(); + ASSERTSTR(nrBeamlets == nrSubbands, "Number of beamlets(" << nrBeamlets << ") != nr of subbands(" << nrSubbands << ") for Beam " << beamIdx); + for (int i = 0; i < nrBeamlets; ++i) { + if (beamlet2beams[BeamBeamlets[i]] != -1) { + stringstream os; + os << "beamlet2beams : "; writeVector(os, beamlet2beams, ",", "[", "]"); os << endl; + LOG_ERROR_STR(os.str()); + THROW (Exception, "beamlet " << i << "(" << BeamBeamlets[i] << ") of beam " << beamIdx << " clashes with beamlet of other beam"); + } + beamlet2beams[BeamBeamlets[i]] = beamIdx; + } // for all beamlets } - } // itsHasDataslots + else { // new situation + for (int i = 0; i < nrSubbands; ++i) { // Note nrBeamlets=nrSubbands + itsBeamSlotList.push_back(beamIdx); + } + } // itsHasDataslots + } // on a station } // for all digital beams // loop over al analogue beams @@ -352,8 +354,8 @@ Observation::Observation(const ParameterSet* aParSet, // pset, and then proceed to fill up the I/O nodes starting from // the first pset. Each data product is treated individually. - vector<string> filenames = aParSet->getStringVector(prefix+str(format("DataProducts.Output_%s.filenames") % dataProductNames[d])); - vector<string> locations = aParSet->getStringVector(prefix+str(format("DataProducts.Output_%s.locations") % dataProductNames[d])); + vector<string> filenames = aParSet->getStringVector(prefix+str(format("DataProducts.Output_%s.filenames") % dataProductNames[d]), true); + vector<string> locations = aParSet->getStringVector(prefix+str(format("DataProducts.Output_%s.locations") % dataProductNames[d]), true); vector<unsigned> &psets = dataProductPhases[d] == 2 ? phaseTwoPsets : phaseThreePsets; unsigned numFiles = filenames.size(); @@ -488,10 +490,6 @@ vector<int> Observation::getBeamAllocation(const string& stationName) const { vector<int> b2b; - if (!itsHasDataslots) { - return (beamlet2beams); // return old mapping so it keeps working - } - // construct stationname if not given by user. string station(stationName); if (station.empty()) { @@ -501,6 +499,14 @@ vector<int> Observation::getBeamAllocation(const string& stationName) const station.erase(station.length()-1, 1); // station.pop_back(); } } + if (!_isStationName(station)) { // called on a non-station machine? + return (b2b); // return an empty vector + } + + if (!itsHasDataslots) { + return (beamlet2beams); // return old mapping so it keeps working + } + // is DSL for this station available? string fieldName = getAntennaFieldName(itsStnHasDualHBA); string dsl(str(format("%s%s.DataslotList") % station % fieldName)); @@ -542,11 +548,6 @@ vector<int> Observation::getBeamlets (uint beamIdx, const string& stationName) c uint parsetIdx = (dualMode && itsStnHasDualHBA) ? beamIdx/2 : beamIdx; string fieldName = getAntennaFieldName(itsStnHasDualHBA, beamIdx); - if (!itsHasDataslots) { - // both fields use the same beamlet mapping - return (itsDataslotParset.getInt32Vector(str(format("Beam[%d].beamletList") % parsetIdx), vector<int32>(), true)); // true:expandable - } - // construct stationname if not given by user. string station(stationName); if (station.empty()) { @@ -557,11 +558,20 @@ vector<int> Observation::getBeamlets (uint beamIdx, const string& stationName) c } } + vector<int> result; + if (!_isStationName(station)) { // called on a non-station machine? + return (result); // return an empty vector + } + + if (!itsHasDataslots) { + // both fields use the same beamlet mapping + return (itsDataslotParset.getInt32Vector(str(format("Beam[%d].beamletList") % parsetIdx), vector<int32>(), true)); // true:expandable + } + // is DSL for this station available? // both fields have their own beamlet mapping string dsl(str(format("%s%s.DataslotList") % station % fieldName)); string rbl(str(format("%s%s.RSPBoardList") % station % fieldName)); - vector<int> result; if (!itsDataslotParset.isDefined(dsl) || !itsDataslotParset.isDefined(rbl)) { return (result); } @@ -621,6 +631,41 @@ string Observation::getAnaBeamName() const return (formatString("observation[%d]anabeam", obsID)); } + +// +// _isStationName(name) +// +bool Observation::_isStationName(const string& hostname) const +{ + // allow AA999, AA999C and AA999T + if (hostname.length() != 5 && hostname.length() != 6) + return (false); + + // We make a rough guess about the vality of the hostname. + // If we want to check more secure we have to implement all allowed stationnames + return (isalpha(hostname[0]) && isalpha(hostname[1]) && + isdigit(hostname[2]) && isdigit(hostname[3]) && isdigit(hostname[4])); +} + +// +// _hasDataSlots +// +bool Observation::_hasDataSlots(const ParameterSet* aPS) const +{ + ParameterSet::const_iterator iter = aPS->begin(); + ParameterSet::const_iterator end = aPS->end(); + while (iter != end) { + string::size_type pos(iter->first.find("Dataslots.")); + // if begin found, what is after it? + if (pos != string::npos && iter->first.find("Dataslots.DataslotInfo.") == string::npos) { + return _isStationName((iter->first.substr(pos+10,5))); + } + iter++; // try next line + } + + return (false); +} + // // nyquistzoneFromFilter(filtername) // diff --git a/LCS/ApplCommon/test/tObservation.cc b/LCS/ApplCommon/test/tObservation.cc index 17c3a22ab7d..3974f809bb5 100644 --- a/LCS/ApplCommon/test/tObservation.cc +++ b/LCS/ApplCommon/test/tObservation.cc @@ -52,11 +52,11 @@ int main (int argc, char* argv[]) cout << ">>>" << endl; ParameterSet parSet2("tObservation.in_parset2"); - Observation dualObs(&parSet2); + Observation dualObs(&parSet2, false); cout << dualObs << endl; ParameterSet parSet1("tObservation.in_parset1"); - Observation obs1(&parSet1); + Observation obs1(&parSet1, false); cout << obs1 << endl; cout << "<<<" << endl; @@ -67,7 +67,7 @@ int main (int argc, char* argv[]) parSet1.add("ObsSW.Observation.Beam[1].directionType", "AZEL"); parSet1.add("ObsSW.Observation.Beam[1].subbandList", "[4,3,102]"); parSet1.add("ObsSW.Observation.Beam[1].beamletList", "[15,16,18]"); - Observation obs2(&parSet1); + Observation obs2(&parSet1, false); cout << obs2 << endl; // test storage node assignment @@ -77,39 +77,39 @@ int main (int argc, char* argv[]) parSet1.add("ObsSW.Observation.DataProducts.Output_Beamformed.enabled", "true"); parSet1.add("ObsSW.Observation.DataProducts.Output_Beamformed.filenames", "[beam0.h5,beam1.h5]"); parSet1.add("ObsSW.Observation.DataProducts.Output_Beamformed.locations", "[/,/]"); - Observation obs4(&parSet1); + Observation obs4(&parSet1, false); ASSERTSTR(obs4.streamsToStorage.size() == 2, "Each file should have its own stream to storage"); cout << ">>>" << endl; // test conflicts in clock ParameterSet conflictPS1("tObservation.in_conflict1"); - Observation conflictObs1(&conflictPS1); + Observation conflictObs1(&conflictPS1, false); ASSERTSTR(obs2.conflicts(conflictObs1), "File 1 should have had a clock conflict"); // test conflicts in receivers ParameterSet conflictPS2("tObservation.in_conflict2"); - Observation conflictObs2(&conflictPS2); + Observation conflictObs2(&conflictPS2, false); ASSERTSTR(obs2.conflicts(conflictObs2), "File 2 should have had a receiver conflict"); // test conflicts in beamlets ParameterSet conflictPS3("tObservation.in_conflict3"); - Observation conflictObs3(&conflictPS3); + Observation conflictObs3(&conflictPS3, false); ASSERTSTR(obs2.conflicts(conflictObs3), "File 3 should have had a beamlet conflict"); // test conflicts in nrSlotsPerFrame ParameterSet conflictPS4("tObservation.in_conflict4"); - Observation conflictObs4(&conflictPS4); + Observation conflictObs4(&conflictPS4, false); ASSERTSTR(obs2.conflicts(conflictObs4), "File 4 should have had a nrSlotInFrame conflict"); // everything conflict except the time ParameterSet conflictPS5("tObservation.in_conflict5"); - Observation conflictObs5(&conflictPS5); + Observation conflictObs5(&conflictPS5, false); ASSERTSTR(!obs2.conflicts(conflictObs5), "File 5 should NOT have had a conflict"); cout << "<<<" << endl; cout << "No conflict found in file 5 which is oke." << endl; // basic test on RCU bitsets - Observation obs3(&parSet1); + Observation obs3(&parSet1, false); cout << "getRCUbitset(96,48,'') = " << obs3.getRCUbitset(96,48,"") << endl; // Europe cout << "getRCUbitset(96,96,'') = " << obs3.getRCUbitset(96,96,"") << endl; // Europe cout << "getRCUbitset(96,48,LBA_XXX) = " << obs3.getRCUbitset(96,48,"LBA_XXX") << endl; // Core diff --git a/MAC/APL/APLCommon/src/ChildControl.cc b/MAC/APL/APLCommon/src/ChildControl.cc index ca39b69466d..f13c9eef585 100644 --- a/MAC/APL/APLCommon/src/ChildControl.cc +++ b/MAC/APL/APLCommon/src/ChildControl.cc @@ -1024,13 +1024,12 @@ void ChildControl::_doGarbageCollection() // 1: port == 0: inform main task about removal after retry interval expired // 2: port == -1: remove from list // This is necc. because main task may poll childcontrol for results. - if (!iter->port) { - restartTimer = true; - LOG_DEBUG_STR(time(0)<<"-"<<iter->requestTime<<">="<<itsStartupRetryInterval<<"*"<<itsMaxStartupRetries<<"?"); + if (!iter->port && (iter->requestedState != CTState::CONNECTED)) { if ((time(0)-iter->requestTime) >= int32(MAC_SCP_TIMEOUT+(itsStartupRetryInterval*itsMaxStartupRetries))) { LOG_DEBUG_STR ("Controller " << iter->cntlrName << " is still unreachable, informing main task"); _setEstablishedState(iter->cntlrName, CTState::QUITED, time(0), CT_RESULT_LOST_CONNECTION); iter->port = (GCFPortInterface*) -1; + restartTimer = true; } iter++; @@ -1178,7 +1177,6 @@ GCFEvent::TResult ChildControl::operational(GCFEvent& event, _setEstablishedState(controller->cntlrName, CTState::ANYSTATE, time(0), CT_RESULT_LOST_CONNECTION); controller->port = 0; - #if 0 // Try to restart the controller over 5 seconds // Add it to the action list. @@ -1186,7 +1184,6 @@ GCFEvent::TResult ChildControl::operational(GCFEvent& event, itsListener->cancelTimer(itsActionTimer); itsActionTimer = itsListener->setTimer(1.0); itsActionList.push_back(*controller); - #endif } diff --git a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc index ba4c8750204..3dca4ec2b58 100644 --- a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc +++ b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc @@ -767,7 +767,7 @@ void MACScheduler::_updateFinishedList() GCFPValueArray finishedArr; int32 freeSpace = MAX_CONCURRENT_OBSERVATIONS - itsNrPlanned - itsNrActive; int32 idx = finishedDBlist.size() - 1; - int32 limit = idx - (MIN2(MIN2(finishedDBlist.size(), itsMaxFinished), freeSpace) - 1); + int32 limit = idx - (MIN2(MIN2(finishedDBlist.size(), itsMaxFinished), (uint32)freeSpace) - 1); while (idx >= limit) { // construct name and timings info for observation string obsName(observationName(finishedDBlist[idx].treeID())); diff --git a/MAC/APL/MainCU/src/MACScheduler/ObsClaimer.cc b/MAC/APL/MainCU/src/MACScheduler/ObsClaimer.cc index c3570a9f3c8..b453f8f926d 100644 --- a/MAC/APL/MainCU/src/MACScheduler/ObsClaimer.cc +++ b/MAC/APL/MainCU/src/MACScheduler/ObsClaimer.cc @@ -233,11 +233,9 @@ GCFEvent::TResult ObsClaimer::preparePVSS_state (GCFEvent& event, GCFPortInterfa itsCurrentObs->second->obsName.c_str())); ParameterSet obsPS(obsPSFilename); try { - Observation theObs(&obsPS); + Observation theObs(&obsPS, false); RTDBPropertySet* theObsPS = itsCurrentObs->second->propSet; -// theObsPS->setValue(PN_OBS_CLAIM_PERIOD, GCFPVInteger(itsClaimPeriod), 0.0, false); -// theObsPS->setValue(PN_OBS_PREPARE_PERIOD, GCFPVInteger(itsPreparePeriod), 0.0, false); theObsPS->setValue(PN_OBS_RUN_STATE, GCFPVString(""), 0.0, false); theObsPS->setValue(PN_OBS_START_TIME, GCFPVString (to_simple_string(from_time_t(theObs.startTime))), 0.0, false); theObsPS->setValue(PN_OBS_STOP_TIME, GCFPVString (to_simple_string(from_time_t(theObs.stopTime))), 0.0, false); @@ -246,7 +244,6 @@ GCFEvent::TResult ObsClaimer::preparePVSS_state (GCFEvent& event, GCFPortInterfa theObsPS->setValue(PN_OBS_ANTENNA_ARRAY, GCFPVString (theObs.antennaArray), 0.0, false); theObsPS->setValue(PN_OBS_RECEIVER_LIST, GCFPVString (theObs.receiverList), 0.0, false); theObsPS->setValue(PN_OBS_SAMPLE_CLOCK, GCFPVInteger(theObs.sampleClock), 0.0, false); -// theObsPS->setValue(PN_OBS_MEASUREMENT_SET, GCFPVString (theObs.MSNameMask), 0.0, false); stringstream osl; writeVector(osl, theObs.stations); theObsPS->setValue(PN_OBS_STATION_LIST, GCFPVString (osl.str()), 0.0, false); @@ -259,7 +256,6 @@ GCFEvent::TResult ObsClaimer::preparePVSS_state (GCFEvent& event, GCFPortInterfa // for the beams we have to construct dyn arrays first. GCFPValueArray subbandArr; - GCFPValueArray beamletArr; GCFPValueArray angle1Arr; GCFPValueArray angle2Arr; GCFPValueArray dirTypesArr; @@ -267,9 +263,6 @@ GCFEvent::TResult ObsClaimer::preparePVSS_state (GCFEvent& event, GCFPortInterfa stringstream os1; writeVector(os1, theObs.beams[i].subbands); subbandArr.push_back (new GCFPVString(os1.str())); - stringstream os2; - writeVector(os2, theObs.getBeamlets(i)); - beamletArr.push_back (new GCFPVString(os2.str())); angle1Arr.push_back (new GCFPVDouble(theObs.beams[i].pointings[0].angle1)); angle2Arr.push_back (new GCFPVDouble(theObs.beams[i].pointings[0].angle2)); dirTypesArr.push_back (new GCFPVString(theObs.beams[i].pointings[0].directionType)); @@ -277,7 +270,6 @@ GCFEvent::TResult ObsClaimer::preparePVSS_state (GCFEvent& event, GCFPortInterfa // Finally we can write those value to PVSS as well. theObsPS->setValue(PN_OBS_BEAMS_SUBBAND_LIST, GCFPVDynArr(LPT_DYNSTRING, subbandArr), 0.0, false); - theObsPS->setValue(PN_OBS_BEAMS_BEAMLET_LIST, GCFPVDynArr(LPT_DYNSTRING, beamletArr), 0.0, false); theObsPS->setValue(PN_OBS_BEAMS_ANGLE1, GCFPVDynArr(LPT_DYNDOUBLE, angle1Arr), 0.0, false); theObsPS->setValue(PN_OBS_BEAMS_ANGLE2, GCFPVDynArr(LPT_DYNDOUBLE, angle2Arr), 0.0, false); theObsPS->setValue(PN_OBS_BEAMS_DIRECTION_TYPE, GCFPVDynArr(LPT_DYNSTRING, dirTypesArr), 0.0, false); @@ -326,7 +318,6 @@ GCFEvent::TResult ObsClaimer::preparePVSS_state (GCFEvent& event, GCFPortInterfa // release claimed memory. for (int i = subbandArr.size()-1; i >=0; i--) { delete subbandArr[i]; - delete beamletArr[i]; delete angle1Arr[i]; delete angle2Arr[i]; delete dirTypesArr[i]; diff --git a/MAC/APL/MainCU/src/ObservationControl/ObservationControl.cc b/MAC/APL/MainCU/src/ObservationControl/ObservationControl.cc index 21ac4d867b3..e5d867a0b84 100644 --- a/MAC/APL/MainCU/src/ObservationControl/ObservationControl.cc +++ b/MAC/APL/MainCU/src/ObservationControl/ObservationControl.cc @@ -70,6 +70,7 @@ ObservationControl::ObservationControl(const string& cntlrName) : itsFullReport (false), itsChangeReport (false), itsState (CTState::NOSTATE), + itsLastReportedState(CTState::NOSTATE), itsNrControllers (0), itsBusyControllers (0), itsQuitReason (CT_RESULT_NO_ERROR), @@ -97,14 +98,18 @@ ObservationControl::ObservationControl(const string& cntlrName) : itsClaimPeriod = globalParameterSet()->getTime ("Observation.claimPeriod"); itsPreparePeriod = globalParameterSet()->getTime ("Observation.preparePeriod"); + // Values from my conf file + itsLateLimit = globalParameterSet()->getTime ("ObservationControl.lateLimit", 15); + itsFailedLimit = globalParameterSet()->getTime ("ObservationControl.failedLimit", 30); + itsHeartBeatItv = globalParameterSet()->getTime ("ObservationControl.heartbeatInterval", 10); + string reportType = globalParameterSet()->getString("ObservationControl.reportType", "Full"); + if (reportType == "Full") itsFullReport = true; + else if (reportType == "Changes") itsChangeReport = true; + // My own parameters itsTreePrefix = globalParameterSet()->getString("prefix"); itsTreeID = globalParameterSet()->getUint32("_treeID"); // !!! itsObsDPname = globalParameterSet()->getString("_DPname"); - itsHeartBeatItv = globalParameterSet()->getTime("heartbeatInterval", 10); - string reportType = globalParameterSet()->getString("reportType", "Full"); - if (reportType == "Full") itsFullReport = true; - else if (reportType == "Changes") itsChangeReport = true; // The time I have to wait for the forced quit depends on the integration time of OLAP string OLAPpos = globalParameterSet()->locateModule("OLAP"); @@ -747,12 +752,13 @@ void ObservationControl::doHeartBeatTask() #if 1 // NOTE: [15122010] Sending respons when first child reached required state. // NOTE: [15122010] WHEN nrChilds = 1 EACH TIME WE COME HERE A REPLY IS SENT!!!!! - if (itsBusyControllers == nrChilds-1) { // first reply received? + if ((itsBusyControllers == nrChilds-1) && (itsLastReportedState != itsState)) { // first reply received? CTState cts; // report that state is reached. LOG_INFO_STR("First controller reached required state " << cts.name(cts.stateAck(itsState)) << ", informing SAS although it is too early!"); sendControlResult(*itsParentPort, cts.signal(cts.stateAck(itsState)), getName(), CT_RESULT_NO_ERROR); setState(cts.stateAck(itsState)); + itsLastReportedState = itsState; } #endif @@ -795,20 +801,24 @@ void ObservationControl::_updateChildInfo(const string& name, CTState::CTstateNr { // make sure that quited (and by ChildControl already removed) controllers are updatd also. if (!name.empty() && state != CTState::NOSTATE && itsChildInfo.find(name) != itsChildInfo.end()) { - itsChildInfo[name].state = state; + itsChildInfo[name].currentState = state; CTState CTS; LOG_DEBUG_STR("_updateChildInfo: FORCING " << name << " to " << CTS.name(state)); return; } + // get latest status info vector<ChildControl::StateInfo> childs = itsChildControl->getChildInfo(name, 0, CNTLRTYPE_NO_TYPE); int nrChilds = childs.size(); for (int i = 0; i < nrChilds; i++) { if (itsChildInfo.find(childs[i].name) == itsChildInfo.end()) { // not in map already? - itsChildInfo[childs[i].name] = ChildProc(childs[i].cntlrType, childs[i].currentState); + itsChildInfo[childs[i].name] = + ChildProc(childs[i].cntlrType, childs[i].currentState, childs[i].requestedState, childs[i].requestTime); } else { - itsChildInfo[childs[i].name].state = (state != CTState::NOSTATE) ? state : childs[i].currentState; + itsChildInfo[childs[i].name].currentState = (state != CTState::NOSTATE) ? state : childs[i].currentState; + itsChildInfo[childs[i].name].requestedState = childs[i].requestedState; + itsChildInfo[childs[i].name].requestTime = childs[i].requestTime; } } } @@ -825,15 +835,36 @@ void ObservationControl::_showChildInfo() CTState CTS; map<string, ChildProc>::iterator iter = itsChildInfo.begin(); map<string, ChildProc>::iterator end = itsChildInfo.end(); + time_t now(time(0)); while (iter != end) { - if (itsFullReport || (itsChangeReport && iter->second.state != iter->second.reportedState)) { - LOG_INFO(formatString("%-35.35s: %-10.10s", iter->first.c_str(), CTS.name(iter->second.state).c_str())); - iter->second.reportedState = iter->second.state; + ChildProc* cp = &(iter->second); + LOG_DEBUG_STR(iter->first<<":cur="<<cp->currentState<<",req="<<cp->requestedState<<",rep=" + <<cp->reportedState<<",late="<<now-cp->requestTime); + // always OR child not in requested state OR current state not yet reported + if (itsFullReport || cp->currentState != cp->requestedState || cp->reportedState != cp->currentState) { + string stateName = CTS.name(cp->currentState); + + // Selection of loglineType is based on 'late', while starting up correct late for scp timeout + // to prevent fault 'not responding' messages. + int32 late = now - cp->requestTime - (cp->requestedState <= CTState::CLAIMED ? MAC_SCP_TIMEOUT : 0); + if (late > itsLateLimit && late < itsFailedLimit && cp->currentState < cp->requestedState) { + LOG_WARN(formatString("%-35.35s: IS LATE, state= %-10.10s", iter->first.c_str(), stateName.c_str())); + } + else if (late > itsFailedLimit && cp->reportedState != cp->requestedState && cp->currentState < cp->requestedState) { + LOG_FATAL(formatString("%-35.35s: IS NOT RESPONDING, state= %-10.10s", iter->first.c_str(), stateName.c_str())); + cp->reportedState = cp->currentState; + } + else { + LOG_INFO(formatString("%-35.35s: %-10.10s", iter->first.c_str(), stateName.c_str())); + if (cp->currentState == cp->requestedState) { + cp->reportedState = cp->currentState; + } + } } iter++; } } - + // // _connectedHandler(port) // diff --git a/MAC/APL/MainCU/src/ObservationControl/ObservationControl.conf b/MAC/APL/MainCU/src/ObservationControl/ObservationControl.conf index b3b993358bb..69f735d982f 100644 --- a/MAC/APL/MainCU/src/ObservationControl/ObservationControl.conf +++ b/MAC/APL/MainCU/src/ObservationControl/ObservationControl.conf @@ -1,13 +1,19 @@ # # Manage amount of logging # -# Full: report state of each childController every heartbeat +# Full: report state of each childController every heartbeat [default] # Changes : report state-changes of the childControllers only # -#reportType = Changes +#ObservationControl.reportType = Changes # # Intervaltime how often the ObservationController should report the current state # and checks its quality measures. # -heartbeatInterval = 10s +ObservationControl.heartbeatInterval = 10s + +# +# Timeout values to control when a state-change is late or is treated as a failure. +# +ObservationControl.lateLimit = 15s +ObservationControl.FailedLimit = 30s diff --git a/MAC/APL/MainCU/src/ObservationControl/ObservationControl.h b/MAC/APL/MainCU/src/ObservationControl/ObservationControl.h index d22fbdcdf73..2acf4d7417e 100644 --- a/MAC/APL/MainCU/src/ObservationControl/ObservationControl.h +++ b/MAC/APL/MainCU/src/ObservationControl/ObservationControl.h @@ -127,17 +127,23 @@ private: // State administration. Note: administration is done by ChildControl, to simplify reports // about the states we keep a copy of it. typedef struct ChildProc { - ChildProc() : type(APLCommon::CNTLRTYPE_NO_TYPE), state(CTState::NOSTATE) {}; - ChildProc(int aType, CTState::CTstateNr aState) : type(aType), state(aState) {}; + ChildProc() : type(APLCommon::CNTLRTYPE_NO_TYPE), requestTime(0), requestedState(CTState::NOSTATE), + currentState(CTState::NOSTATE), reportedState(CTState::NOSTATE) {}; + ChildProc(int aType, CTState::CTstateNr curState, CTState::CTstateNr reqState, time_t reqTime) : + type(aType), requestTime(reqTime), requestedState(reqState), + currentState(curState), reportedState(CTState::NOSTATE) {}; uint16 type; - CTState::CTstateNr state; - CTState::CTstateNr reportedState; + time_t requestTime; // time last statechange was requested + CTState::CTstateNr requestedState; + CTState::CTstateNr currentState; + CTState::CTstateNr reportedState; // state eported in logfiles. } ChildProc; map<string, ChildProc> itsChildInfo; bool itsFullReport; // report every child every heartbeat bool itsChangeReport; // report only changed states CTState::CTstateNr itsState; + CTState::CTstateNr itsLastReportedState; // to SAS uint32 itsNrStations; uint32 itsNrOnlineCtrls; uint32 itsNrOfflineCtrls; @@ -162,6 +168,8 @@ private: uint32 itsForcedQuitDelay; uint32 itsClaimPeriod; uint32 itsPreparePeriod; + int32 itsLateLimit; // after how many seconds a requested state should have been reached. + int32 itsFailedLimit; // after how many seconds a late state change is treated as failure. ptime itsStartTime; ptime itsStopTime; }; diff --git a/MAC/APL/PAC/ITRFBeamServer/src/BeamServer.cc b/MAC/APL/PAC/ITRFBeamServer/src/BeamServer.cc index 717cebb6d6f..a55be4b425b 100644 --- a/MAC/APL/PAC/ITRFBeamServer/src/BeamServer.cc +++ b/MAC/APL/PAC/ITRFBeamServer/src/BeamServer.cc @@ -791,13 +791,12 @@ GCFEvent::TResult BeamServer::beamfree_state(GCFEvent& event, GCFPortInterface& // it was stopped (CAL_STOP by SRG) then // issue a warning but continue if (ack.status != CAL_Protocol::CAL_SUCCESS) { - LOG_WARN("CAL_UNSUBSCRIBE failed"); + LOG_WARN_STR("CAL_UNSUBSCRIBE failed, status = " << ack.status); } // send succesful ack beamfreeack.status = IBS_Protocol::IBS_NO_ERR; beamfreeack.beamName = itsBeamTransaction.getBeam()->name(); - itsBeamTransaction.getPort()->send(beamfreeack); // destroy beam, updates itsBeamTransaction diff --git a/MAC/APL/StationCU/src/StationControl/ActiveObs.cc b/MAC/APL/StationCU/src/StationControl/ActiveObs.cc index 06b455e0af3..1952c85252e 100644 --- a/MAC/APL/StationCU/src/StationControl/ActiveObs.cc +++ b/MAC/APL/StationCU/src/StationControl/ActiveObs.cc @@ -55,6 +55,7 @@ namespace LOFAR { ActiveObs::ActiveObs(const string& name, State initial, ParameterSet* thePS, + bool hasSplitters, GCFTask& task) : GCFTask (initial, string("ActiveObs:") + name), itsStopTimerID (0), @@ -62,7 +63,7 @@ ActiveObs::ActiveObs(const string& name, itsName (name), itsTask (&task), itsInstanceNr (getInstanceNr(name)), - itsObsPar (Observation(thePS)), + itsObsPar (Observation(thePS, hasSplitters)), itsUsesTBB (false), itsBeamCntlrReady (false), itsCalCntlrReady (false), diff --git a/MAC/APL/StationCU/src/StationControl/ActiveObs.h b/MAC/APL/StationCU/src/StationControl/ActiveObs.h index e169998809a..318da49f4f4 100644 --- a/MAC/APL/StationCU/src/StationControl/ActiveObs.h +++ b/MAC/APL/StationCU/src/StationControl/ActiveObs.h @@ -60,10 +60,11 @@ namespace LOFAR { class ActiveObs : public GCFTask { public: - ActiveObs (const string& name, - State initial, + ActiveObs (const string& name, + State initial, ParameterSet* aPS, - GCFTask& task); + bool hasSplitters, + GCFTask& task); virtual ~ActiveObs(); void start() { initFsm(); } diff --git a/MAC/APL/StationCU/src/StationControl/StationControl.cc b/MAC/APL/StationCU/src/StationControl/StationControl.cc index 7b484a6c88c..770a5e5da13 100644 --- a/MAC/APL/StationCU/src/StationControl/StationControl.cc +++ b/MAC/APL/StationCU/src/StationControl/StationControl.cc @@ -1060,7 +1060,7 @@ uint16 StationControl::_addObservation(const string& name) LOG_DEBUG_STR("Trying to readfile " << filename); try { theObsPS.adoptFile(filename); - theObs = Observation(&theObsPS); + theObs = Observation(&theObsPS, itsHasSplitters); LOG_DEBUG_STR("theOBS=" << theObs); } catch (Exception& ex) { @@ -1109,7 +1109,7 @@ LOG_DEBUG_STR("def&userReceivers=" << realReceivers); // create an activeObservation object that will manage the child controllers. - ActiveObs* theNewObs = new ActiveObs(name, (State)&ActiveObs::initial, &theObsPS, *this); + ActiveObs* theNewObs = new ActiveObs(name, (State)&ActiveObs::initial, &theObsPS, itsHasSplitters, *this); if (!theNewObs) { LOG_FATAL_STR("Unable to create the Observation '" << name << "'"); return (CT_RESULT_UNSPECIFIED); diff --git a/MAC/GCF/TM/src/GCF_Scheduler.cc b/MAC/GCF/TM/src/GCF_Scheduler.cc index 15cf765fe50..f907a2de097 100644 --- a/MAC/GCF/TM/src/GCF_Scheduler.cc +++ b/MAC/GCF/TM/src/GCF_Scheduler.cc @@ -455,8 +455,14 @@ void GCFScheduler::handleEventQueue() GCFEvent* eventPtr; GCFPortInterface* portPtr; while (task->unqueueTaskEvent(&eventPtr, &portPtr)) { - LOG_DEBUG_STR("Injecting deferred taskEvent " << eventName(*eventPtr) << "into the event queue"); - _injectEvent(task, *eventPtr, portPtr, false); // false=don't copy event(it's already cloned) + if ((eventPtr->signal == F_DATAIN || eventPtr->signal == F_DISCONNECTED) && + _isInEventQueue(eventPtr, portPtr)) { + LOG_DEBUG_STR("Skipping injection of double deferred taskEvent "<< eventName(*eventPtr)); + } + else { + LOG_DEBUG_STR("Injecting deferred taskEvent " << eventName(*eventPtr) << "into the event queue"); + _injectEvent(task, *eventPtr, portPtr, false); // false=don't copy event(it's already cloned) + } } } } diff --git a/RTCP/CNProc/src/CN_Processing.cc b/RTCP/CNProc/src/CN_Processing.cc index b0429673a4b..01c0a528f0c 100644 --- a/RTCP/CNProc/src/CN_Processing.cc +++ b/RTCP/CNProc/src/CN_Processing.cc @@ -410,7 +410,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::transposeInput( phaseOneTimer.start(); if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Start reading at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start reading at t = " << blockAge()); NSTimer asyncSendTimer("async send", LOG_CONDITION, true); @@ -421,16 +421,16 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::transposeInput( //unsigned subband = (*itsCurrentSubband % itsNrSubbandsPerPset) + (i * itsNrSubbandsPerPset); if (subband < itsNrSubbands) { - //if (LOG_CONDITION) { - // LOG_DEBUG_STR("read subband " << subband << " from IO node"); - //} + if (LOG_CONDITION) { + LOG_DEBUG_STR("read subband " << subband << " from IO node at t = " << blockAge()); + } readTimer.start(); itsInputData->readOne(itsInputStream, i); // Synchronously read 1 subband from my IO node. readTimer.stop(); asyncSendTimer.start(); - //if (LOG_CONDITION) { - // LOG_DEBUG_STR("transpose: send subband " << subband << " to pset id " << i); - //} + if (LOG_CONDITION) { + LOG_DEBUG_STR("transpose: send subband " << subband << " to pset id " << i << " at t = " << blockAge()); + } itsAsyncTransposeInput->asyncSend(i, itsInputSubbandMetaData, itsInputData); // Asynchronously send one subband to another pset. asyncSendTimer.stop(); @@ -482,7 +482,7 @@ template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(u "I'm (" << itsTranspose2Logic.phaseThreePsetIndex << ", " << itsTranspose2Logic.phaseThreeCoreIndex << ") . According to the logic, for block " << block << ", I'm to handle stream " << myStream << ", yet that stream is to be handled by (" << itsTranspose2Logic.destPset( myStream, block ) << ", " << itsTranspose2Logic.destCore( myStream, block ) << ")" ); if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Phase 3"); + LOG_DEBUG_STR(itsLogPrefix << "Phase 3 starting at t = " << blockAge()); const StreamInfo &info = itsTranspose2Logic.streamInfo[myStream]; @@ -512,7 +512,7 @@ template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(u if (itsHasPhaseTwo && *itsCurrentSubband < itsNrSubbands) { if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Start sending beams at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start sending beams at t = " << blockAge()); static NSTimer asyncSendTimer("async beam send", true, true); @@ -559,20 +559,16 @@ template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(u } } + if(LOG_CONDITION) + LOG_DEBUG_STR(itsLogPrefix << "Forming beams " << beam << " .. " << (beam+groupSize-1) << " at t = " << blockAge()); formBeams(sap, beam, groupSize); } else { groupSize = 1; } - if(LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << " Group size: " << groupSize << " coherent: " << info.coherent); - for (unsigned i = 0; i < groupSize; i ++, beam ++) { stream = itsTranspose2Logic.stream(sap, beam, 0, part, stream); - if(LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Beam " << beam << " is stream " << stream << " out of " << itsTranspose2Logic.nrStreams()); - const StreamInfo &info = itsTranspose2Logic.streamInfo[stream]; ASSERT( beam < itsPreTransposeBeamFormedData.size() ); @@ -582,25 +578,29 @@ template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(u ASSERT( itsPreTransposeBeamFormedData[beam].get() != NULL ); if (info.coherent) { - if (itsDedispersionAfterBeamForming != 0) + if (itsDedispersionAfterBeamForming != 0) { + if(LOG_CONDITION) + LOG_DEBUG_STR(itsLogPrefix << "Dedispersing beam-formed data at t = " << blockAge()); + dedisperseAfterBeamForming(i, itsCoherentDMs[beam]); + } switch (info.stokesType) { case STOKES_I: if(LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Calculating coherent Stokes I"); + LOG_DEBUG_STR(itsLogPrefix << "Calculating coherent Stokes I at t = " << blockAge()); itsCoherentStokes->calculate<false>(itsBeamFormedData.get(), itsPreTransposeBeamFormedData[beam].get(), i, info); break; case STOKES_IQUV: if(LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Calculating coherent Stokes IQUV"); + LOG_DEBUG_STR(itsLogPrefix << "Calculating coherent Stokes IQUV at t = " << blockAge()); itsCoherentStokes->calculate<true>(itsBeamFormedData.get(), itsPreTransposeBeamFormedData[beam].get(), i, info); break; case STOKES_XXYY: if(LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Calculating coherent Stokes XXYY"); + LOG_DEBUG_STR(itsLogPrefix << "Calculating coherent Stokes XXYY at t = " << blockAge()); itsBeamFormer->preTransposeBeam(itsBeamFormedData.get(), itsPreTransposeBeamFormedData[beam].get(), i); break; @@ -614,13 +614,13 @@ template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(u switch (info.stokesType) { case STOKES_I: if(LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Calculating incoherent Stokes I"); + LOG_DEBUG_STR(itsLogPrefix << "Calculating incoherent Stokes I at t = " << blockAge()); itsIncoherentStokes->calculate<false>(itsFilteredData.get(), itsPreTransposeBeamFormedData[beam].get(), itsBeamFormer->getStationMapping(), info, subband, itsIncoherentDMs[beam]); break; case STOKES_IQUV: if(LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Calculating incoherent Stokes IQUV"); + LOG_DEBUG_STR(itsLogPrefix << "Calculating incoherent Stokes IQUV at t = " << blockAge()); itsIncoherentStokes->calculate<true>(itsFilteredData.get(), itsPreTransposeBeamFormedData[beam].get(), itsBeamFormer->getStationMapping(), info, subband, itsIncoherentDMs[beam]); break; @@ -633,8 +633,9 @@ template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(u break; } } + if(LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Done calculating Stokes"); + LOG_DEBUG_STR(itsLogPrefix << "Done calculating Stokes at t = " << blockAge()); asyncSendTimer.start(); @@ -665,7 +666,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::filter() { #if defined HAVE_MPI && !defined CLUSTER_SCHEDULING if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Start filtering at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start filtering at t = " << blockAge()); NSTimer asyncReceiveTimer("wait for any async receive", LOG_CONDITION, true); static NSTimer timer("filter timer", true, true); @@ -700,7 +701,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::dedisperseAfter { #if defined HAVE_MPI if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Start dedispersion of coherent data at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start dedispersion of coherent data at t = " << blockAge()); #endif static NSTimer timer("dedispersion (coherent) timer", true, true); @@ -717,7 +718,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preCorrelationF { #if defined HAVE_MPI if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Start pre correlation flagger at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start pre correlation flagger at t = " << blockAge()); #endif // HAVE_MPI static NSTimer timer("pre correlation flagger", true, true); @@ -734,7 +735,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::mergeStations() { #if defined HAVE_MPI if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Start merging stations at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start merging stations at t = " << blockAge()); #endif // HAVE_MPI static NSTimer timer("superstation forming timer", true, true); @@ -770,7 +771,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::correlate() { #if defined HAVE_MPI if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Start correlating at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start correlating at t = " << blockAge()); #endif // HAVE_MPI computeTimer.start(); @@ -784,7 +785,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::postCorrelation { #if defined HAVE_MPI if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Start post correlation flagger at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start post correlation flagger at t = " << blockAge()); #endif // HAVE_MPI static NSTimer timer("post correlation flagger", true, true); @@ -806,7 +807,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::sendOutput(Stre { #if defined HAVE_MPI if (LOG_CONDITION) { - LOG_DEBUG_STR(itsLogPrefix << "Start writing output "/* << outputNr <<*/ " at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start writing output at t = " << blockAge()); } //LOG_INFO_STR(itsLogPrefix << "Output " << outputNr << " has been processed " << blockAge() << " seconds after being observed."); #endif // HAVE_MPI @@ -822,7 +823,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::finishSendingIn { #if defined HAVE_MPI if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Start waiting to finish sending input for transpose at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start waiting to finish sending input for transpose at t = " << blockAge()); static NSTimer waitAsyncSendTimer("wait for all async sends", true, true); waitAsyncSendTimer.start(); @@ -836,7 +837,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::finishSendingBe { #if defined HAVE_MPI if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Start waiting to finish sending beams for transpose at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start waiting to finish sending beams for transpose at t = " << blockAge()); static NSTimer waitAsyncSendTimer("wait for all async beam sends", true, true); waitAsyncSendTimer.start(); @@ -862,7 +863,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::receiveBeam(uns static NSTimer asyncNonfirstReceiveTimer("wait for subsequent async beam receive", true, true); if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Starting to receive and process subbands at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Starting to receive and process subbands at t = " << blockAge()); /* Overlap transpose and computations? */ /* this makes timings better as this time we're waiting for data to come in @@ -886,6 +887,9 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::receiveBeam(uns for (unsigned subband = 0; subband < nrSubbands; subband++) { #endif + if (LOG_CONDITION && (i == 0 || i == 1 || i == nrSubbands - 2 || i == nrSubbands - 1)) + LOG_DEBUG_STR(itsLogPrefix << "Starting to post process subband " << i << " / " << nrSubbands << " at t = " << blockAge()); + if (itsFinalBeamFormedData != 0) { itsBeamFormer->postTransposeBeam(itsTransposedBeamFormedData, itsFinalBeamFormedData, subband, info.nrChannels, nrSamples); } @@ -919,7 +923,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::process(unsigne if (itsHasPhaseTwo && *itsCurrentSubband < itsNrSubbands) { if (LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Phase 2: Processing subband " << *itsCurrentSubband << " block " << itsBlock); + LOG_DEBUG_STR(itsLogPrefix << "Phase 2: Processing subband " << *itsCurrentSubband << " block " << itsBlock << " at t = " << blockAge()); #if defined CLUSTER_SCHEDULING receiveInput(); @@ -987,7 +991,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::process(unsigne #if defined HAVE_MPI if ((itsHasPhaseOne || itsHasPhaseTwo || itsHasPhaseThree) && LOG_CONDITION) - LOG_DEBUG_STR(itsLogPrefix << "Start idling at " << MPI_Wtime()); + LOG_DEBUG_STR(itsLogPrefix << "Start idling at t = " << blockAge()); #endif // HAVE_MPI #if 0 diff --git a/RTCP/IONProc/src/BeamletBufferToComputeNode.cc b/RTCP/IONProc/src/BeamletBufferToComputeNode.cc index 394cd3d79ea..1cbb7222427 100644 --- a/RTCP/IONProc/src/BeamletBufferToComputeNode.cc +++ b/RTCP/IONProc/src/BeamletBufferToComputeNode.cc @@ -234,8 +234,9 @@ template<typename SAMPLE_TYPE> void BeamletBufferToComputeNode<SAMPLE_TYPE>::wri double currentTime = tv.tv_sec + tv.tv_usec / 1e6; double expectedTime = itsCorrelationStartTime * itsSampleDuration; + double recordingTime = itsCurrentTimeStamp * itsSampleDuration; - logStr << ", late: " << PrettyTime(currentTime - expectedTime); + logStr << ", age: " << PrettyTime(currentTime - recordingTime) << ", late: " << PrettyTime(currentTime - expectedTime); } if (itsNeedDelays) { diff --git a/RTCP/IONProc/src/OutputSection.cc b/RTCP/IONProc/src/OutputSection.cc index f3db1e1d3b9..80cd6a9d93c 100644 --- a/RTCP/IONProc/src/OutputSection.cc +++ b/RTCP/IONProc/src/OutputSection.cc @@ -68,7 +68,7 @@ OutputSection::OutputSection(const Parset &parset, itsTmpSum(newStreamableData(parset, outputType, -1, hugeMemoryAllocator)) { // lookup the PVSS adders to use in our reports - Observation obs(&parset); + Observation obs(&parset, false); itsAdders.resize(itsNrStreams); for (unsigned i = 0; i < itsNrStreams; i ++) { diff --git a/RTCP/Storage/src/Storage_main.cc b/RTCP/Storage/src/Storage_main.cc index 56cdbc08df8..c27370543c4 100644 --- a/RTCP/Storage/src/Storage_main.cc +++ b/RTCP/Storage/src/Storage_main.cc @@ -140,7 +140,7 @@ int main(int argc, char *argv[]) setRTpriority(); lockInMemory(); - Observation obs(&parset); + Observation obs(&parset, false); for (OutputType outputType = FIRST_OUTPUT_TYPE; outputType < LAST_OUTPUT_TYPE; outputType ++) { for (unsigned streamNr = 0; streamNr < parset.nrStreams(outputType); streamNr ++) { -- GitLab