diff --git a/.gitattributes b/.gitattributes index b7afe8f2258a5d7c752e9412e2cd58d0f0cf3cc0..b6617db58c3852fca89fe2fd3bb2f7f5592a5678 100644 --- a/.gitattributes +++ b/.gitattributes @@ -2326,6 +2326,7 @@ CMake/testscripts/default.debug -text CMake/testscripts/timeout -text CMake/variants/variants.MacRenting -text CMake/variants/variants.b7015 -text +CMake/variants/variants.buildhostcentos7 -text CMake/variants/variants.cbt001 -text CMake/variants/variants.cbt002 -text CMake/variants/variants.cbt003 -text @@ -2337,10 +2338,12 @@ CMake/variants/variants.cbt008 -text CMake/variants/variants.cbt009 -text CMake/variants/variants.cbt010 -text CMake/variants/variants.dop282 -text +CMake/variants/variants.dop320 -text CMake/variants/variants.dragproc -text CMake/variants/variants.fs0 -text CMake/variants/variants.gpu01 -text CMake/variants/variants.gpu1 -text +CMake/variants/variants.lcs157 -text CMake/variants/variants.lexar -text CMake/variants/variants.lexar001 -text CMake/variants/variants.lexar002 -text diff --git a/CEP/DP3/DPPP/src/FlagCounter.cc b/CEP/DP3/DPPP/src/FlagCounter.cc index edd1e3661870b965025b5e31290a1acc231fcaa7..5b96edf8ee677fa42ce10854e6a305969d45b2ca 100644 --- a/CEP/DP3/DPPP/src/FlagCounter.cc +++ b/CEP/DP3/DPPP/src/FlagCounter.cc @@ -265,6 +265,10 @@ namespace LOFAR { os << endl; } int64 totalnpoints = npoints * itsChanCounts.size(); + // Prevent division by zero + if (totalnpoints == 0) { + totalnpoints = 1; + } os << "Total flagged: "; showPerc3 (os, nflagged, totalnpoints); os << " (" << nflagged << " out of " << totalnpoints @@ -287,6 +291,10 @@ namespace LOFAR { void FlagCounter::showCorrelation (ostream& os, int64 ntimes) const { int64 ntotal = ntimes * itsBLCounts.size() * itsChanCounts.size(); + // Prevent division by zero + if (ntotal == 0) { + ntotal = 1; + } os << endl << "Percentage of flagged visibilities detected per correlation:" << endl; diff --git a/CEP/DP3/DPPP/src/GainCal.cc b/CEP/DP3/DPPP/src/GainCal.cc index 20280fada09a0325969e07659f8c9da110f0a5da..38ad50438a51628da99e847130dd5f0c517af7d6 100644 --- a/CEP/DP3/DPPP/src/GainCal.cc +++ b/CEP/DP3/DPPP/src/GainCal.cc @@ -206,6 +206,7 @@ namespace LOFAR { os << " mode: " << itsMode << endl; os << " apply solution: " << boolalpha << itsApplySolution << endl; os << " propagate solutions: " << boolalpha << itsPropagateSolutions << endl; + os << " timeslotsperparmupdate: " << itsTimeSlotsPerParmUpdate << endl; os << " detect stalling: " << boolalpha << itsDetectStalling << endl; os << " use model column: " << boolalpha << itsUseModelColumn << endl; if (!itsUseModelColumn) { @@ -666,23 +667,33 @@ namespace LOFAR { if (getInfo().chanFreqs().size()>1) { // Handle data with evenly spaced gaps between channels freqWidth = info().chanFreqs()[1]-info().chanFreqs()[0]; } + + // Get end time of the current chunk. For the last chunk, this + // is chopped off at the end of the MS (only if solint > 1) + double endTime = min(startTime + ntime * info().timeInterval() * itsSolInt, + info().startTime() + info().ntime() * info().timeInterval()); + + // Make time axis (can be non regular for last chunk if solint > 1) + vector<double> lowtimes(ntime), hightimes(ntime); + for (uint t=0; t<ntime; ++t) { + lowtimes[t] = startTime + info().timeInterval() * itsSolInt * t; + hightimes[t] = min(startTime + info().timeInterval() * itsSolInt * (t+1), + endTime); + } + BBS::Axis::ShPtr timeAxis = Axis::makeAxis(lowtimes, hightimes); + BBS::Axis::ShPtr freqAxis( new BBS::RegularAxis( getInfo().chanFreqs()[0] - freqWidth * 0.5, freqWidth*itsNChan, itsNFreqCells)); - BBS::Axis::ShPtr timeAxis( - new BBS::RegularAxis( - startTime, - info().timeInterval() * itsSolInt, - ntime)); BBS::Grid solGrid(freqAxis, timeAxis); // Construct domain grid for the current chunk BBS::Axis::ShPtr tdomAxis( new BBS::RegularAxis( startTime, - ntime * info().timeInterval() * itsSolInt, + endTime - startTime, 1)); BBS::Axis::ShPtr fdomAxis( new BBS::RegularAxis( diff --git a/CEP/DP3/DPPP/src/MSReader.cc b/CEP/DP3/DPPP/src/MSReader.cc index 35dd696864e6e0fe5161ea7bc3c79ac7dd9a81be..1b6f205db5ee85197fb6b5372d4edfebb10c31a3 100644 --- a/CEP/DP3/DPPP/src/MSReader.cc +++ b/CEP/DP3/DPPP/src/MSReader.cc @@ -131,7 +131,7 @@ namespace LOFAR { } } // Prepare the MS access and get time info. - double startTime, endTime; + double startTime=0., endTime=0.; prepare (startTime, endTime, itsTimeInterval); // Start and end time can be given in the parset in case leading // or trailing time slots are missing. @@ -258,8 +258,9 @@ namespace LOFAR { itsLastMSTime = mstime; itsIter.next(); } - // Stop if at the end. - if (itsNextTime > itsLastTime && !near(itsNextTime, itsLastTime)) { + // Stop if at the end, or if there is no data at all + if ((itsNextTime > itsLastTime && !near(itsNextTime, itsLastTime)) || + itsNextTime==0.) { return false; } // Fill the buffer. @@ -390,7 +391,7 @@ namespace LOFAR { os << " ncorrelations: " << getInfo().ncorr() << std::endl; uint nrbl = getInfo().nbaselines(); os << " nbaselines: " << nrbl << std::endl; - os << " ntimes: " << itsSelMS.nrow() / nrbl << std::endl; + os << " ntimes: " << (nrbl==0 ? 0 : itsSelMS.nrow() / nrbl) << std::endl; os << " time interval: " << getInfo().timeInterval() << std::endl; os << " DATA column: " << itsDataColName; if (itsMissingData) { @@ -421,7 +422,9 @@ namespace LOFAR { void MSReader::prepare (double& firstTime, double& lastTime, double& interval) { - ASSERT (itsSelMS.nrow() > 0); + if (itsSelMS.nrow() == 0) { + DPLOG_WARN_STR ("The selected input does not contain any data."); + } TableDesc tdesc = itsMS.tableDesc(); itsHasWeightSpectrum = false; @@ -495,9 +498,11 @@ namespace LOFAR { sortms = itsSelMS.sort(sortCols); } // Get first and last time and interval from MS. - firstTime = ROScalarColumn<double>(sortms, "TIME")(0); - lastTime = ROScalarColumn<double>(sortms, "TIME")(sortms.nrow()-1); - interval = ROScalarColumn<double>(sortms, "INTERVAL")(0); + if (itsSelMS.nrow() > 0) { + firstTime = ROScalarColumn<double>(sortms, "TIME")(0); + lastTime = ROScalarColumn<double>(sortms, "TIME")(sortms.nrow()-1); + interval = ROScalarColumn<double>(sortms, "INTERVAL")(0); + } // Create iterator over time. Do not sort again. itsIter = TableIterator (sortms, Block<String>(1, "TIME"), TableIterator::Ascending, diff --git a/CEP/DP3/DPPP/src/MSWriter.cc b/CEP/DP3/DPPP/src/MSWriter.cc index eb2884247f13169e8a2ce946e6068198ddcc43c3..a4cf6ee2fb1ac2782979350f521b1bb84ad711bb 100644 --- a/CEP/DP3/DPPP/src/MSWriter.cc +++ b/CEP/DP3/DPPP/src/MSWriter.cc @@ -508,6 +508,11 @@ namespace LOFAR { ArrayColumn<Complex> dataCol(out, itsDataColName); ArrayColumn<Bool> flagCol(out, "FLAG"); ScalarColumn<Bool> flagRowCol(out, "FLAG_ROW"); + + if (buf.getData().empty()) { + return; + } + dataCol.putColumn (buf.getData()); flagCol.putColumn (buf.getFlags()); // A row is flagged if no flags in the row are False. diff --git a/CEP/Pipeline/recipes/sip/bin/calibration_pipeline.py b/CEP/Pipeline/recipes/sip/bin/calibration_pipeline.py index b4b3a869030ccb070e2e704c3d1b0065b23fefc7..18edcaec53a7052200a7a4860b2afa4804c4c54c 100755 --- a/CEP/Pipeline/recipes/sip/bin/calibration_pipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/calibration_pipeline.py @@ -250,7 +250,8 @@ class calibration_pipeline(control): mapfile_source=bbs_mapfile, mapfile_target=output_correlated_mapfile, mapfiles_dir=mapfile_dir, - mapfile=output_correlated_mapfile + mapfile=output_correlated_mapfile, + allow_move=True ) with duration(self, "copier"): @@ -258,7 +259,8 @@ class calibration_pipeline(control): mapfile_source=parmdb_mapfile, mapfile_target=output_instrument_mapfile, mapfiles_dir=mapfile_dir, - mapfile=output_instrument_mapfile + mapfile=output_instrument_mapfile, + allow_move=True ) # ********************************************************************* diff --git a/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py b/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py index 0c627b6cb56006576297eb09b5014c41c162541e..eb641e574a125d85011e5005447ed1b8c0f5d768 100755 --- a/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/msss_calibrator_pipeline.py @@ -273,7 +273,8 @@ class msss_calibrator_pipeline(control): mapfile_source=bbs_mapfile, mapfile_target=output_correlated_mapfile, mapfiles_dir=mapfile_dir, - mapfile=output_correlated_mapfile + mapfile=output_correlated_mapfile, + allow_move=True ) # ********************************************************************* diff --git a/CEP/Pipeline/recipes/sip/master/copier.py b/CEP/Pipeline/recipes/sip/master/copier.py index c15dba135c90f2ca7df798cedea9d298286506c5..c9295910b2f913f5423a8be496fd8267f7bac4b2 100644 --- a/CEP/Pipeline/recipes/sip/master/copier.py +++ b/CEP/Pipeline/recipes/sip/master/copier.py @@ -142,6 +142,11 @@ class copier(MasterNodeInterface): default=True, help="Allow renaming of basename at target location" ), + 'allow_move': ingredient.BoolField( + '--allow-move', + default=True, + help="Allow moving files instead of copying them" + ), 'mapfiles_dir': ingredient.StringField( '--mapfiles-dir', help="Path of directory, shared by all nodes, which will be used" @@ -248,7 +253,7 @@ class copier(MasterNodeInterface): # Run the compute nodes with the node specific mapfiles for source, target in zip(self.source_map, self.target_map): - args = [source.host, source.file, target.file, globalfs] + args = [source.host, source.file, target.file, globalfs, self.inputs['allow_move']] self.append_job(target.host, args) # start the jobs, return the exit status. diff --git a/CEP/Pipeline/recipes/sip/nodes/copier.py b/CEP/Pipeline/recipes/sip/nodes/copier.py index b972b06c0042f50985d7162214ca5146816f5413..8a30b7a517800e5f5bd39d489078e544f33a2411 100644 --- a/CEP/Pipeline/recipes/sip/nodes/copier.py +++ b/CEP/Pipeline/recipes/sip/nodes/copier.py @@ -21,15 +21,15 @@ class copier(LOFARnodeTCP): """ Node script for copying files between nodes. See master script for full public interface """ - def run(self, source_node, source_path, target_path, globalfs): + def run(self, source_node, source_path, target_path, globalfs, allow_move): self.globalfs = globalfs # Time execution of this job with log_time(self.logger): return self._copy_single_file_using_rsync( - source_node, source_path, target_path) + source_node, source_path, target_path, allow_move) def _copy_single_file_using_rsync(self, source_node, source_path, - target_path): + target_path, allow_move): # assure that target dir exists (rsync creates it but.. # an error in the python code will throw a nicer error message = "No write acces to target path: {0}".format( @@ -51,9 +51,12 @@ class copier(LOFARnodeTCP): # construct copy command: Copy to the dir - # if process runs on local host use a simple copy command. - if self.globalfs or source_node=="localhost": - command = ["cp", "-r","{0}".format(source_path),"{0}".format(target_path)] + # if process runs on local host use a simple copy or move command. + if self.globalfs or source_node == "localhost": + if allow_move: + command = ["mv", "{0}".format(source_path), "{0}".format(target_path)] + else: + command = ["cp", "-r", "{0}".format(source_path), "{0}".format(target_path)] else: command = ["rsync", "-r", "{0}:{1}/".format(source_node, source_path), diff --git a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py index 5abce8b6e7c41a5f1973ec6936d5ae25349a92d3..eb4dd79e1facc712365083a3a10a9ff001fee90b 100644 --- a/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py +++ b/CEP/Pipeline/recipes/sip/nodes/imager_prepare.py @@ -181,7 +181,8 @@ class imager_prepare(LOFARnodeTCP): input_item.host, input_item.file), "{0}".format(processed_ms_dir)] if self.globalfs or input_item.host == "localhost": - command = ["cp", "-r", "{0}".format(input_item.file), + # symlinking is enough + command = ["ln", "-sfT", "{0}".format(input_item.file), "{0}".format(processed_ms_dir)] self.logger.debug("executing: " + " ".join(command)) diff --git a/CEP/Pipeline/recipes/sip/nodes/long_baseline.py b/CEP/Pipeline/recipes/sip/nodes/long_baseline.py index 2fe0aa3e405dcb572376a9ef3a6d7094fa883491..42f667a80e3366d7aee7404583a0e272865d805a 100644 --- a/CEP/Pipeline/recipes/sip/nodes/long_baseline.py +++ b/CEP/Pipeline/recipes/sip/nodes/long_baseline.py @@ -199,7 +199,8 @@ class long_baseline(LOFARnodeTCP): input_item.host, input_item.file), "{0}".format(processed_ms_dir)] if self.globalfs or input_item.host == "localhost": - command = ["cp", "-r", "{0}".format(input_item.file), + # symlinking is enough + command = ["ln", "-sfT", "{0}".format(input_item.file), "{0}".format(processed_ms_dir)] self.logger.debug("executing: " + " ".join(command)) diff --git a/CEP/Pipeline/recipes/sip/tasks.cfg.CEP4.in b/CEP/Pipeline/recipes/sip/tasks.cfg.CEP4.in index 76693d17ad7f8afb0e67918b26acbd87ccdda80e..bc8e4f1ef5e58b448a2f026937c7047f049e9964 100644 --- a/CEP/Pipeline/recipes/sip/tasks.cfg.CEP4.in +++ b/CEP/Pipeline/recipes/sip/tasks.cfg.CEP4.in @@ -1,6 +1,6 @@ [ndppp] nproc = 0 -nthreads = 10 +nthreads = 2 [setupparmdb] nproc = 0 @@ -24,7 +24,7 @@ nthreads = 10 [dppp] max_per_node = 0 -nthreads = 10 +nthreads = 2 [awimager] max_per_node = 0 diff --git a/CMake/testscripts/assay b/CMake/testscripts/assay index 09256b2d7f6498f3845ad224e359985cb56368af..d97460d4562a782e3dca2ce231c05d2f8147adf9 100755 --- a/CMake/testscripts/assay +++ b/CMake/testscripts/assay @@ -190,7 +190,7 @@ # Define exit and interrupt handler. - trap 'rm -rf core ${PROG}_tammop*; \ + trap 'rm -rf core ${PROG}_tmp*; \ trap - 0 ; \ exit $STATUS' 0 1 2 3 15 diff --git a/CMake/variants/variants.buildhostcentos7 b/CMake/variants/variants.buildhostcentos7 new file mode 120000 index 0000000000000000000000000000000000000000..0c9019c0432464aaca3f677ce8c1104ce730f2fe --- /dev/null +++ b/CMake/variants/variants.buildhostcentos7 @@ -0,0 +1 @@ +variants.lcs157 \ No newline at end of file diff --git a/CMake/variants/variants.dop320 b/CMake/variants/variants.dop320 new file mode 100644 index 0000000000000000000000000000000000000000..e11707efd900a6574ec1f1adbb928c77e7f109b0 --- /dev/null +++ b/CMake/variants/variants.dop320 @@ -0,0 +1,2 @@ +# Fix for Debian (FindPython tends to settle on /usr/bin/python2.6, which is a "minimal" python install) +set(PYTHON_EXECUTABLE "/usr/bin/python2.7") diff --git a/CMake/variants/variants.lcs157 b/CMake/variants/variants.lcs157 new file mode 100644 index 0000000000000000000000000000000000000000..026dfdfff6fa341372c2d19ee53fc07ac47fc187 --- /dev/null +++ b/CMake/variants/variants.lcs157 @@ -0,0 +1,15 @@ +# Station software and the like apparently cannot (yet) handle shared libs. +option(BUILD_SHARED_LIBS "Build shared libraries" OFF) + +#set(ENV{JAVA_HOME} /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0) +#set(PVSS_ROOT_DIR /opt/pvss/pvss2_v3.7) +set(PVSS_ROOT_DIR /opt/WinCC_OA/3.14) +#set(LOG4CPLUS_ROOT_DIR "/usr/local/log4cplus") +set(CASACORE_ROOT_DIR "/opt/casacore") +set(QPID_ROOT_DIR /opt/qpid) + +#set(CTEST_CUSTOM_WARNING_EXCEPTION +# "/boost/date_time/time_facet.hpp:[0-9]+: warning: unused parameter" +# "/boost/date_time/time.hpp:[0-9]+: warning: unused parameter" +# "/pvss2_v3.7/api/include/(Basics|Datapoint|Manager|Messages)/" +#) diff --git a/Docker/lofar-outputproc/Dockerfile.tmpl b/Docker/lofar-outputproc/Dockerfile.tmpl index c25fffe979accf5de9a8fd12bae4b73fa3702872..9ab90ed15ee2bb9a98944ac1027bd68a03dbb69d 100644 --- a/Docker/lofar-outputproc/Dockerfile.tmpl +++ b/Docker/lofar-outputproc/Dockerfile.tmpl @@ -18,7 +18,7 @@ ENV LOFAR_BRANCH=${LOFAR_BRANCH} \ LOFAR_BUILDVARIANT=gnucxx11_optarch # Install -RUN apt-get update && apt-get install -y subversion cmake g++ gfortran bison flex autogen liblog4cplus-dev libhdf5-dev libblitz0-dev libboost-dev libboost-python${BOOST_VERSION}-dev libxml2-dev pkg-config libpng12-dev libfftw3-dev libunittest++-dev libxml++2.6-dev libboost-filesystem${BOOST_VERSION}-dev libboost-date-time${BOOST_VERSION}-dev libboost-thread${BOOST_VERSION}-dev libboost-regex${BOOST_VERSION} binutils-dev libopenblas-dev && libcfitsio3-dev wcslib-dev \ +RUN apt-get update && apt-get install -y subversion cmake g++ gfortran bison flex autogen liblog4cplus-dev libhdf5-dev libblitz0-dev libboost-dev libboost-python${BOOST_VERSION}-dev libxml2-dev pkg-config libpng12-dev libfftw3-dev libunittest++-dev libxml++2.6-dev libboost-filesystem${BOOST_VERSION}-dev libboost-date-time${BOOST_VERSION}-dev libboost-thread${BOOST_VERSION}-dev libboost-regex${BOOST_VERSION} binutils-dev libopenblas-dev libcfitsio3-dev wcslib-dev && \ mkdir -p ${INSTALLDIR}/lofar/build/${LOFAR_BUILDVARIANT} && \ cd ${INSTALLDIR}/lofar && \ svn --non-interactive -q co -r ${LOFAR_REVISION} -N ${LOFAR_BRANCH_URL} src; \ @@ -32,6 +32,6 @@ RUN apt-get update && apt-get install -y subversion cmake g++ gfortran bison fle bash -c "strip ${INSTALLDIR}/lofar/{bin,sbin,lib64}/* || true" && \ bash -c "rm -rf ${INSTALLDIR}/lofar/{build,src}" && \ setcap cap_sys_nice,cap_ipc_lock=ep ${INSTALLDIR}/lofar/bin/outputProc && \ - apt-get purge -y subversion cmake g++ gfortran bison flex autogen liblog4cplus-dev libhdf5-dev libblitz0-dev libboost-dev libboost-python${BOOST_VERSION}-dev libxml2-dev pkg-config libpng12-dev libfftw3-dev libunittest++-dev libxml++2.6-dev libboost-filesystem${BOOST_VERSION}-dev libboost-date-time${BOOST_VERSION}-dev libboost-thread${BOOST_VERSION}-dev binutils-dev libcfitsio3-dev wcslib-dev libopenblas-dev && libcfitsio3-dev wcslib-dev \ + apt-get purge -y subversion cmake g++ gfortran bison flex autogen liblog4cplus-dev libhdf5-dev libblitz0-dev libboost-dev libboost-python${BOOST_VERSION}-dev libxml2-dev pkg-config libpng12-dev libfftw3-dev libunittest++-dev libxml++2.6-dev libboost-filesystem${BOOST_VERSION}-dev libboost-date-time${BOOST_VERSION}-dev libboost-thread${BOOST_VERSION}-dev binutils-dev libcfitsio3-dev wcslib-dev libopenblas-dev && \ apt-get autoremove -y diff --git a/MAC/APL/CEPCU/src/PythonControl/PythonControl.cc b/MAC/APL/CEPCU/src/PythonControl/PythonControl.cc index 0fce53909dca9659e70d578925de2f4f84846463..99125ff82515db8570907f1f852b4656d4f5896d 100644 --- a/MAC/APL/CEPCU/src/PythonControl/PythonControl.cc +++ b/MAC/APL/CEPCU/src/PythonControl/PythonControl.cc @@ -212,13 +212,13 @@ bool PythonControl::_startPython(const string& pythonProg, itsPythonName = formatString("PythonServer{%d}@%s", obsID, pythonHost.c_str()); if (onRemoteMachine) { - startCmd = formatString("ssh %s %s %s %s %s %s %s", + startCmd = formatString("ssh %s LOFARENV=$LOFARENV %s %s %s %s %s %s", pythonHost.c_str(), startScript.c_str(), executable.c_str(), parSetName.c_str(), myHostname(true).c_str(), parentService.c_str(), itsPythonName.c_str()); } else { - startCmd = formatString("%s %s %s %s %s %s", + startCmd = formatString("env LOFARENV=$LOFARENV %s %s %s %s %s %s", startScript.c_str(), executable.c_str(), parSetName.c_str(), myHostname(true).c_str(), parentService.c_str(), itsPythonName.c_str()); diff --git a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc index 268a664f815bdd046c72c783ba3a28f8f5019560..ee6cd42d954faf96cb22935cb5b7ff9eaedfb884 100644 --- a/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc +++ b/MAC/APL/MainCU/src/MACScheduler/MACScheduler.cc @@ -65,7 +65,7 @@ namespace LOFAR { // static (this) pointer used for signal handling static MACScheduler* pMacScheduler = 0; - + // // MACScheduler() // @@ -108,7 +108,7 @@ MACScheduler::MACScheduler() : } } - ASSERTSTR(itsMaxPlanned + itsMaxFinished < MAX_CONCURRENT_OBSERVATIONS, + ASSERTSTR(itsMaxPlanned + itsMaxFinished < MAX_CONCURRENT_OBSERVATIONS, "maxPlannedList + maxFinishedList should be less than " << MAX_CONCURRENT_OBSERVATIONS); // Read the schedule periods for starting observations. @@ -192,12 +192,12 @@ void MACScheduler::_databaseEventHandler(GCFEvent& event) itsQueuePeriod = newVal; } #endif - } + } break; default: break; - } + } } @@ -211,7 +211,7 @@ GCFEvent::TResult MACScheduler::initial_state(GCFEvent& event, GCFPortInterface& LOG_DEBUG_STR ("initial_state:" << eventName(event)); GCFEvent::TResult status = GCFEvent::HANDLED; - + switch (event.signal) { case F_INIT: break; @@ -235,7 +235,7 @@ GCFEvent::TResult MACScheduler::initial_state(GCFEvent& event, GCFPortInterface& itsTimerPort->setTimer(0.0); } break; - + case F_TIMER: { // must be timer that PropSet is enabled. // update PVSS. LOG_TRACE_FLOW ("Updateing state to PVSS"); @@ -249,7 +249,7 @@ GCFEvent::TResult MACScheduler::initial_state(GCFEvent& event, GCFPortInterface& itsPropertySet->setValue(PN_MS_PLANNED_OBSERVATIONS, GCFPVDynArr(LPT_STRING, emptyArr)); itsPropertySet->setValue(PN_MS_FINISHED_OBSERVATIONS, GCFPVDynArr(LPT_STRING, emptyArr)); - + // Try to connect to the SAS database. ParameterSet* pParamSet = globalParameterSet(); string username = pParamSet->getString("OTDBusername"); @@ -261,7 +261,7 @@ GCFEvent::TResult MACScheduler::initial_state(GCFEvent& event, GCFPortInterface& itsOTDBconnection= new OTDBconnection(username, password, DBname, hostname); ASSERTSTR (itsOTDBconnection, "Memory allocation error (OTDB)"); ASSERTSTR (itsOTDBconnection->connect(), - "Unable to connect to database " << DBname << " on " << hostname << + "Unable to connect to database " << DBname << " on " << hostname << " using " << username << "," << password); LOG_INFO ("Connected to the OTDB"); itsPropertySet->setValue(PN_MS_OTDB_CONNECTED, GCFPVBool(true)); @@ -285,12 +285,12 @@ GCFEvent::TResult MACScheduler::initial_state(GCFEvent& event, GCFPortInterface& case F_DISCONNECTED: break; - + default: LOG_DEBUG ("MACScheduler::initial, default"); status = GCFEvent::NOT_HANDLED; break; - } + } return (status); } @@ -320,15 +320,15 @@ GCFEvent::TResult MACScheduler::recover_state(GCFEvent& event, GCFPortInterface& // TODO: do recovery TRAN(MACScheduler::active_state); - + break; } - + default: LOG_DEBUG("recover_state, default"); status = GCFEvent::NOT_HANDLED; break; - } + } return (status); } @@ -349,7 +349,7 @@ GCFEvent::TResult MACScheduler::active_state(GCFEvent& event, GCFPortInterface& break; case F_ENTRY: { - // install my own signal handler. GCFTask also installs a handler so we have + // install my own signal handler. GCFTask also installs a handler so we have // to install our handler later than the GCFTask handler. pMacScheduler = this; signal (SIGINT, MACScheduler::sigintHandler); // ctrl-c @@ -367,12 +367,12 @@ GCFEvent::TResult MACScheduler::active_state(GCFEvent& event, GCFPortInterface& case F_ACCEPT_REQ: break; - case F_CONNECTED: + case F_CONNECTED: // Should be from the (lost) connection with the SD _connectedHandler(port); break; - case F_DISCONNECTED: + case F_DISCONNECTED: // Can be from StartDaemon or ObsController. // StartDaemon: trouble! Try to reconnect asap. // ObsController: ok when obs is finished, BIG TROUBLE when not! @@ -432,7 +432,7 @@ GCFEvent::TResult MACScheduler::active_state(GCFEvent& event, GCFPortInterface& // -------------------- EVENTS FROM CHILDCONTROL -------------------- // - // That must be events from the ObservationControllers that are currently + // That must be events from the ObservationControllers that are currently // started or running. // case CONTROL_STARTED: { @@ -562,16 +562,16 @@ GCFEvent::TResult MACScheduler::finishing_state(GCFEvent& event, GCFPortInterfac itsTimerPort->setTimer(1L); break; } - + case F_TIMER: GCFScheduler::instance()->stop(); break; - + default: LOG_DEBUG("finishing_state, default"); status = GCFEvent::NOT_HANDLED; break; - } + } return (status); } @@ -632,17 +632,17 @@ void MACScheduler::_updatePlannedList() ASSERTSTR (currentTime != not_a_date_time, "Can't determine systemtime, bailing out"); // get new list (list is ordered on starttime) of planned observations - vector<OTDBtree> plannedDBlist = itsOTDBconnection->getTreeGroup(1, itsPlannedPeriod, itsExclPLcluster); + vector<OTDBtree> plannedDBlist = itsOTDBconnection->getTreeGroup(1, itsPlannedPeriod, itsExclPLcluster); if (!plannedDBlist.empty()) { - LOG_DEBUG(formatString("OTDBCheck:First planned observation (%d) is at %s (active over %d seconds)", - plannedDBlist[0].treeID(), to_simple_string(plannedDBlist[0].starttime).c_str(), + LOG_DEBUG(formatString("OTDBCheck:First planned observation (%d) is at %s (active over %d seconds)", + plannedDBlist[0].treeID(), to_simple_string(plannedDBlist[0].starttime).c_str(), time_duration(plannedDBlist[0].starttime - currentTime).total_seconds())); } // NOTE: do not exit routine on emptylist: we need to write an empty list to clear the DB // make a copy of the current prepared observations (= observations shown in the navigator in the 'future' - // list). By eliminating the observations that are in the current SAS list we end up (at the end of this function) + // list). By eliminating the observations that are in the current SAS list we end up (at the end of this function) // with a list of observations that were in the SASlist the last time but now not anymore. Normally those observations // will appear in the active-list and will be removed there from the prepared list but WHEN AN OPERATOR CHANGES // THE STATUS MANUALLY into something different (e.g. ON-HOLD) the observation stays in the preparedlist. @@ -672,14 +672,14 @@ void MACScheduler::_updatePlannedList() // must we claim this observation at the claimMgr? OLiter prepIter = itsPreparedObs.find(obsID); - if ((prepIter == itsPreparedObs.end()) || (prepIter->second.prepReady == false) || + if ((prepIter == itsPreparedObs.end()) || (prepIter->second.prepReady == false) || (prepIter->second.modTime != modTime)) { // create a ParameterFile for this Observation TreeMaintenance tm(itsOTDBconnection); OTDBnode topNode = tm.getTopNode(obsID); string filename(observationParset(obsID)); if (!tm.exportTree(obsID, topNode.nodeID(), filename)) { - LOG_ERROR_STR ("Cannot create ParameterSet '" << filename << + LOG_ERROR_STR ("Cannot create ParameterSet '" << filename << "' for new observation. Observation CANNOT BE STARTED!"); } else { @@ -694,10 +694,14 @@ void MACScheduler::_updatePlannedList() // otherwise thing will go wrong in the Navigator plannedArr.push_back(new GCFPVString(obsName)); } - + // should this observation (have) be(en) started? int timeBeforeStart = time_duration(plannedDBlist[idx].starttime - currentTime).total_seconds(); -// LOG_DEBUG_STR(obsName << " starts over " << timeBeforeStart << " seconds"); + LOG_INFO(formatString("%s starts at %s which is in %d seconds", + obsName.c_str(), + to_simple_string(plannedDBlist[idx].starttime).c_str(), + timeBeforeStart)); + if (timeBeforeStart > 0 && timeBeforeStart <= (int)itsQueuePeriod) { if (itsPreparedObs[obsID].prepReady == false) { LOG_INFO_STR("Observation " << obsID << " must be started but is not claimed yet."); @@ -710,8 +714,8 @@ void MACScheduler::_updatePlannedList() string cntlrName(controllerName(CNTLRTYPE_OBSERVATIONCTRL, 0, obsID)); if (itsControllerMap.find(cntlrName) == itsControllerMap.end()) { LOG_INFO_STR("Requesting start of " << cntlrName); - itsChildControl->startChild(CNTLRTYPE_OBSERVATIONCTRL, - obsID, + itsChildControl->startChild(CNTLRTYPE_OBSERVATIONCTRL, + obsID, 0, // instanceNr myHostname(true)); // Note: controller is now in state NO_STATE/CONNECTED (C/R) diff --git a/MAC/Deployment/data/OTDB/DPPP.comp b/MAC/Deployment/data/OTDB/DPPP.comp index 482cb96050b2be811035de30e8c18c3f132e9743..e9cb7f00021d7d3ec4cc91f6ab65a0b75fd86c17 100644 --- a/MAC/Deployment/data/OTDB/DPPP.comp +++ b/MAC/Deployment/data/OTDB/DPPP.comp @@ -37,7 +37,7 @@ node msout 4.0.0 development 'node constraint' "Output MeasurementSe #par name I text - 10 0 "-" - "Name of the MeasurementSet" par overwrite I bool - 10 0 F - "When creating a new MS, overwrite if already existing?" par tilenchan I int - 10 0 0 - "For expert user: maximum number of channels per tile in output MS (0 is all channels)" -par tilesize I int - 10 0 1024 - "For expert user: tile size (in Kbytes) for the data columns in the output MS" +par tilesize I int - 10 0 4096 - "For expert user: tile size (in Kbytes) for the data columns in the output MS" par vdsdir I text - 10 0 "A" - "Directory where to put the VDS file; if empty, the MS directory is used." par writefullresflag I bool - 10 0 T - "Write the full resolution flags" diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index f3c535d09b41a9be6e6c1a415024672c572f9e4d..692e6ba7b89b24df451a7302c865763bf7c7e05f 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -66,7 +66,7 @@ from lofar.sas.otdb.OTDBBusListener import OTDBBusListener from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_SERVICE_BUSNAME from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.common.util import waitForInterrupt -from lofar.messaging.RPC import RPCTimeoutException +from lofar.messaging.RPC import RPCTimeoutException, RPCException from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RAS_SERVICE_BUSNAME @@ -130,13 +130,51 @@ class Parset(dict): return self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.clusterName"] or "CEP2" def processingPartition(self): - return self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.clusterPartition"] or "cpu" + result = self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.clusterPartition"] or "cpu" + if '/' in result: + logger.error('clusterPartition contains invalid value: %s. Defaulting clusterPartition to \'cpu\'', result) + return 'cpu' + return result def processingNumberOfCoresPerTask(self): - return int(self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"]) or "20" + result = int(self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.numberOfCoresPerTask"]) or "20" + if result < 1 or result > 20: + logger.warn('Invalid Observation.Cluster.ProcessingCluster.numberOfCoresPerTask: %s, defaulting to %s', result, max(1, min(20, result))) + return max(1, min(20, result)) def processingNumberOfTasks(self): - return int(self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.numberOfTasks"]) or "24" + """ Parse the number of nodes to allocate from "Observation.Cluster.ProcessingCluster.numberOfTasks", + which can have either the format "{number}" or "{min}-{max}". """ + + #JS 2016-07-19: set max nodes to 23, so we always have at least 50-2*23=4 nodes available for vlad pulp + + defaultValue = "12-23" + parsetValue = self[PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.numberOfTasks"].strip() + + if "-" in parsetValue: + # min,max + _min, _max = parsetValue.split("-") + + if _max > 23: + _max = 23 + + # collapse if not min <= max + if _min > _max: + result = _min + else: + result = "%s-%s" % (_min, _max) + else: + # plain number + result = int(parsetValue) + + # apply bound + if result < 1 or result > 23: + result = defaultValue + + if result != parsetValue: + logger.error('Invalid Observation.Cluster.ProcessingCluster.numberOfTasks: %s, defaulting to %s', parsetValue, result) + + return result @staticmethod def dockerRepository(): @@ -226,7 +264,7 @@ class PipelineDependencies(object): radb_task = self.rarpc.getTask(otdb_id=otdb_id) if radb_task is None: - raise TaskNotFoundException("otdb_id %s not found in RADB" % (otdb_id,)) + raise PipelineDependencies.TaskNotFoundException("otdb_id %s not found in RADB" % (otdb_id,)) predecessor_radb_ids = radb_task['predecessor_ids'] predecessor_tasks = self.rarpc.getTasks(task_ids=predecessor_radb_ids) @@ -243,10 +281,10 @@ class PipelineDependencies(object): radb_task = self.rarpc.getTask(otdb_id=otdb_id) if radb_task is None: - raise TaskNotFoundException("otdb_id %s not found in RADB" % (otdb_id,)) + raise PipelineDependencies.TaskNotFoundException("otdb_id %s not found in RADB" % (otdb_id,)) successor_radb_ids = radb_task['successor_ids'] - successor_tasks = self.rarpc.getTasks(task_ids=successor_ids) if successor_radb_ids else [] + successor_tasks = self.rarpc.getTasks(task_ids=successor_radb_ids) if successor_radb_ids else [] successor_otdb_ids = [t["otdb_id"] for t in successor_tasks] logger.debug("getSuccessorIds(%s) = %s", otdb_id, successor_otdb_ids) @@ -262,11 +300,11 @@ class PipelineDependencies(object): try: myState = self.getState(otdbId) predecessorStates = self.getPredecessorStates(otdbId) - except TaskNotFoundException, e: + except PipelineDependencies.TaskNotFoundException, e: logger.error("canStart(%s): Error obtaining task states, not starting pipeline: %s", otdbId, e) return False - logger.debug("canStart(%s)? state = %s, predecessors = %s", otdbId, myState, predecessorStates) + logger.info("canStart(%s)? state = %s, predecessors = %s", otdbId, myState, predecessorStates) return ( myState == "scheduled" and @@ -286,7 +324,12 @@ class PipelineControl(OTDBBusListener): self.otdbrpc.taskSetStatus(otdb_id=otdb_id, new_status=status) def _getParset(self, otdbId): - return Parset(self.otdbrpc.taskGetSpecification(otdb_id=otdbId)["specification"]) + try: + return Parset(self.otdbrpc.taskGetSpecification(otdb_id=otdbId)["specification"]) + except RPCException, e: + # Parset not in OTDB, probably got deleted + logger.error("Cannot retrieve parset of task %s: %s", otdbId, e) + return None def start_listening(self, **kwargs): self.otdbrpc.open() @@ -294,20 +337,50 @@ class PipelineControl(OTDBBusListener): super(PipelineControl, self).start_listening(**kwargs) + self._checkScheduledPipelines() + def stop_listening(self, **kwargs): super(PipelineControl, self).stop_listening(**kwargs) self.dependencies.close() self.otdbrpc.close() + def _checkScheduledPipelines(self): + try: + scheduled_pipelines = self.dependencies.rarpc.getTasks(task_status='scheduled', task_type='pipeline') + logger.info("Checking %s scheduled pipelines if they can start.", len(scheduled_pipelines)) + + for pipeline in scheduled_pipelines: + logger.info("Checking if scheduled pipeline otdbId=%s can start.", pipeline['otdb_id']) + try: + otdbId = pipeline['otdb_id'] + parset = self._getParset(otdbId) + if not parset or not self._shouldHandle(parset): + continue + + # Maybe the pipeline can start already + if self.dependencies.canStart(otdbId): + self._startPipeline(otdbId, parset) + else: + logger.info("Job %s was set to scheduled, but cannot start yet.", otdbId) + except Exception as e: + logger.error(e) + except Exception as e: + logger.error(e) + @staticmethod def _shouldHandle(parset): - if not parset.isPipeline(): - logger.info("Not processing tree: is not a pipeline") - return False - - if parset.processingCluster() == "CEP2": - logger.info("Not processing tree: is a CEP2 pipeline") + try: + if not parset.isPipeline(): + logger.info("Not processing tree: is not a pipeline") + return False + + if parset.processingCluster() == "CEP2": + logger.info("Not processing tree: is a CEP2 pipeline") + return False + except KeyError as e: + # Parset not complete + logger.error("Parset incomplete, ignoring: %s", e) return False return True @@ -328,6 +401,8 @@ class PipelineControl(OTDBBusListener): logger.info("Pipeline %s is already queued or running in SLURM.", otdbId) return + logger.info("***** START Otdb ID %s *****", otdbId) + # Determine SLURM parameters sbatch_params = [ # Only run job if all nodes are ready @@ -344,11 +419,11 @@ class PipelineControl(OTDBBusListener): # Lower priority to drop below inspection plots "--nice=1000", - + "--partition=%s" % parset.processingPartition(), "--nodes=%s" % parset.processingNumberOfTasks(), "--cpus-per-task=%s" % parset.processingNumberOfCoresPerTask(), - + # Define better places to write the output os.path.expandvars("--output=/data/log/pipeline-%s-%%j.log" % (otdbId,)), ] @@ -367,39 +442,87 @@ class PipelineControl(OTDBBusListener): status_bus = self.otdb_service_busname, )) + logger.info("Handing over pipeline %s to SLURM, setting status to QUEUED", otdbId) + self._setStatus(otdbId, "queued") + # Schedule runPipeline.sh logger.info("Scheduling SLURM job for runPipeline.sh") slurm_job_id = self.slurm.submit(self._jobName(otdbId), - # notify that we're running - "{setStatus_active}\n" - # pull docker image from repository on all nodes - "srun --nodelist=$SLURM_NODELIST --cpus-per-task=1 --job-name=docker-pull" - " --kill-on-bad-exit=0 --wait=0" - " docker pull {repository}/{image}\n" - # put a local tag on the pulled image - "srun --nodelist=$SLURM_NODELIST --cpus-per-task=1 --job-name=docker-tag" - " --kill-on-bad-exit=0 --wait=0" - " docker tag -f {repository}/{image} {image}\n" - # call runPipeline.sh in the image on this node - "docker run --rm" - " --net=host" - " -e LOFARENV={lofarenv}" - " -u $UID" - " -e USER=$USER" - " -e HOME=$HOME" - " -v $HOME/.ssh:$HOME/.ssh:ro" - " -e SLURM_JOB_ID=$SLURM_JOB_ID" - " -v /data:/data" - " {image}" - " runPipeline.sh -o {obsid} -c /opt/lofar/share/pipeline/pipeline.cfg.{cluster} -P /data/parsets || exit $?\n" - - # notify that we're tearing down - "{setStatus_completing}\n" - # wait for MoM to pick up feedback before we set finished status - "sleep 60\n" - # if we reached this point, the pipeline ran succesfully - "{setStatus_finished}\n" - .format( +""" +# Run a command, but propagate SIGINT and SIGTERM +function runcmd {{ + trap 'kill -s SIGTERM $PID' SIGTERM + trap 'kill -s SIGINT $PID' SIGINT + + "$@" & + PID=$! + wait $PID # returns the exit status of "wait" if interrupted + wait $PID # returns the exit status of $PID + RESULT=$? + + trap - SIGTERM SIGINT + + return $RESULT +}} + +# print some info +echo Running on $SLURM_NODELIST + +# notify OTDB that we're running +runcmd {setStatus_active} + +# notify ganglia +wget -O - -q "http://ganglia.control.lofar/ganglia/api/events.php?action=add&start_time=now&summary=Pipeline {obsid} ACTIVE&host_regex=" + +# pull docker image from repository on all nodes +srun --nodelist=$SLURM_NODELIST --cpus-per-task=1 --job-name=docker-pull \ + --kill-on-bad-exit=0 --wait=0 \ + docker pull {repository}/{image} + +# put a local tag on the pulled image +srun --nodelist=$SLURM_NODELIST --cpus-per-task=1 --job-name=docker-tag \ + --kill-on-bad-exit=0 --wait=0 \ + docker tag -f {repository}/{image} {image} + +# run the pipeline +runcmd docker run --rm --net=host \ + -e LOFARENV={lofarenv} \ + -u $UID -e USER=$USER \ + -e HOME=$HOME -v $HOME/.ssh:$HOME/.ssh:ro \ + -e SLURM_JOB_ID=$SLURM_JOB_ID \ + -v /data:/data \ + {image} \ + runPipeline.sh -o {obsid} -c /opt/lofar/share/pipeline/pipeline.cfg.{cluster} + +if [ $? -eq 0 ]; then + # notify that we're tearing down + runcmd {setStatus_completing} + + # wait for MoM to pick up feedback before we set finished status + runcmd sleep 60 + + # if we reached this point, the pipeline ran succesfully + runcmd {setStatus_finished} + + # notify ganglia + wget -O - -q "http://ganglia.control.lofar/ganglia/api/events.php?action=add&start_time=now&summary=Pipeline {obsid} FINISHED&host_regex=" + + # return success + exit 0 +else + # aborted state is already set by pipeline framework. + # Why? why does the framework set the aborted state, but not the finished state? questions questions.... + # so, do not set aborted state here. + # {setStatus_aborted} + + # notify ganglia + wget -O - -q "http://ganglia.control.lofar/ganglia/api/events.php?action=add&start_time=now&summary=Pipeline {obsid} ABORTED&host_regex=" + + # return failure + exit 1 +fi + +""".format( lofarenv = os.environ.get("LOFARENV", ""), obsid = otdbId, repository = parset.dockerRepository(), @@ -409,6 +532,7 @@ class PipelineControl(OTDBBusListener): setStatus_active = setStatus_cmdline("active"), setStatus_completing = setStatus_cmdline("completing"), setStatus_finished = setStatus_cmdline("finished"), + setStatus_aborted = setStatus_cmdline("aborted"), ), sbatch_params=sbatch_params @@ -418,9 +542,16 @@ class PipelineControl(OTDBBusListener): # Schedule pipelineAborted.sh logger.info("Scheduling SLURM job for pipelineAborted.sh") slurm_cancel_job_id = self.slurm.submit("%s-abort-trigger" % self._jobName(otdbId), - "{setStatus_aborted}\n" +""" +# notify OTDB +{setStatus_aborted} + +# notify ganglia +wget -O - -q "http://ganglia.control.lofar/ganglia/api/events.php?action=add&start_time=now&summary=Pipeline {obsid} ABORTED&host_regex=" +""" .format( setStatus_aborted = setStatus_cmdline("aborted"), + obsid = otdbId, ), sbatch_params=[ @@ -435,16 +566,13 @@ class PipelineControl(OTDBBusListener): ) logger.info("Scheduled SLURM job %s", slurm_cancel_job_id) - logger.info("Setting status to QUEUED") - self._setStatus(otdbId, "queued") - def _stopPipeline(self, otdbId): # Cancel corresponding SLURM job, but first the abort-trigger # to avoid setting ABORTED as a side effect. # to be cancelled as well. if not self.slurm.isQueuedOrRunning(otdbId): - logger.info("_stopPipeline: Job %s not running") + logger.info("_stopPipeline: Job %s not running", otdbId) return def cancel(jobName): @@ -458,29 +586,27 @@ class PipelineControl(OTDBBusListener): def _startSuccessors(self, otdbId): try: successor_ids = self.dependencies.getSuccessorIds(otdbId) - except TaskNotFoundException, e: + except PipelineDependencies.TaskNotFoundException, e: logger.error("_startSuccessors(%s): Error obtaining task successors, not starting them: %s", otdbId, e) return for s in successor_ids: parset = self._getParset(s) - if not self._shouldHandle(parset): + if not parset or not self._shouldHandle(parset): continue if self.dependencies.canStart(s): - logger.info("***** START Otdb ID %s *****", otdbId) self._startPipeline(s, parset) else: logger.info("Job %s still cannot start yet.", otdbId) def onObservationScheduled(self, otdbId, modificationTime): parset = self._getParset(otdbId) - if not self._shouldHandle(parset): + if not parset or not self._shouldHandle(parset): return # Maybe the pipeline can start already if self.dependencies.canStart(otdbId): - logger.info("***** START Otdb ID %s *****", otdbId) self._startPipeline(otdbId, parset) else: logger.info("Job %s was set to scheduled, but cannot start yet.", otdbId) @@ -494,7 +620,7 @@ class PipelineControl(OTDBBusListener): def onObservationAborted(self, otdbId, modificationTime): parset = self._getParset(otdbId) - if not self._shouldHandle(parset): + if parset and not self._shouldHandle(parset): # stop jobs even if there's no parset return logger.info("***** STOP Otdb ID %s *****", otdbId) @@ -504,7 +630,6 @@ class PipelineControl(OTDBBusListener): More statusses we want to abort on. """ onObservationDescribed = onObservationAborted - onObservationPrepared = onObservationAborted onObservationApproved = onObservationAborted onObservationPrescheduled = onObservationAborted onObservationConflict = onObservationAborted diff --git a/MAC/Services/test/tPipelineControl.py b/MAC/Services/test/tPipelineControl.py index 07e23240de40b53b4ab313532bbe1366a1560633..f4994db0295480540d37e0dcfe51f918e2926708 100644 --- a/MAC/Services/test/tPipelineControl.py +++ b/MAC/Services/test/tPipelineControl.py @@ -108,6 +108,10 @@ class MockRAService(MessageHandlerInterface): def GetTasks(self, lower_bound, upper_bound, task_ids, task_status, task_type): print "***** GetTasks(%s) *****" % (task_ids,) + if task_ids is None: + # Used on startup to check which tasks are at scheduled + return [] + return [{ 'otdb_id': t - 1000, 'status': self.status[t - 1000], diff --git a/SAS/DataManagement/CleanupService/cleanupservice.ini b/SAS/DataManagement/CleanupService/cleanupservice.ini index 0e705c1191c68047150e9fb29e4086e610ac1c61..b92b6b0636f47388aaf7880c819a969d4f93d4c6 100644 --- a/SAS/DataManagement/CleanupService/cleanupservice.ini +++ b/SAS/DataManagement/CleanupService/cleanupservice.ini @@ -1,5 +1,5 @@ [program:cleanupservice] -command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec cleanupservice --log-queries' +command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec cleanupservice' user=lofarsys stopsignal=INT ; KeyboardInterrupt stopasgroup=true ; bash does not propagate signals diff --git a/SAS/DataManagement/StorageQueryService/cache.py b/SAS/DataManagement/StorageQueryService/cache.py index ad82f4ea8342214bab45c43d8bc72a4e0a84fbb7..2de9f88dffeff47f583f45f5a7c27acbff685ad3 100644 --- a/SAS/DataManagement/StorageQueryService/cache.py +++ b/SAS/DataManagement/StorageQueryService/cache.py @@ -148,7 +148,6 @@ class CacheManager: return with self._cacheLock: - logger.info('tree scan: %s %s', directory, self._cache.keys()) if directory not in self._cache: logger.info('tree scan: adding \'%s\' with empty disk_usage to cache which will be du\'ed later', directory) empty_du_result = {'found': True, 'disk_usage': None, 'path': directory, 'name': directory.split('/')[-1]} diff --git a/SAS/DataManagement/StorageQueryService/storagequeryservice.ini b/SAS/DataManagement/StorageQueryService/storagequeryservice.ini index 45bcc52ad0cb0e0c99bace78d4b804e3d6e37ec5..d65bbc1241316fd088e1f7dbad99e15d9db0b773 100644 --- a/SAS/DataManagement/StorageQueryService/storagequeryservice.ini +++ b/SAS/DataManagement/StorageQueryService/storagequeryservice.ini @@ -1,5 +1,5 @@ [program:storagequeryservice] -command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec storagequeryservice --log-queries' +command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec storagequeryservice' user=lofarsys stopsignal=INT ; KeyboardInterrupt stopasgroup=true ; bash does not propagate signals diff --git a/SAS/OTDB/sql/getTreeGroup_func.sql b/SAS/OTDB/sql/getTreeGroup_func.sql index b1b1dc95e23e548a1f9c26d4c5f90e945732165f..053363ad9aea96099ca1572ce9da4f967dc96a48 100644 --- a/SAS/OTDB/sql/getTreeGroup_func.sql +++ b/SAS/OTDB/sql/getTreeGroup_func.sql @@ -61,6 +61,7 @@ CREATE OR REPLACE FUNCTION getTreeGroup(INT, INT, VARCHAR(20)) vWhere TEXT; vQuery TEXT; vSortOrder TEXT; + vSortOrderExcept TEXT; vExcept TEXT; vCluster TEXT; @@ -113,6 +114,8 @@ CREATE OR REPLACE FUNCTION getTreeGroup(INT, INT, VARCHAR(20)) END IF; END IF; + vSortOrderExcept := replace(vSortOrder, 't.', ''); + -- do selection FOR vRecord IN EXECUTE ' WITH VICtrees AS ( @@ -140,7 +143,7 @@ CREATE OR REPLACE FUNCTION getTreeGroup(INT, INT, VARCHAR(20)) AND t.classif = 3 ' || vWhere || ' ORDER BY ' || vSortOrder || ') - ' || vQuery || vExcept + ' || vQuery || vExcept || ' ORDER BY ' || vSortOrderExcept LOOP RETURN NEXT vRecord; END LOOP; diff --git a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py index 711f4721680b85da0bbeea28b02311d84d74c685..f2c904549c469541a4b4f3c3dea3ea5bf5d73805 100644 --- a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py +++ b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py @@ -137,9 +137,12 @@ class OTDBtoRATaskStatusPropagator(OTDBBusListener): def onObservationCompleting(self, treeId, modificationTime): self._update_radb_task_status(treeId, 'completing') - def _updateStopTime(self, treeId): + def _updateStopTime(self, treeId, only_pipelines=False): radb_task = self.radb.getTask(otdb_id=treeId) - if radb_task and radb_task['type'] == 'pipeline': + if radb_task: + if only_pipelines and radb_task['type'] != 'pipeline': + return + otdb_task = self.otdb.taskGetTreeInfo(otdb_id=treeId) if otdb_task and (otdb_task['starttime'] != radb_task['starttime'] or otdb_task['stoptime'] != radb_task['endtime']): new_endtime = otdb_task['stoptime'] @@ -152,14 +155,13 @@ class OTDBtoRATaskStatusPropagator(OTDBBusListener): # otdb adjusts stoptime when finishing, # reflect that in radb for pipelines - self._updateStopTime(treeId) + self._updateStopTime(treeId, only_pipelines=True) def onObservationAborted(self, treeId, modificationTime): self._update_radb_task_status(treeId, 'aborted') # otdb adjusts stoptime when aborted, - # reflect that in radb for pipelines - self._updateStopTime(treeId) + self._updateStopTime(treeId, only_pipelines=False) def main(): # Check the invocation arguments diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py index 081a83859ed0fff53e3669c5b8262d0240b34fb8..42cf5c22b21415f0597e49dbaf80b1eb39433fd6 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/propagator.py @@ -28,6 +28,7 @@ reads the info from the RA DB and sends it to OTDB in the correct format. import logging import datetime import time +import pprint from lofar.messaging.RPC import RPC, RPCException from lofar.parameterset import parameterset @@ -109,6 +110,16 @@ class RAtoOTDBPropagator(): except Exception as e: logger.error(e) + def doTaskError(self, otdb_id): + logger.info('doTaskError: otdb_id=%s' % (otdb_id,)) + if not otdb_id: + logger.warning('doTaskError no valid otdb_id: otdb_id=%s' % (otdb_id,)) + return + try: + self.otdbrpc.taskSetStatus(otdb_id, 'error') + except Exception as e: + logger.error(e) + def doTaskScheduled(self, ra_id, otdb_id, mom_id): try: logger.info('doTaskScheduled: ra_id=%s otdb_id=%s mom_id=%s' % (ra_id, otdb_id, mom_id)) @@ -137,11 +148,10 @@ class RAtoOTDBPropagator(): project_name = 'unknown' otdb_info = self.translator.CreateParset(otdb_id, ra_info, project_name) - logger.info("Parset info for OTDB: %s" %otdb_info) self.setOTDBinfo(otdb_id, otdb_info, 'scheduled') except Exception as e: logger.error(e) - self.doTaskConflict(otdb_id) + self.doTaskError(otdb_id) #FIXME should be to the RADB also or instead? def ParseStorageProperties(self, storage_claim): """input something like: @@ -190,7 +200,7 @@ class RAtoOTDBPropagator(): result['output_files'][p['type_name']] = p['value'] if p['io_type_name'] == 'input': result['input_files'][p['type_name']] = p['value'] - logging.info(result) + logger.info(pprint.pformat(result)) return result def getRAinfo(self, ra_id): @@ -199,7 +209,7 @@ class RAtoOTDBPropagator(): task = self.radbrpc.getTask(ra_id) claims = self.radbrpc.getResourceClaims(task_ids=ra_id, extended=True, include_properties=True) for claim in claims: - logger.debug("Processing claim: %s" % claim) + logger.info("Processing claim: %s" % claim) if claim['resource_type_name'] == 'storage': ## TODO we will need to check for different storage names/types in the future info['storage'] = self.ParseStorageProperties(claim) info["starttime"] = task["starttime"] @@ -210,11 +220,12 @@ class RAtoOTDBPropagator(): def setOTDBinfo(self, otdb_id, otdb_info, otdb_status): try: - logger.info('Setting specticication for otdb_id %s: %s' % (otdb_id, otdb_info)) + logger.info('Setting specticication for otdb_id %s:\n' % (otdb_id,)) + logger.info(pprint.pformat(otdb_info)) self.otdbrpc.taskSetSpecification(otdb_id, otdb_info) self.otdbrpc.taskPrepareForScheduling(otdb_id, otdb_info["LOFAR.ObsSW.Observation.startTime"], otdb_info["LOFAR.ObsSW.Observation.stopTime"]) logger.info('Setting status (%s) for otdb_id %s' % (otdb_status, otdb_id)) self.otdbrpc.taskSetStatus(otdb_id, otdb_status) except Exception as e: logger.error(e) - self.doTaskConflict(otdb_id) + self.doTaskError(otdb_id) #FIXME should be to the RADB also or instead? diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py index bc3359afa3d3b9ea978d615799aef910bb1e99b8..36acbcfefabf3229ac6db1f0c15e2533bf6a8e26 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/rotspservice.py @@ -80,6 +80,14 @@ class RATaskStatusChangedListener(RABusListener): self.propagator.doTaskConflict(otdb_id) + def onTaskError(self, task_ids): + radb_id = task_ids.get('radb_id') + otdb_id = task_ids.get('otdb_id') #Does this work if one of the Id's is not set? + mom_id = task_ids.get('mom_id') + logger.info('onTaskError: radb_id=%s otdb_id=%s mom_id=%s', radb_id, otdb_id, mom_id) + + self.propagator.doTaskError(otdb_id) + __all__ = ["RATaskStatusChangedListener"] diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py index e1d1e2cd4e532f1e82934a08c66b045c529502bf..60a68de5a17c233307f16e4efd0679ff0e676064 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py @@ -26,6 +26,7 @@ reads the info from the RA DB and sends it to OTDB in the correct format. """ import logging +import pprint from lofar.common.util import to_csv_string from math import ceil, floor @@ -95,11 +96,15 @@ class RAtoOTDBTranslator(): result = {} nr_stokes = storage_properties['nr_of_cs_stokes'] for sap in storage_properties["saps"]: ##We might need to sort saps? - if "nr_of_cs_files" in sap['properties']: + if 'nr_of_cs_files' in sap['properties']: nr_files = sap['properties']['nr_of_cs_files'] nr_tabs = sap['properties']['nr_of_tabs'] + skip_tab = 'is_tab_nr' in sap['properties'] nr_parts = int(ceil(nr_files/float(nr_tabs * nr_stokes))) for tab in xrange(nr_tabs): + if skip_tab: + if tab == sap['properties']['is_tab_nr']: + continue for stokes in xrange(nr_stokes): for part in xrange(nr_parts): locations.append(self.locationPath(project_name, otdb_id) + '/cs') @@ -118,15 +123,14 @@ class RAtoOTDBTranslator(): nr_stokes = storage_properties['nr_of_is_stokes'] for sap in storage_properties["saps"]: ##We might need to sort saps? if "nr_of_is_files" in sap['properties']: - nr_files = sap['properties']['nr_of_is_files'] - nr_tabs = sap['properties']['nr_of_tabs'] - nr_parts = int(ceil(nr_files/float(nr_tabs * nr_stokes))) - for tab in xrange(nr_tabs): - for stokes in xrange(nr_stokes): - for part in xrange(nr_parts): - locations.append(self.locationPath(project_name, otdb_id) + '/is') - filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], tab, stokes, part)) - skip.append("0") + nr_files = sap['properties']['nr_of_is_files'] + is_tab_nr = sap['properties']['is_tab_nr'] + nr_parts = int(ceil(nr_files/float(nr_stokes))) + for stokes in xrange(nr_stokes): + for part in xrange(nr_parts): + locations.append(self.locationPath(project_name, otdb_id) + '/is') + filenames.append("L%d_SAP%03d_B%03d_S%d_P%03d_bf.h5" % (otdb_id, sap['sap_nr'], is_tab_nr, stokes, part)) + skip.append("0") result[PREFIX + 'DataProducts.%s_IncoherentStokes.locations' % (io_type)] = '[' + to_csv_string(locations) + ']' result[PREFIX + 'DataProducts.%s_IncoherentStokes.filenames' % (io_type)] = '[' + to_csv_string(filenames) + ']' result[PREFIX + 'DataProducts.%s_IncoherentStokes.skip' % (io_type)] = '[' + to_csv_string(skip) + ']' @@ -198,7 +202,7 @@ class RAtoOTDBTranslator(): return result def ProcessStorageInfo(self, otdb_id, storage_info, project_name): - logging.debug('processing the storage for %i with ' % (otdb_id) + str(storage_info)) + logging.info('processing the storage for %i' % (otdb_id)) result = {} if 'input_files' in storage_info: result.update(self.CreateStorageKeys(otdb_id, storage_info['input_files'], project_name, "Input")) @@ -215,7 +219,7 @@ class RAtoOTDBTranslator(): parset[PREFIX+'stopTime'] = ra_info['endtime'].strftime('%Y-%m-%d %H:%M:%S') if 'storage' in ra_info: - logging.info("Adding storage claims to parset: " + str(ra_info['storage'])) + logging.info("Adding storage claims to parset\n" + pprint.pformat(ra_info['storage'])) parset.update(self.ProcessStorageInfo(otdb_id, ra_info['storage'], project_name)) if 'stations' in ra_info: logging.info("Adding stations to parset: " + str(ra_info["stations"])) diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/test/t_rotspservice.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/test/t_rotspservice.py index 66bfa974c64d2ab006e7a5bc3c5108bb1014d17a..1e49b01e28d6a0f1ab09a39e2ea4671c3dafefb2 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/test/t_rotspservice.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/test/t_rotspservice.py @@ -42,10 +42,6 @@ with patch('lofar.sas.resourceassignment.ratootdbtaskspecificationpropagator.rpc #give pre-cooked answer depending on called service if servicename == 'ResourceEstimator': return {'Observation':{'total_data_size':1, 'total_bandwidth':1, 'output_files':1}}, "OK" - elif servicename == 'SSDBService.GetActiveGroupNames': - return {0:'storagenodes', 1:'computenodes', 2:'archivenodes', 3:'locusnodes', 4:'cep4'}, "OK" - elif servicename == 'SSDBService.GetHostForGID': - return {u'groupname': u'cep4', u'nodes': [{u'claimedspace': 0, u'totalspace': 702716, u'statename': u'Active', u'usedspace': 23084, u'id': 1, u'groupname': u'cep4', u'path': u'/lustre', u'hostname': u'lustre001'}]}, "OK" return None, None diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index f0e31b9c13b0b0431f8c86a8852a0e062ac30eac..9344f5b7bba5c4aac28d9fc473e6ae1ce5d8f256 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -45,10 +45,6 @@ from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFA from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME -from lofar.sas.systemstatus.service.SSDBrpc import SSDBRPC -from lofar.sas.systemstatus.service.config import DEFAULT_SSDB_BUSNAME -from lofar.sas.systemstatus.service.config import DEFAULT_SSDB_SERVICENAME - from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX @@ -60,8 +56,6 @@ class ResourceAssigner(): radb_servicename=RADB_SERVICENAME, re_busname=RE_BUSNAME, re_servicename=RE_SERVICENAME, - ssdb_busname=DEFAULT_SSDB_BUSNAME, - ssdb_servicename=DEFAULT_SSDB_SERVICENAME, otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME, otdb_servicename=DEFAULT_OTDB_SERVICENAME, ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME, @@ -73,13 +67,10 @@ class ResourceAssigner(): :param radb_servicename: servicename of the radb service (default: RADBService) :param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command) :param re_servicename: servicename of the resource estimator service (default: ResourceEstimation) - :param ssdb_busname: busname on which the ssdb service listens (default: lofar.system) - :param ssdb_servicename: servicename of the radb service (default: SSDBService) :param broker: Valid Qpid broker host (default: None, which means localhost) """ self.radbrpc = RARPC(servicename=radb_servicename, busname=radb_busname, broker=broker) self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True) - self.ssdbrpc = SSDBRPC(servicename=ssdb_servicename, busname=ssdb_busname, broker=broker) self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker) ## , ForwardExceptions=True hardcoded in RPCWrapper right now self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker) self.ra_notification_prefix = ra_notification_prefix @@ -98,7 +89,6 @@ class ResourceAssigner(): self.radbrpc.open() self.rerpc.open() self.otdbrpc.open() - self.ssdbrpc.open() self.ra_notification_bus.open() def close(self): @@ -106,7 +96,6 @@ class ResourceAssigner(): self.radbrpc.close() self.rerpc.close() self.otdbrpc.close() - self.ssdbrpc.close() self.ra_notification_bus.close() def doAssignment(self, specification_tree): @@ -174,44 +163,46 @@ class ResourceAssigner(): logger.error("no task type %s found in estimator results %s" % (taskType, needed[str(otdb_id)])) return - # make sure the availability in the radb is up to date - # TODO: this should be updated regularly - try: - self.updateAvailableResources('cep4') - except Exception as e: - logger.warning("Exception while updating available resources: %s" % str(e)) - # claim the resources for this task # during the claim inserts the claims are automatically validated # and if not enough resources are available, then they are put to conflict status # also, if any claim is in conflict state, then the task is put to conflict status as well main_needed = needed[str(otdb_id)] task = self.radbrpc.getTask(taskId) - claimed, claim_ids = self.claimResources(main_needed, task) - if claimed: - conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict') - - if conflictingClaims: - # radb set's task status to conflict automatically - logger.warning('doAssignment: %s conflicting claims detected. Task cannot be scheduled. %s' % - (len(conflictingClaims), conflictingClaims)) - self._sendNotification(task, 'conflict') - else: - logger.info('doAssignment: all claims for task %s were succesfully claimed. Setting task status to scheduled' % (taskId,)) - self.radbrpc.updateTaskAndResourceClaims(taskId, task_status='scheduled', claim_status='allocated') - self._sendNotification(task, 'scheduled') - self.processPredecessors(specification_tree) + if 'errors' in main_needed and main_needed['errors']: + for error in main_needed['errors']: + logger.error("Error in estimator: %s", error) + + logger.error("Error(s) in estimator for otdb_id=%s radb_id=%s, setting task status to 'error'", otdb_id, taskId) + self.radbrpc.updateTask(taskId, status='error') + self._sendNotification(task, 'error') else: - logger.warning('doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % (taskId)) - self.radbrpc.updateTask(taskId, status='conflict') - self._sendNotification(task, 'conflict') + claimed, claim_ids = self.claimResources(main_needed, task) + if claimed: + conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict') + + if conflictingClaims: + # radb set's task status to conflict automatically + logger.warning('doAssignment: %s conflicting claims detected. Task cannot be scheduled. %s' % + (len(conflictingClaims), conflictingClaims)) + self._sendNotification(task, 'conflict') + else: + logger.info('doAssignment: all claims for task %s were succesfully claimed. Setting task status to scheduled' % (taskId,)) + self.radbrpc.updateTaskAndResourceClaims(taskId, task_status='scheduled', claim_status='allocated') + self._sendNotification(task, 'scheduled') + + self.processPredecessors(specification_tree) + else: + logger.warning('doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % (taskId)) + self.radbrpc.updateTask(taskId, status='conflict') + self._sendNotification(task, 'conflict') def _sendNotification(self, task, status): try: - if status == 'scheduled' or status == 'conflict': + if status == 'scheduled' or status == 'conflict' or status == 'error': content={'radb_id': task['id'], 'otdb_id':task['otdb_id'], 'mom_id': task['mom_id']} - subject= 'TaskScheduled' if status == 'scheduled' else 'TaskConflict' + subject= 'Task' + status[0].upper() + status[1:] msg = EventMessage(context=self.ra_notification_prefix + subject, content=content) logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) self.ra_notification_bus.send(msg) @@ -229,7 +220,7 @@ class ResourceAssigner(): for predecessor_tree in predecessor_trees: pred_otdb_id = predecessor_tree['otdb_id'] predecessor_task = self.radbrpc.getTask(otdb_id=pred_otdb_id) - if predecessor_task: + if predecessor_task and predecessor_task['id'] not in task['predecessor_ids']: self.radbrpc.insertTaskPredecessor(task['id'], predecessor_task['id']) self.processPredecessors(predecessor_tree) @@ -271,34 +262,6 @@ class ResourceAssigner(): logger.info('getNeededResouces: %s' % replymessage) return replymessage - def updateAvailableResources(self, cluster): - # find out which resources are available - # and what is their capacity - # For now, only look at CEP4 storage - # Later, also look at stations up/down for short term scheduling - - #get all active groupnames, find id for cluster group - groupnames = self.ssdbrpc.getactivegroupnames() - cluster_group_id = next(k for k,v in groupnames.items() if v == cluster) - - # for CEP4 cluster, do hard codes lookup of first and only node - node_info = self.ssdbrpc.gethostsforgid(cluster_group_id)['nodes'][0] - - storage_resources = self.radbrpc.getResources(resource_types='storage', include_availability=True) - cep4_storage_resource = next(x for x in storage_resources if 'cep4' in x['name']) - active = node_info['statename'] == 'Active' - total_capacity = node_info['totalspace'] - available_capacity = total_capacity - node_info['usedspace'] - - logger.info("Updating resource availability of %s (id=%s) to active=%s available_capacity=%s total_capacity=%s" % - (cep4_storage_resource['name'], cep4_storage_resource['id'], active, available_capacity, total_capacity)) - - self.radbrpc.updateResourceAvailability(cep4_storage_resource['id'], - active=active, - available_capacity=available_capacity, - total_capacity=total_capacity) - - def claimResources(self, needed_resources, task): logger.info('claimResources: task %s needed_resources=%s' % (task, needed_resources)) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/config.py b/SAS/ResourceAssignment/ResourceAssigner/lib/config.py index fccb37107e994bae816634317aa65265b3e087c0..761d48d09b100638d059c9e5dbc82f7d818e9502 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/config.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/config.py @@ -9,3 +9,5 @@ DEFAULT_SERVICENAME = 'RAService' DEFAULT_RA_NOTIFICATION_BUSNAME = adaptNameToEnvironment('lofar.ra.notification') DEFAULT_RA_NOTIFICATION_PREFIX = 'ResourceAssigner.' DEFAULT_RA_NOTIFICATION_SUBJECTS=DEFAULT_RA_NOTIFICATION_PREFIX+'*' + +PIPELINE_CHECK_INTERVAL=300 diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py b/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py index 573b791a8e04e697821c19b6bf388c5ed4e2e453..bbeec2501294045f96587ce043fdb7ec67768db6 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/rabuslistener.py @@ -64,6 +64,8 @@ class RABusListener(AbstractBusListener): self.onTaskScheduled(msg.content) elif msg.subject == '%sTaskConflict' % self.subject_prefix: self.onTaskConflict(msg.content) + elif msg.subject == '%sTaskError' % self.subject_prefix: + self.onTaskError(msg.content) else: logger.error("RABusListener.handleMessage: unknown subject: %s" %str(msg.subject)) @@ -77,6 +79,11 @@ class RABusListener(AbstractBusListener): :param task_ids: a dict containing radb_id, mom_id and otdb_id''' pass + def onTaskError(self, task_ids): + '''onTaskError is called upon receiving a TaskError message. + :param task_ids: a dict containing radb_id, mom_id and otdb_id''' + pass + if __name__ == '__main__': with RABusListener(broker=None) as listener: waitForInterrupt() diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py index a8cae557c285c661a5eb6dd44b20d46d8f1665ce..18aa2f2733628416e3d026e2ea51147567c85ec8 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py @@ -83,8 +83,6 @@ def main(): from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME - from lofar.sas.systemstatus.service.config import DEFAULT_SSDB_BUSNAME - from lofar.sas.systemstatus.service.config import DEFAULT_SSDB_SERVICENAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX @@ -114,12 +112,6 @@ def main(): help="Name of the resource estimator service. [default: %default]") parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=DEFAULT_OTDB_SERVICE_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %default") parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=DEFAULT_OTDB_SERVICENAME, help="Name of the OTDB service, default: %default") - parser.add_option("--ssdb_busname", dest="ssdb_busname", type="string", - default=DEFAULT_SSDB_BUSNAME, - help="Name of the bus on which the ssdb service listens. [default: %default]") - parser.add_option("--ssdb_servicename", dest="ssdb_servicename", type="string", - default=DEFAULT_SSDB_SERVICENAME, - help="Name of the ssdb service. [default: %default]") parser.add_option("--ra_notification_busname", dest="ra_notification_busname", type="string", default=DEFAULT_RA_NOTIFICATION_BUSNAME, help="Name of the notification bus on which the resourceassigner publishes its notifications. [default: %default]") @@ -139,8 +131,6 @@ def main(): re_servicename=options.re_servicename, otdb_busname=options.otdb_busname, otdb_servicename=options.otdb_servicename, - ssdb_busname=options.ssdb_busname, - ssdb_servicename=options.ssdb_servicename, ra_notification_busname=options.ra_notification_busname, ra_notification_prefix=options.ra_notification_prefix, broker=options.broker) as assigner: diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py index 51a5758404655ace18b7efe4a7ded22134253d9c..c56572dfb8cc4df464b22a0f1d96db9b2254158f 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py @@ -30,6 +30,8 @@ from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RADB_BUSNAME from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RADB_SERVICENAME +from lofar.sas.resourceassignment.resourceassigner.config import PIPELINE_CHECK_INTERVAL + logger = logging.getLogger(__name__) class ScheduleChecker(): @@ -75,7 +77,7 @@ class ScheduleChecker(): for task in active_pipelines: if task['endtime'] <= now: - new_endtime=now+timedelta(minutes=1) + new_endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL) logger.info("Extending endtime to %s for pipeline radb_id=%s otdb_id=%s", new_endtime, task['id'], task['otdb_id']) self._radbrpc.updateTaskAndResourceClaims(task['id'], endtime=new_endtime) except Exception as e: @@ -88,8 +90,8 @@ class ScheduleChecker(): for task in sq_pipelines: if task['starttime'] <= now: - logger.info("Moving ahead scheduled pipeline radb_id=%s otdb_id=%s to 1 minute from now", task['id'], task['otdb_id']) - self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=now+timedelta(seconds=60), endtime=now+timedelta(seconds=60+task['duration'])) + logger.info("Moving ahead scheduled pipeline radb_id=%s otdb_id=%s to %s seconds from now", task['id'], task['otdb_id'], PIPELINE_CHECK_INTERVAL) + self._radbrpc.updateTaskAndResourceClaims(task['id'], starttime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL), endtime=now+timedelta(seconds=PIPELINE_CHECK_INTERVAL+task['duration'])) updated_task = self._radbrpc.getTask(task['id']) if updated_task['status'] != u'scheduled': logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change to %s", updated_task['id'], updated_task['otdb_id'], updated_task['status']) @@ -97,7 +99,7 @@ class ScheduleChecker(): except Exception as e: logger.error("Error while checking scheduled pipelines: %s", e) - for i in range(60): + for i in range(PIPELINE_CHECK_INTERVAL): sleep(1) if not self._running: diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 90a9cb2828a55b6bf0dc68ba2ba7bf79c1495935..eec15481b8c5f7317d17e6fc25ef717c79ea7c3e 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -47,10 +47,6 @@ with patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC', a #give pre-cooked answer depending on called service if servicename == 'ResourceEstimation': return {'1290472': {'observation': {'bandwidth': {'total_size': 9372800}, 'storage': {'total_size': 140592000, 'output_files': {'is': {'is_nr_stokes': 1, 'is_file_size': 36864000, 'nr_of_is_files': 1}, 'uv': {'nr_of_uv_files': 50, 'uv_file_size': 2074560}, 'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 50, 'nr_of_is_files': 1}}]}}}}}, "OK" - elif servicename == 'SSDBService.GetActiveGroupNames': - return {0:'storagenodes', 1:'computenodes', 2:'archivenodes', 3:'locusnodes', 4:'cep4'}, "OK" - elif servicename == 'SSDBService.GetHostForGID': - return {u'groupname': u'cep4', u'nodes': [{u'claimedspace': 0, u'totalspace': 702716, u'statename': u'Active', u'usedspace': 23084, u'id': 1, u'groupname': u'cep4', u'path': u'/lustre', u'hostname': u'lustre001'}]}, "OK" return None, None diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py index 123e4c1c3dbd6998242b101bd7f544caf7836cbd..e013b2f037781ff4bbf888d00171f41617463cd0 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py @@ -173,7 +173,7 @@ class RADatabase: if isinstance(task_ids, int): # just a single id conditions.append('id = %s') qargs.append(task_ids) - elif len(task_ids) > 0: # list of id's + else: #assume a list/enumerable of id's conditions.append('id in %s') qargs.append(tuple(task_ids)) @@ -191,12 +191,6 @@ class RADatabase: query += ' WHERE ' + ' AND '.join(conditions) tasks = list(self._executeQuery(query, qargs, fetch=_FETCH_ALL)) - predIds = self.getTaskPredecessorIds() - succIds = self.getTaskSuccessorIds() - - for task in tasks: - task['predecessor_ids'] = predIds.get(task['id'], []) - task['successor_ids'] = succIds.get(task['id'], []) return tasks @@ -220,6 +214,13 @@ class RADatabase: task = dict(result) if result else None + if task: + if task['predecessor_ids'] is None: + task['predecessor_ids'] = [] + + if task['successor_ids'] is None: + task['successor_ids'] = [] + return task def _convertTaskTypeAndStatusToIds(self, task_status, task_type): @@ -363,13 +364,20 @@ class RADatabase: VALUES (%s, %s) RETURNING id;''' - id = self._executeQuery(query, (task_id, predecessor_id), fetch=_FETCH_ONE)['id'] + result = self._executeQuery(query, (task_id, predecessor_id), fetch=_FETCH_ONE) + if commit: self.commit() - return id + + if result and 'id' in result: + return result['id'] + + return None def insertTaskPredecessors(self, task_id, predecessor_ids, commit=True): ids = [self.insertTaskPredecessor(task_id, predecessor_id, False) for predecessor_id in predecessor_ids] + ids = [x for x in ids if x is not None] + if commit: self.commit() return ids @@ -507,7 +515,7 @@ class RADatabase: if isinstance(resource_ids, int): # just a single id conditions.append('id = %s') qargs.append(resource_ids) - elif len(resource_ids) > 0: # a list of id's + else: #assume a list/enumerable of id's conditions.append('id in %s') qargs.append(tuple(resource_ids)) @@ -690,7 +698,7 @@ class RADatabase: if isinstance(claim_ids, int): # just a single id conditions.append('rcpv.resource_claim_id = %s') qargs.append(claim_ids) - elif len(claim_ids) > 0: # list of id's + else: #assume a list/enumerable of id's conditions.append('rcpv.resource_claim_id in %s') qargs.append(tuple(claim_ids)) @@ -825,7 +833,7 @@ class RADatabase: if isinstance(claim_ids, int): # just a single id conditions.append('id = %s') qargs.append(claim_ids) - elif len(claim_ids) > 0: # list of id's + else: #assume a list/enumerable of id's conditions.append('id in %s') qargs.append(tuple(claim_ids)) @@ -841,7 +849,7 @@ class RADatabase: if isinstance(resource_ids, int): # just a single id conditions.append('resource_id = %s') qargs.append(resource_ids) - elif len(resource_ids) > 0: # list of id's + else: #assume a list/enumerable of id's conditions.append('resource_id in %s') qargs.append(tuple(resource_ids)) @@ -1188,29 +1196,31 @@ class RADatabase: if claim['id'] not in claimDict: resource2otherClaims[claim['resource_id']].append(claim) - for claim_id, claim in claimDict.items(): - task_id = claim['task_id'] - task_status = taskDict[task_id]['status'] - if task_status in ['prepared', 'approved', 'on_hold', 'conflict', 'prescheduled']: - claimSize = claim['claim_size'] - resource_id = claim['resource_id'] - resource = resources[resource_id] - resourceOtherClaims = resource2otherClaims[resource_id] - totalOtherClaimSize = sum(c['claim_size'] for c in resourceOtherClaims) - - logger.info('resource_id=%s claimSize=%s totalOtherClaimSize=%s total=%s available_capacity=%s' % - (resource_id, - claimSize, - totalOtherClaimSize, - totalOtherClaimSize + claimSize, - resource['available_capacity'])) - - if totalOtherClaimSize + claimSize >= resource['available_capacity']: - logger.info("totalOtherClaimSize (%s) + claimSize (%s) >= resource_available_capacity %s for claim %s on resource %s %s for task %s", - totalOtherClaimSize, claimSize, resource['available_capacity'], claim_id, resource_id, resource['name'], task_id) - newClaimStatuses[conflistStatusId].append(claim_id) - elif claim['status_id'] != allocatedStatusId: - newClaimStatuses[claimedStatusId].append(claim_id) + # TODO: claim conflict computions below are incorrect + # they cause observations/pipelines to go to conflict, although there is ample space + #for claim_id, claim in claimDict.items(): + #task_id = claim['task_id'] + #task_status = taskDict[task_id]['status'] + #if task_status in ['prepared', 'approved', 'on_hold', 'conflict', 'prescheduled']: + #claimSize = claim['claim_size'] + #resource_id = claim['resource_id'] + #resource = resources[resource_id] + #resourceOtherClaims = resource2otherClaims[resource_id] + #totalOtherClaimSize = sum(c['claim_size'] for c in resourceOtherClaims) + + #logger.info('resource_id=%s claimSize=%s totalOtherClaimSize=%s total=%s available_capacity=%s' % + #(resource_id, + #claimSize, + #totalOtherClaimSize, + #totalOtherClaimSize + claimSize, + #resource['available_capacity'])) + + #if totalOtherClaimSize + claimSize >= resource['available_capacity']: + #logger.info("totalOtherClaimSize (%s) + claimSize (%s) >= resource_available_capacity %s for claim %s on resource %s %s for task %s", + #totalOtherClaimSize, claimSize, resource['available_capacity'], claim_id, resource_id, resource['name'], task_id) + #newClaimStatuses[conflistStatusId].append(claim_id) + #elif claim['status_id'] != allocatedStatusId: + #newClaimStatuses[claimedStatusId].append(claim_id) if newClaimStatuses: for status_id, claim_ids in newClaimStatuses.items(): @@ -1275,7 +1285,7 @@ class RADatabase: if isinstance(task_ids, int): # just a single id conditions.append('task_id = %s') qargs.append(task_ids) - elif len(task_ids) > 0: # list of id's + else: #assume a list/enumerable of id's conditions.append('task_id in %s') qargs.append(tuple(task_ids)) @@ -1318,7 +1328,7 @@ class RADatabase: if isinstance(claim_ids, int): # just a single id conditions.append('id = %s') qargs.append(claim_ids) - elif len(claim_ids) > 0: # list of id's + else: #assume a list/enumerable of id's conditions.append('id in %s') qargs.append(tuple(claim_ids)) @@ -1326,7 +1336,7 @@ class RADatabase: if isinstance(resource_ids, int): # just a single id conditions.append('resource_id = %s') qargs.append(resource_ids) - elif len(resource_ids) > 0: # list of id's + else: #assume a list/enumerable of id's conditions.append('resource_id in %s') qargs.append(tuple(resource_ids)) @@ -1334,7 +1344,7 @@ class RADatabase: if isinstance(task_ids, int): # just a single id conditions.append('task_id = %s') qargs.append(task_ids) - elif len(task_ids) > 0: # list of id's + else: #assume a list/enumerable of id's conditions.append('task_id in %s') qargs.append(tuple(task_ids)) diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_resource_allocation_statics.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_resource_allocation_statics.sql index 512e5634beb9f8ab4385bd79d685582546c27d81..6e8af1e2ecca351544edd0ee190244145e0c5846 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_resource_allocation_statics.sql +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_resource_allocation_statics.sql @@ -7,7 +7,7 @@ INSERT INTO resource_allocation.task_status VALUES (200, 'prepared'), (300, 'app (1150, 'error'), (1200, 'obsolete'); -- This is the list from OTDB, we'll need to merge it with the list from MoM in the future, might use different indexes? INSERT INTO resource_allocation.task_type VALUES (0, 'observation'),(1, 'pipeline'); -- We'll need more types INSERT INTO resource_allocation.resource_claim_status VALUES (0, 'claimed'), (1, 'allocated'), (2, 'conflict'); -INSERT INTO resource_allocation.resource_claim_property_type VALUES (0, 'nr_of_is_files'),(1, 'nr_of_cs_files'),(2, 'nr_of_uv_files'),(3, 'nr_of_im_files'),(4, 'nr_of_img_files'),(5, 'nr_of_pulp_files'),(6, 'nr_of_cs_stokes'),(7, 'nr_of_is_stokes'),(8, 'is_file_size'),(9, 'cs_file_size'),(10, 'uv_file_size'),(11, 'im_file_size'),(12, 'img_file_size'),(13, 'nr_of_pulp_files'),(14, 'nr_of_tabs'),(15, 'start_sb_nr'),(16,'uv_otdb_id'),(17,'cs_otdb_id'),(18,'is_otdb_id'),(19,'im_otdb_id'),(20,'img_otdb_id'),(21,'pulp_otdb_id'); +INSERT INTO resource_allocation.resource_claim_property_type VALUES (0, 'nr_of_is_files'),(1, 'nr_of_cs_files'),(2, 'nr_of_uv_files'),(3, 'nr_of_im_files'),(4, 'nr_of_img_files'),(5, 'nr_of_pulp_files'),(6, 'nr_of_cs_stokes'),(7, 'nr_of_is_stokes'),(8, 'is_file_size'),(9, 'cs_file_size'),(10, 'uv_file_size'),(11, 'im_file_size'),(12, 'img_file_size'),(13, 'nr_of_pulp_files'),(14, 'nr_of_tabs'),(15, 'start_sb_nr'),(16,'uv_otdb_id'),(17,'cs_otdb_id'),(18,'is_otdb_id'),(19,'im_otdb_id'),(20,'img_otdb_id'),(21,'pulp_otdb_id'),(22, 'is_tab_nr'); INSERT INTO resource_allocation.resource_claim_property_io_type VALUES (0, 'output'),(1, 'input'); INSERT INTO resource_allocation.config VALUES (0, 'max_fill_percentage_cep4', '85.00'), (1, 'claim_timeout', '172800'), (2, 'min_inter_task_delay', '60'); -- Just some values 172800 is two days in seconds INSERT INTO resource_allocation.conflict_reason diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/CMakeLists.txt b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/CMakeLists.txt index ffbddce6f5c1b7fbde330aa99ef9bcfd134e88fd..1bfa8364628aaa97db4f86192c70c9e8dcbad339 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/CMakeLists.txt +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/CMakeLists.txt @@ -31,6 +31,7 @@ set(app_files static/favicon.ico static/app/app.js static/app/controllers/datacontroller.js + static/app/controllers/cleanupcontroller.js static/app/controllers/gridcontroller.js static/app/controllers/ganttresourcecontroller.js static/app/controllers/chartresourceusagecontroller.js diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/cleanupcontroller.js b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/cleanupcontroller.js index 1e51df29558a89e22cf9395f2e54676f28dad72d..45321de54451097a8ed5b6cae9d0e3178b09e605 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/cleanupcontroller.js +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/cleanupcontroller.js @@ -161,7 +161,7 @@ cleanupControllerMod.controller('CleanupController', ['$scope', '$uibModal', '$h <highchart id="chart_disk_usage" config="diskUsageChartConfig" style="width: 960px; height: 720px;" ></highchart>\ <p>\ <span style="margin-right:50px">Last updated at: {{leastRecentCacheTimestamp | date }}</span>\ - <button class="btn btn-primary glyphicon glyphicon-level-up" type="button" ng-click="up()" title="Up one level"></button>\ + <button class="btn btn-primary glyphicon glyphicon-level-up" type="button" ng-click="up()" title="Up one level" ng-if="watchedObjectType!=\'projects\'"></button>\ </p>\ </div>\ <div class="modal-footer">\ @@ -214,6 +214,9 @@ cleanupControllerMod.controller('CleanupController', ['$scope', '$uibModal', '$h enabled: false } }, + legend: { + enabled: false + }, plotOptions: { pie: { allowPointSelect: true, diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/datacontroller.js b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/datacontroller.js index b9a002dd8d0b078c9a3ac1a970d8e282e422fd80..e9af7da1c89ff7b4625d8f7b020abab012b3b894 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/datacontroller.js +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/datacontroller.js @@ -338,8 +338,14 @@ angular.module('raeApp').factory("dataService", ['$http', '$q', function($http, }; self.getTaskDiskUsageByOTDBId = function(otdb_id) { - var task = self.tasks.find(function(t) { return t.otdb_id == otdb_id; }); - return self.getTaskDiskUsage(task); + var defer = $q.defer(); + $http.get('/rest/tasks/otdb/' + otdb_id + '/diskusage').success(function(result) { + defer.resolve(result); + }).error(function(result) { + defer.resolve({found:false}); + }); + + return defer.promise; }; self.getTaskDiskUsage = function(task) { @@ -834,7 +840,7 @@ dataControllerMod.controller('DataController', }; }; - $scope.onZoomTimespanChanged = function(span) { + $scope.onZoomTimespanChanged = function() { var viewTimeSpanInmsec = dataService.viewTimeSpan.to.getTime() - dataService.viewTimeSpan.from.getTime(); var focusTime = new Date(dataService.viewTimeSpan.from + 0.5*viewTimeSpanInmsec); diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/ganttprojectcontroller.js b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/ganttprojectcontroller.js index 878ab4b7f9a09d0f62dcde970bff23d705ccab6b..09fd23f3e00d779e9a9082b73effb464b49ee871 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/ganttprojectcontroller.js +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/ganttprojectcontroller.js @@ -38,7 +38,7 @@ ganttProjectControllerMod.controller('GanttProjectController', ['$scope', 'dataS sideMode: 'Tree', autoExpand: 'both', taskOutOfRange: 'truncate', - dependencies: true, + dependencies: false, api: function(api) { // API Object is used to control methods and events from angular-gantt. $scope.api = api; @@ -147,6 +147,9 @@ ganttProjectControllerMod.controller('GanttProjectController', ['$scope', 'dataS $scope.options.viewScale = '15 minutes'; } + //only enable dependencies (arrows between tasks) in detailed view + $scope.options.dependencies = (fullTimespanInMinutes <= 3*60); + for(var i = 0; i < numTasks; i++) { var task = tasks[i]; @@ -204,7 +207,7 @@ ganttProjectControllerMod.controller('GanttProjectController', ['$scope', 'dataS rowTask.classes += ' task-selected-task'; } - if(task.predecessor_ids && task.predecessor_ids.length > 0) { + if($scope.options.dependencies && task.predecessor_ids && task.predecessor_ids.length > 0) { rowTask['dependencies'] = []; for(var predId of task.predecessor_ids) { rowTask['dependencies'].push({'from': predId}); diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js index 3453ee2b01cb00862ed04754ff6840e1148a2a0f..e64b3574dbb522c36d889ccc511a9e2e69c97134 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js @@ -303,7 +303,7 @@ $scope.columns = [ $scope.$watch('dataService.selected_task_ids', onSelectedTaskIdsChanged, true);} ]); -gridControllerMod.directive('contextMenu', ['$document', function($document) { +gridControllerMod.directive('contextMenu', ['$document', '$window', function($document, $window) { return { restrict: 'A', scope: { @@ -346,19 +346,20 @@ gridControllerMod.directive('contextMenu', ['$document', function($document) { var ulElement = angular.element('<ul class="dropdown-menu" role="menu" style="left:' + event.clientX + 'px; top:' + event.clientY + 'px; z-index: 100000; display:block;"></ul>'); contextmenuElement.append(ulElement); -// var liElement = angular.element('<li><a href="#">Copy Task</a></li>'); -// ulElement.append(liElement); -// liElement.on('click', function() { -// closeContextMenu(); -// dataService.copyTask(task); -// }); - - var liElement = angular.element('<li><a href="#">Delete data</a></li>'); - ulElement.append(liElement); - liElement.on('click', function() { - closeContextMenu(); - cleanupCtrl.deleteSelectedTasksDataWithConfirmation(); - }); + if(task.type == 'observation' && dataService.config.inspection_plots_base_url) { + var liElement = angular.element('<li><a href="#">Inspection Plots</a></li>'); + ulElement.append(liElement); + liElement.on('click', function() { + closeContextMenu(); + var tasks = dataService.selected_task_ids.map(function(t_id) { return dataService.taskDict[t_id]; }); + for(var t of tasks) { + if(t) { + var url = dataService.config.inspection_plots_base_url + '/' + t.otdb_id; + $window.open(url, '_blank'); + } + } + }); + } var liContent = dataService.selected_task_ids.length == 1 ? '<li><a href="#">Show disk usage</a></li>' : '<li><a href="#" style="color:#aaaaaa">Show disk usage</a></li>' var liElement = angular.element(liContent); @@ -370,6 +371,13 @@ gridControllerMod.directive('contextMenu', ['$document', function($document) { }); } + var liElement = angular.element('<li><a href="#">Delete data</a></li>'); + ulElement.append(liElement); + liElement.on('click', function() { + closeContextMenu(); + cleanupCtrl.deleteSelectedTasksDataWithConfirmation(); + }); + var closeContextMenu = function(cme) { contextmenuElement.remove(); diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/gantt-plugins/angular-gantt-contextmenu-plugin.js b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/gantt-plugins/angular-gantt-contextmenu-plugin.js index af055ba52dcdb592b78edf04ae9b20e157101922..33c7cb4d4343478167ad8846381754173584b884 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/gantt-plugins/angular-gantt-contextmenu-plugin.js +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/gantt-plugins/angular-gantt-contextmenu-plugin.js @@ -1,6 +1,6 @@ (function(){ 'use strict'; - angular.module('gantt.contextmenu', ['gantt', 'gantt.contextmenu.templates']).directive('ganttContextmenu', ['$compile', '$document', function($compile, $document) { + angular.module('gantt.contextmenu', ['gantt', 'gantt.contextmenu.templates']).directive('ganttContextmenu', ['$compile', '$document', '$window', function($compile, $document, $window) { return { restrict: 'E', require: '^gantt', @@ -30,10 +30,13 @@ var cleanupCtrl = dScope.scope.$parent.cleanupCtrl; var docElement = angular.element($document); - if(dScope.task.model.raTask) { - if(!dataService.isTaskIdSelected(dScope.task.model.raTask.id)) { - dataService.setSelectedTaskId(dScope.task.model.raTask.id); - } + var task = dScope.task.model.raTask; + + if(!task) + return; + + if(!dataService.isTaskIdSelected(task.id)) { + dataService.setSelectedTaskId(task.id); } //search for already existing contextmenu element @@ -58,15 +61,23 @@ // liElement.on('click', function() { // closeContextMenu(); // //TODO: remove link to dataService in this generic plugin -// dataService.copyTask(dScope.task.model.raTask); +// dataService.copyTask(task); // }); - var liElement = angular.element('<li><a href="#">Delete data</a></li>'); - ulElement.append(liElement); - liElement.on('click', function() { - closeContextMenu(); - cleanupCtrl.deleteTaskDataWithConfirmation(dScope.task.model.raTask); - }); + if(task.type == 'observation' && dataService.config.inspection_plots_base_url) { + var liElement = angular.element('<li><a href="#">Inspection Plots</a></li>'); + ulElement.append(liElement); + liElement.on('click', function() { + closeContextMenu(); + var tasks = dataService.selected_task_ids.map(function(t_id) { return dataService.taskDict[t_id]; }); + for(var t of tasks) { + if(t) { + var url = dataService.config.inspection_plots_base_url + '/' + t.otdb_id; + $window.open(url, '_blank'); + } + } + }); + } var liContent = dataService.selected_task_ids.length == 1 ? '<li><a href="#">Show disk usage</a></li>' : '<li><a href="#" style="color:#aaaaaa">Show disk usage</a></li>' var liElement = angular.element(liContent); @@ -74,10 +85,17 @@ if(dataService.selected_task_ids.length == 1) { liElement.on('click', function() { closeContextMenu(); - cleanupCtrl.showTaskDiskUsage(dScope.task.model.raTask); + cleanupCtrl.showTaskDiskUsage(task); }); } + var liElement = angular.element('<li><a href="#">Delete data</a></li>'); + ulElement.append(liElement); + liElement.on('click', function() { + closeContextMenu(); + cleanupCtrl.deleteSelectedTasksDataWithConfirmation(); + }); + var closeContextMenu = function() { contextmenuElement.remove(); diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/templates/index.html b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/templates/index.html index 61024abac9bd90f74bac3bb78a5576f380fc8640..c4267fd13be79ab693004f393c1998ec21c64ee5 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/templates/index.html +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/templates/index.html @@ -58,27 +58,27 @@ <div class="col-md-3"> <label>From:</label> <p class="input-group"> - <input type="text" class="form-control" style="min-width:100px" uib-datepicker-popup="yyyy-MM-dd" ng-model="dataService.viewTimeSpan.from" is-open="viewFromDatePopupOpened" datepicker-options="dateOptions" ng-required="true" close-text="Close" close-on-date-selection="false"/> + <input type="text" class="form-control" style="min-width:100px" uib-datepicker-popup="yyyy-MM-dd" ng-model="$parent.dataService.viewTimeSpan.from" is-open="viewFromDatePopupOpened" datepicker-options="dateOptions" ng-required="true" close-text="Close" close-on-date-selection="false"/> <span class="input-group-btn"> <button type="button" class="btn btn-default" ng-click="openViewFromDatePopup()"><i class="glyphicon glyphicon-calendar"></i></button> </span> - <uib-timepicker ng-model="dataService.viewTimeSpan.from" ng-change="onViewTimeSpanFromChanged()" hour-step="1" minute-step="5" show-meridian="false" show-spinners="false"></uib-timepicker> + <uib-timepicker ng-model="$parent.dataService.viewTimeSpan.from" ng-change="$parent.onViewTimeSpanFromChanged()" hour-step="1" minute-step="5" show-meridian="false" show-spinners="false"></uib-timepicker> </p> </div> <div class="col-md-3"> <label>To:</label> <p class="input-group"> - <input type="text" class="form-control" style="min-width:100px" uib-datepicker-popup="yyyy-MM-dd" ng-model="dataService.viewTimeSpan.to" is-open="viewToDatePopupOpened" datepicker-options="dateOptions" ng-required="true" close-text="Close" close-on-date-selection="false"/> + <input type="text" class="form-control" style="min-width:100px" uib-datepicker-popup="yyyy-MM-dd" ng-model="$parent.dataService.viewTimeSpan.to" is-open="viewToDatePopupOpened" datepicker-options="dateOptions" ng-required="true" close-text="Close" close-on-date-selection="false"/> <span class="input-group-btn"> <button type="button" class="btn btn-default" ng-click="openViewToDatePopup()"><i class="glyphicon glyphicon-calendar"></i></button> </span> - <uib-timepicker ng-model="dataService.viewTimeSpan.to" ng-change="onViewTimeSpanToChanged()" hour-step="1" minute-step="5" show-meridian="false" show-spinners="false"></uib-timepicker> + <uib-timepicker ng-model="$parent.dataService.viewTimeSpan.to" ng-change="$parent.onViewTimeSpanToChanged()" hour-step="1" minute-step="5" show-meridian="false" show-spinners="false"></uib-timepicker> </p> </div> <div class="col-md-1"> <label>Scroll:</label> <p class="input-group"> - <label title="Automatically scroll 'From' and 'To' to watch live events" style="padding-right: 4px; vertical-align: top;">Live <input type="checkbox" ng-model="dataService.autoFollowNow"></label> + <label title="Automatically scroll 'From' and 'To' to watch live events" style="padding-right: 4px; vertical-align: top;">Live <input type="checkbox" ng-model="$parent.dataService.autoFollowNow"></label> <button title="Scroll back in time" type="button" class="btn btn-default" ng-click="scrollBack()"><i class="glyphicon glyphicon-step-backward"></i></button> <button title="Scroll forward in time" type="button" class="btn btn-default" ng-click="scrollForward()"><i class="glyphicon glyphicon-step-forward"></i></button> </p> @@ -86,7 +86,7 @@ <div class="col-md-2"> <label>Zoom:</label> <p class="input-group"> - <select class="form-control" ng-model=zoomTimespan ng-options="option.name for option in zoomTimespans track by option.value" ng-change="onZoomTimespanChanged(span)"></select> + <select class="form-control" ng-model="$parent.zoomTimespan" ng-options="option.name for option in $parent.zoomTimespans track by option.value" ng-change="$parent.onZoomTimespanChanged()"></select> </p> </div> <div class="col-md-1"> @@ -122,7 +122,7 @@ allow-row-switching="false"> </gantt-movable> <gantt-tooltips enabled="true" date-format="'YYYY-MM-DD HH:mm'"></gantt-tooltips> - <gantt-dependencies enabled="true"></gantt-dependencies> + <gantt-dependencies enabled="options.dependencies"></gantt-dependencies> <gantt-contextmenu enabled="true"></gantt-contextmenu> </div> </div> diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py index e7386979280660ed985c93c75903b07211be42e8..d2492c6dd73a971856f354deca6bd2d6446896a6 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py @@ -118,7 +118,8 @@ def index(): @app.route('/rest/config') @gzipped def config(): - config = {'mom_base_url':''} + config = {'mom_base_url':'', + 'inspection_plots_base_url':'https://proxy.lofar.eu/inspect/HTML/'} if isProductionEnvironment(): config['mom_base_url'] = 'https://lofar.astron.nl/mom3' diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py index adc887b7751550612dcd29fbeef1388afec08f10..4a1519bbb4e27e89982075eb80fcbdd265022843 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py @@ -24,6 +24,7 @@ """ import logging import copy +import pprint from datetime import datetime from lofar.common.datetimeutils import totalSeconds from datetime import datetime, timedelta @@ -110,7 +111,7 @@ class BaseResourceEstimator(object): else: raise ValueError('The parset is incomplete') result = {} - result[self.name] = {} - result[self.name]['storage'] = estimates['storage'] - result[self.name]['bandwidth'] = estimates['bandwidth'] + result[self.name] = estimates + logger.info('Estimates for %s:' % self.name) + logger.info(pprint.pformat(result)) return result diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py index 5d7cb3a07ee81923c567dc8c6193b36adf8b18da..1e8cb50a5a3ece23b79ecd78bc6c4e769353a235 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py @@ -115,7 +115,11 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator): #total_data_size = result['storage']['output_files']['uv']['nr_of_uv_files'] * result['storage']['output_files']['uv']['uv_file_size'] + \ # result['storage']['output_files']['im']['nr_of_im_files'] * result['storage']['output_files']['im']['im_file_size'] # bytes total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second - result['storage']['total_size'] = total_data_size - result['bandwidth']['total_size'] = total_bandwidth + if total_data_size: + result['storage']['total_size'] = total_data_size + result['bandwidth']['total_size'] = total_bandwidth + else: + result['errors'].append('Total data size is zero!') + logger.warning('ERROR: A datasize of zero was calculated!') return result diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py index 5314e3b81ecd5843da060da7a4c0a4855255c626..3dfaaa10eea3cc1c32269cc0387c759a58b8abee 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py @@ -98,7 +98,11 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator): # count total data size total_data_size = result['storage']['output_files']['img']['nr_of_img_files'] * result['storage']['output_files']['img']['img_file_size'] # bytes total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second - result['storage']['total_size'] = total_data_size - result['bandwidth']['total_size'] = total_bandwidth + if total_data_size: + result['storage']['total_size'] = total_data_size + result['bandwidth']['total_size'] = total_bandwidth + else: + result['errors'].append('Total data size is zero!') + logger.warning('ERROR: A datasize of zero was calculated!') return result diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py index 6014e46e9fc315cc5e52171b09d1924a67e45f66..07779a61e5cea217d4564032cae8b306d53d7823 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py @@ -99,6 +99,10 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator): # count total data size total_data_size = result['storage']['output_files']['uv']['nr_of_uv_files'] * result['storage']['output_files']['uv']['uv_file_size'] # bytes total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second - result['storage']['total_size'] = total_data_size - result['bandwidth']['total_size'] = total_bandwidth + if total_data_size: + result['storage']['total_size'] = total_data_size + result['bandwidth']['total_size'] = total_bandwidth + else: + result['errors'].append('Total data size is zero!') + logger.warning('ERROR: A datasize of zero was calculated!') return result diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py index f94857845ae8a8d2f9de8f6927671c0b77d36114..a893e1bac20a9e8610c8536ee002c5541a21bfbc 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py @@ -73,7 +73,7 @@ class ObservationResourceEstimator(BaseResourceEstimator): logger.info('parset: %s ' % parset) duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime')) - result = {} + result = {'errors': [], 'storage': {'total_size': 0}, 'bandwidth': {'total_size': 0}} output_files = {} correlated_size, correlated_bandwidth, output_files_uv, correlated_saps = self.correlated(parset, duration) coherentstokes_size, coherentstokes_bandwidth, output_files_cs, coherentstokes_saps = self.coherentstokes(parset, duration) @@ -95,16 +95,33 @@ class ObservationResourceEstimator(BaseResourceEstimator): sap['properties'].update(coherentstokes_saps[sap_nr]) if sap_nr in incoherentstokes_saps: sap['properties'].update(incoherentstokes_saps[sap_nr]) + if 'nr_of_tabs' in sap['properties']: # These are coherent TABs + sap['properties']['nr_of_tabs'] = sap['properties']['nr_of_tabs'] + 1 + else: + sap['properties']['nr_of_tabs'] = 1 # Only an incoherent TAB for this SAP output_files['saps'].append(sap) + total_data_size = correlated_size + coherentstokes_size + incoherentstokes_size - result['storage'] = {'total_size': total_data_size, 'output_files': output_files} - result['bandwidth'] = {'total_size': correlated_bandwidth + coherentstokes_bandwidth + incoherentstokes_bandwidth} + if total_data_size and output_files: + result['storage'] = {'total_size': total_data_size, 'output_files': output_files} + result['bandwidth'] = {'total_size': correlated_bandwidth + coherentstokes_bandwidth + incoherentstokes_bandwidth} + else: + if not total_data_size: + result['errors'].append('Total data size is zero!') + logger.warning('ERROR: A datasize of zero was calculated!') + if not output_files: + result['errors'].append('No output files!') + logger.warning('ERROR: No output files were calculated!') return result def correlated(self, parset, duration): """ Estimate number of files, file size and bandwidth needed for correlated data """ + if not parset.getBool('Observation.DataProducts.Output_Correlated.enabled'): + logger.info("No correlated data") + return (0,0, {}, {}) + logger.info("calculating correlated datasize") size_of_header = 512 #TODO More magic numbers (probably from Alwin). ScS needs to check these. They look ok though. size_of_overhead = 600000 @@ -142,6 +159,7 @@ class ObservationResourceEstimator(BaseResourceEstimator): """ Estimate number of files, file size and bandwidth needed for coherent stokes """ if not parset.getBool('Observation.DataProducts.Output_CoherentStokes.enabled'): + logger.info("No coherent stokes data") return (0,0, {}, {}) logger.info("calculate coherentstokes datasize") @@ -173,7 +191,7 @@ class ObservationResourceEstimator(BaseResourceEstimator): logger.info("checking TAB {}".format(tab_nr)) if parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)): logger.info("adding coherentstokes size") - nr_stokes = nr_coherent #TODO what does min mean here? + nr_stokes = nr_coherent # TODO, there used to be a function with min() here? total_nr_tabs += 1 total_nr_stokes += nr_stokes nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file))) @@ -194,9 +212,10 @@ class ObservationResourceEstimator(BaseResourceEstimator): nr_stokes = nr_tabs * nr_coherent total_nr_stokes += nr_stokes nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file))) - - sap_files[sap_nr]= {'nr_of_cs_files': nr_files, 'nr_of_tabs': total_nr_tabs} - total_files += nr_files + + if nr_files: + sap_files[sap_nr]= {'nr_of_cs_files': nr_files, 'nr_of_tabs': total_nr_tabs} + total_files += nr_files nr_subbands_per_file = min(subbands_per_file, max_nr_subbands) size_per_file = int(nr_subbands_per_file * size_per_subband) @@ -212,6 +231,7 @@ class ObservationResourceEstimator(BaseResourceEstimator): """ Estimate number of files, file size and bandwidth needed for incoherentstokes """ if not parset.getBool('Observation.DataProducts.Output_IncoherentStokes.enabled'): + logger.info("No incoherent stokes data") return (0,0, {}, {}) logger.info("calculate incoherentstokes data size") @@ -222,7 +242,7 @@ class ObservationResourceEstimator(BaseResourceEstimator): channels_per_subband = parset.getInt(COBALT + 'Correlator.nrChannelsPerSubband', 64) #TODO should these have defaults? incoherent_channels_per_subband = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.nrChannelsPerSubband', 0) - nr_incoherent = 4 if incoherent_type in ('IQUV',) else 1 + nr_incoherent = 4 if incoherent_type in ('IQUV',) else 1 # Should this also include XXYY ? total_nr_stokes = 0 total_files = 0 @@ -235,16 +255,23 @@ class ObservationResourceEstimator(BaseResourceEstimator): nr_subbands = len(subbandList) max_nr_subbands = max(nr_subbands, max_nr_subbands) nr_files = 0 + is_tab_nr = -1 total_nr_tabs = parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr) for tab_nr in xrange(total_nr_tabs): logger.info("checking TAB {}".format(tab_nr)) - if not parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)): - logger.info("adding incoherentstokes size") - total_nr_stokes += nr_incoherent - nr_files += int(nr_incoherent * ceil(nr_subbands / float(subbands_per_file))) - - sap_files[sap_nr]= {'nr_of_is_files': nr_files, 'nr_of_tabs': total_nr_tabs} - total_files += nr_files + if not parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)): #not coherent is incoherent + logger.info("Found incoherent stokes TAB: %i" % tab_nr) + if is_tab_nr >= 0: + logger.warning("TAB nr %i can't be incoherent as %i already is!" % (tab_nr, is_tab_nr)) + # TODO We need to generate an error here, or preferably check before we get here + else: + is_tab_nr = tab_nr + total_nr_stokes += nr_incoherent + nr_files += int(nr_incoherent * ceil(nr_subbands / float(subbands_per_file))) + + if nr_files: + sap_files[sap_nr] = {'nr_of_is_files': nr_files, 'is_tab_nr': is_tab_nr} + total_files += nr_files if incoherent_channels_per_subband > 0: channel_integration_factor = channels_per_subband / incoherent_channels_per_subband diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py index a024a9db3c34167516b8fa9bacbecb787d2b5846..c6ae0475d83413f03e884b8dc7fe2cf62153796a 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py @@ -99,6 +99,10 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator): # count total data size total_data_size = result['storage']['output_files']['pulp']['nr_of_pulp_files'] * result['storage']['output_files']['pulp']['pulp_file_size'] # bytes total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second - result['storage']['total_size'] = total_data_size - result['bandwidth']['total_size'] = total_bandwidth + if total_data_size: + result['storage']['total_size'] = total_data_size + result['bandwidth']['total_size'] = total_bandwidth + else: + result['errors'].append('Total data size is zero!') + logger.warning('ERROR: A datasize of zero was calculated!') return result diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py index 55cb19d86e5a8c6a01280c1da45e3d64c6c9596c..0dbadd795c067f7b029588c8a4d1f329064d6f95 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py @@ -6,6 +6,7 @@ Simple Service listening ''' import logging +import pprint from lofar.messaging import Service from lofar.messaging.Service import MessageHandlerInterface @@ -44,43 +45,52 @@ class ResourceEstimatorHandler(MessageHandlerInterface): if specification_tree['task_type'] == 'observation': return {str(otdb_id): self.add_id(self.observation.verify_and_estimate(parset), otdb_id)} elif specification_tree['task_type'] == 'pipeline': + if not 'predecessors' in specification_tree or not specification_tree['predecessors']: + logger.warning("Could not estimate %s because the pipeline has no predecessors" % (otdb_id)) + return {str(otdb_id): {'pipeline': {'errors': ["Could not estimate %s because the pipeline has no predecessors" % (otdb_id)]}}} + branch_estimates = {} for branch in specification_tree['predecessors']: - branch_estimates.update(self.get_subtree_estimate(branch)) - logger.info(str(branch_estimates)) + subtree_estimate = self.get_subtree_estimate(branch) + if subtree_estimate[str(branch['otdb_id'])][branch['task_type']]['errors']: + logger.warning("Could not estimate %s because predecessor %s has errors" % (otdb_id, branch)) + return {str(otdb_id): {'pipeline': {'errors': ["Could not estimate %s because predecessor %s has errors" % (otdb_id, branch)]}}} + branch_estimates.update(subtree_estimate) + logger.info(("Branch estimates for %s\n" % otdb_id) + pprint.pformat(branch_estimates)) + if specification_tree['task_subtype'] in ['averaging pipeline', 'calibration pipeline']: for id, estimate in branch_estimates.iteritems(): input_files = {} - if not 'im' in estimate.values()[0]['storage']['output_files'] and 'uv' in estimate.values()[0]['storage']['output_files']: # Not a calibrator pipeline + predecessor_output = estimate.values()[0]['storage']['output_files'] + if not 'im' in predecessor_output and 'uv' in predecessor_output: # Not a calibrator pipeline logger.info('found %s as the target of pipeline %s' % (id, otdb_id)) - input_files['uv'] = estimate.values()[0]['storage']['output_files']['uv'] - if 'saps' in estimate.values()[0]['storage']['output_files']: - input_files['saps'] = estimate.values()[0]['storage']['output_files']['saps'] - elif 'im' in estimate.values()[0]['storage']['output_files']: - input_files['im'] = estimate.values()[0]['storage']['output_files']['im'] + input_files['uv'] = predecessor_output['uv'] + if 'saps' in predecessor_output: + input_files['saps'] = predecessor_output['saps'] + elif 'im' in predecessor_output: + input_files['im'] = predecessor_output['im'] return {str(otdb_id): self.add_id(self.calibration_pipeline.verify_and_estimate(parset, input_files), otdb_id)} - + + if len(branch_estimates) > 1: + logger.warning('Pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys())) + return {str(otdb_id): {'pipeline': {'errors': ['Pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys())]}}} + predecessor_output = branch_estimates.values()[0].values()[0]['storage']['output_files'] + if specification_tree['task_subtype'] in ['imaging pipeline', 'imaging pipeline msss']: - if len(branch_estimates) > 1: - logger.error('Imaging pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys() ) ) - input_files = branch_estimates.values()[0].values()[0]['storage']['output_files'] + input_files = predecessor_output return {str(otdb_id): self.add_id(self.imaging_pipeline.verify_and_estimate(parset, input_files), otdb_id)} if specification_tree['task_subtype'] in ['long baseline pipeline']: - if len(branch_estimates) > 1: - logger.error('Long baseline pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys() ) ) - input_files = branch_estimates.values()[0].values()[0]['storage']['output_files'] + input_files = predecessor_output return {str(otdb_id): self.add_id(self.longbaseline_pipeline.verify_and_estimate(parset, input_files), otdb_id)} if specification_tree['task_subtype'] in ['pulsar pipeline']: - if len(branch_estimates) > 1: - logger.error('Pulsar pipeline %d should not have multiple predecessors: %s' % (otdb_id, branch_estimates.keys() ) ) - input_files = branch_estimates.values()[0].values()[0]['storage']['output_files'] + input_files = predecessor_output return {str(otdb_id): self.add_id(self.pulsar_pipeline.verify_and_estimate(parset, input_files), otdb_id)} else: # reservation, maintenance, system tasks? - logger.info("It's not a pipeline or observation: %s" % otdb_id) - return {str(otdb_id): {}} + logger.warning("ID %s is not a pipeline or observation." % otdb_id) + return {str(otdb_id): {specification_tree['task_type']: {'errors': ["ID %s is not a pipeline or observation." % otdb_id]}}} def _get_estimated_resources(self, specification_tree): """ Input is like: diff --git a/SubSystems/RAServices/CMakeLists.txt b/SubSystems/RAServices/CMakeLists.txt index 50dda7052a1b6d826727de81900b6881f5e4c0a0..9bc5419dca4f8ec26476abf99d60ee713436b1ca 100644 --- a/SubSystems/RAServices/CMakeLists.txt +++ b/SubSystems/RAServices/CMakeLists.txt @@ -12,7 +12,8 @@ lofar_package(RAServices ResourceAssignmentEstimator ResourceAssignmentService SystemStatusDatabase - SystemStatusService) + SystemStatusService + DataManagement) # supervisord config files install(FILES