diff --git a/SAS/Scheduler/src/Controller.cpp b/SAS/Scheduler/src/Controller.cpp index 60f17b01e47f87729001caf1801a31dbdeafdc72..5993c92f8105dbb6182719d466fc6ae5a6284e76 100644 --- a/SAS/Scheduler/src/Controller.cpp +++ b/SAS/Scheduler/src/Controller.cpp @@ -4943,10 +4943,12 @@ bool Controller::assignStorageToTask(Task *pTask) { } if (dpit->second == -1) { // the bandwidth required for a single file of this dataproduct exceeds the single storage node network bandwidth - pTask->setConflict(CONFLICT_STORAGE_SINGLE_FILE_BW_TOO_HIGH); - itsConflictDialog->addStorageConflict(pTask, dpit->first, CONFLICT_STORAGE_SINGLE_FILE_BW_TOO_HIGH); - bResult = false; - break; + if (pTask->getOutputDataproductCluster() != "CEP4") { + pTask->setConflict(CONFLICT_STORAGE_SINGLE_FILE_BW_TOO_HIGH); + itsConflictDialog->addStorageConflict(pTask, dpit->first, CONFLICT_STORAGE_SINGLE_FILE_BW_TOO_HIGH); + bResult = false; + break; + } } // get number of available nodes unsigned nrOfAvailableNodes(Controller::theSchedulerSettings.getNrOfStorageNodesAvailable()); @@ -4967,41 +4969,45 @@ bool Controller::assignStorageToTask(Task *pTask) { nrFilesPerNode = minNrFilesPerNode; singleFileBW = dfit->second.first / (double) pTask->getDuration().totalSeconds() * 8; // kbit/sec - // calculate the mininum number of files that have to fit on one storage node - if (nrOfAvailableNodes >= minNrOfNodes) { - if (minNrFilesPerNode <= maxNrFilesPerNode) { - while (!found_sufficient_nodes) { - preferred_locations = data.getStorageLocationOptions(dpit->first, pTask->getScheduledStart(), pTask->getScheduledEnd(), dfit->second.first, singleFileBW, nrFilesPerNode, sort_mode, preferred_nodes); - if (preferred_locations.size() <= minNrOfNodes) { - extra_locations = data.getStorageLocationOptions(dpit->first, pTask->getScheduledStart(), pTask->getScheduledEnd(), dfit->second.first, singleFileBW, nrFilesPerNode, sort_mode, extra_nodes); - if (((preferred_locations.size() + extra_locations.size()) * nrFilesPerNode >= nrFiles) && (preferred_locations.size() + extra_locations.size() >= minNrOfNodes)) { - found_sufficient_nodes = true; - break; - } - } - else { - if (preferred_locations.size() * nrFilesPerNode >= nrFiles) { - found_sufficient_nodes = true; - break; - } - } - if (++nrFilesPerNode > maxNrFilesPerNode) { // nr files per node too high will exceed bandwidth to node - break; - } - } + // calculate the mininum number of files that have to fit on one storage node + if (nrOfAvailableNodes >= minNrOfNodes) { + if (minNrFilesPerNode <= maxNrFilesPerNode) { + while (!found_sufficient_nodes) { + preferred_locations = data.getStorageLocationOptions(dpit->first, pTask->getScheduledStart(), pTask->getScheduledEnd(), dfit->second.first, singleFileBW, nrFilesPerNode, sort_mode, preferred_nodes); + if (preferred_locations.size() <= minNrOfNodes) { + extra_locations = data.getStorageLocationOptions(dpit->first, pTask->getScheduledStart(), pTask->getScheduledEnd(), dfit->second.first, singleFileBW, nrFilesPerNode, sort_mode, extra_nodes); + if (((preferred_locations.size() + extra_locations.size()) * nrFilesPerNode >= nrFiles) && (preferred_locations.size() + extra_locations.size() >= minNrOfNodes)) { + found_sufficient_nodes = true; + break; + } + } + else { + if (preferred_locations.size() * nrFilesPerNode >= nrFiles) { + found_sufficient_nodes = true; + break; + } + } + if (++nrFilesPerNode > maxNrFilesPerNode) { // nr files per node too high will exceed bandwidth to node + break; + } + } if (!found_sufficient_nodes) { - taskStorage->setStorageCheckResult(data.getLastStorageCheckResult()); - pTask->setConflict(CONFLICT_STORAGE_NO_OPTIONS); - itsConflictDialog->addStorageConflict(pTask, dpit->first, CONFLICT_STORAGE_NO_OPTIONS); - bResult = false; - break; + if (pTask->getOutputDataproductCluster() != "CEP4") { + taskStorage->setStorageCheckResult(data.getLastStorageCheckResult()); + pTask->setConflict(CONFLICT_STORAGE_NO_OPTIONS); + itsConflictDialog->addStorageConflict(pTask, dpit->first, CONFLICT_STORAGE_NO_OPTIONS); + bResult = false; + break; + } } } else { - pTask->setConflict(CONFLICT_STORAGE_TOO_FEW_NODES); - itsConflictDialog->addStorageConflict(pTask, dpit->first, CONFLICT_STORAGE_TOO_FEW_NODES); - bResult = false; - break; + if (pTask->getOutputDataproductCluster() != "CEP4") { + pTask->setConflict(CONFLICT_STORAGE_TOO_FEW_NODES); + itsConflictDialog->addStorageConflict(pTask, dpit->first, CONFLICT_STORAGE_TOO_FEW_NODES); + bResult = false; + break; + } } if (bResult) { @@ -5097,10 +5103,12 @@ bool Controller::assignStorageToTask(Task *pTask) { } } else { - pTask->setConflict(CONFLICT_STORAGE_TOO_FEW_NODES); - itsConflictDialog->addStorageConflict(pTask, dpit->first, CONFLICT_STORAGE_TOO_FEW_NODES); - bResult = false; - break; + if (pTask->getOutputDataproductCluster() != "CEP4") { + pTask->setConflict(CONFLICT_STORAGE_TOO_FEW_NODES); + itsConflictDialog->addStorageConflict(pTask, dpit->first, CONFLICT_STORAGE_TOO_FEW_NODES); + bResult = false; + break; + } } } } @@ -5110,15 +5118,15 @@ bool Controller::assignStorageToTask(Task *pTask) { } bool Controller::assignGroupedStorage(void) { // not for manual assignment of storage - bool bResult(true); - std::map<unsigned, std::vector<Task *> > groupedTasks = data.getGroupedTasks(Task::PRESCHEDULED); + bool bResult(true); + std::map<unsigned, std::vector<Task *> > groupedTasks = data.getGroupedTasks(Task::PRESCHEDULED); - std::vector<unsigned> emptyGroups; - if (!groupedTasks.empty()) { - std::vector<Task *> subGroupTasks; + std::vector<unsigned> emptyGroups; + if (!groupedTasks.empty()) { + std::vector<Task *> subGroupTasks; TaskStorage *tStorage; - for (std::map<unsigned, std::vector<Task *> >::iterator groupIt = groupedTasks.begin(); groupIt != groupedTasks.end(); ++groupIt) { - for (std::vector<Task *>::const_iterator taskit = groupIt->second.begin(); taskit != groupIt->second.end(); ++taskit) { + for (std::map<unsigned, std::vector<Task *> >::iterator groupIt = groupedTasks.begin(); groupIt != groupedTasks.end(); ++groupIt) { + for (std::vector<Task *>::const_iterator taskit = groupIt->second.begin(); taskit != groupIt->second.end(); ++taskit) { if ((*taskit)->hasStorage()) { tStorage = (*taskit)->storage(); if (tStorage->getStorageSelectionMode() != STORAGE_MODE_MANUAL) { // don't assign grouped storage to tasks that have manual storage assignment @@ -5130,454 +5138,464 @@ bool Controller::assignGroupedStorage(void) { // not for manual assignment of st } } } - if (subGroupTasks.empty()) { - emptyGroups.push_back(groupIt->first); - } - else { - groupIt->second = subGroupTasks; // removes all 'manual storage' tasks - subGroupTasks.clear(); - } - } - // remove groups that have no tasks with automatic storage assignment left - for (std::vector<unsigned>::const_iterator eit = emptyGroups.begin(); eit != emptyGroups.end(); ++eit) { - groupedTasks.erase(*eit); - } - if (groupedTasks.empty()) return bResult; // if nothing left return - - // get number of available storage nodes - unsigned nrOfAvailableNodes(Controller::theSchedulerSettings.getNrOfStorageNodesAvailable()); - if (nrOfAvailableNodes > 0) { - sortMode sort_mode; - - // the distribution algorithm used - const storageNodeDistribution &distribution(theSchedulerSettings.getStorageDistribution()); - if (distribution == STORAGE_DISTRIBUTION_FLAT_USAGE) { - sort_mode = SORT_USAGE; - } - else if (distribution == STORAGE_DISTIBUTION_LEAST_FRAGMENTED) { - sort_mode = SORT_SIZE; - } - else sort_mode = SORT_NONE; - - const preferredDataProductStorageMap &pdps(theSchedulerSettings.getPreferredDataProductStorage()); - const preferredProjectStorageMap &pps(theSchedulerSettings.getPreferredProjectStorage()); - std::vector<int> extra_nodes, usable_storage_nodes(itsDMConnection->getUsableStorageNodeIDs()); // all existing storage node IDs + if (subGroupTasks.empty()) { + emptyGroups.push_back(groupIt->first); + } + else { + groupIt->second = subGroupTasks; // removes all 'manual storage' tasks + subGroupTasks.clear(); + } + } + // remove groups that have no tasks with automatic storage assignment left + for (std::vector<unsigned>::const_iterator eit = emptyGroups.begin(); eit != emptyGroups.end(); ++eit) { + groupedTasks.erase(*eit); + } + if (groupedTasks.empty()) return bResult; // if nothing left return + // get number of available storage nodes + unsigned nrOfAvailableNodes(Controller::theSchedulerSettings.getNrOfStorageNodesAvailable()); + if (nrOfAvailableNodes > 0) { + sortMode sort_mode; - //determine grouped tasks combined storage needs and settings - dataFileMap combinedOutput; - storage_selection_mode mode; - std::map<dataProductTypes, int> combinedMinimumNrNodes; - std::map<dataProductTypes, double> maxSingleFileBW; - std::map<dataProductTypes, double>::iterator bwit; - dataFileMap::iterator dfit; - std::map<dataProductTypes, int>::iterator mit; - int project_id(0); - std::pair<unscheduled_reasons, QString> error; - for (std::map<unsigned, std::vector<Task *> >::const_iterator groupIt = groupedTasks.begin(); groupIt != groupedTasks.end(); ++groupIt) { // 2 - combinedOutput.clear(); - combinedMinimumNrNodes.clear(); - if (!groupIt->second.empty()) { - mode = groupIt->second.front()->storage()->getStorageSelectionMode(); - project_id = theSchedulerSettings.getCampaignInfo(groupIt->second.front()->getProjectName()).id; // should be the same for all tasks within group (no check done) - for (std::vector<Task *>::const_iterator taskit = groupIt->second.begin(); taskit != groupIt->second.end(); ++taskit) { // 3 - TaskStorage *task_storage((*taskit)->storage()); - if ((*taskit)->isPipeline()) { - Pipeline *pipe(static_cast<Pipeline *>(*taskit)); - error = setInputFilesForPipeline(pipe); - if (error.first == NO_ERROR) { - pipe->calculateDataFiles(); - const storageMap &inputStorageLocations(task_storage->getInputStorageLocations()); - storageMap::const_iterator inpcorit = inputStorageLocations.find(DP_CORRELATED_UV); - if (inpcorit != inputStorageLocations.end()) { - if (task_storage->isOutputDataProduktEnabled(DP_INSTRUMENT_MODEL)) { // set output storage nodes equal to input storage nodes for these type of data products - std::vector<storageResult> result = data.addStorageToTask(pipe, DP_INSTRUMENT_MODEL, inpcorit->second, false); - if (!result.empty()) { - for (std::vector<storageResult>::const_iterator sit = result.begin(); sit != result.end(); ++sit) { - if (sit->conflict != CONFLICT_NO_CONFLICT) { - itsConflictDialog->addStorageConflict(pipe, sit->dataProductType, sit->conflict); - } - pipe->setConflict(sit->conflict); - } - bResult = false; - } - } - if (task_storage->isOutputDataProduktEnabled(DP_CORRELATED_UV)) { - - // inpcorit->second bevat alle storage nodes, dus ook die van de eerdere SAPs waardoor - // dit dataprodukt de storage nodes van SAP000 van de input krijgt en niet die van SAP001 - std::vector<storageResult> result = data.addStorageToTask(pipe, DP_CORRELATED_UV, inpcorit->second, false); - if (!result.empty()) { - for (std::vector<storageResult>::const_iterator sit = result.begin(); sit != result.end(); ++sit) { - if (sit->conflict != CONFLICT_NO_CONFLICT) { - itsConflictDialog->addStorageConflict(pipe, sit->dataProductType, sit->conflict); - } - pipe->setConflict(sit->conflict); - } - bResult = false; - } - } + // the distribution algorithm used + const storageNodeDistribution &distribution(theSchedulerSettings.getStorageDistribution()); + if (distribution == STORAGE_DISTRIBUTION_FLAT_USAGE) { + sort_mode = SORT_USAGE; + } + else if (distribution == STORAGE_DISTIBUTION_LEAST_FRAGMENTED) { + sort_mode = SORT_SIZE; + } + else sort_mode = SORT_NONE; - task_storage->generateFileList(); - } - } - } + const preferredDataProductStorageMap &pdps(theSchedulerSettings.getPreferredDataProductStorage()); + const preferredProjectStorageMap &pps(theSchedulerSettings.getPreferredProjectStorage()); + std::vector<int> extra_nodes, usable_storage_nodes(itsDMConnection->getUsableStorageNodeIDs()); // all existing storage node IDs - double taskDuration((*taskit)->getDuration().totalSeconds() * 8); - const dataFileMap &dataFileSizes = task_storage->getOutputFileSizes(); - if (task_storage->getStorageSelectionMode() != mode) { - itsConflictDialog->addConflict(*taskit, CONFLICT_GROUP_STORAGE_MODE_DIFFERENT); - bResult = false; - } - if (!dataFileSizes.empty()) { - // summing individual file size for same data product types - for (dataFileMap::const_iterator dit = dataFileSizes.begin(); dit != dataFileSizes.end(); ++dit) { // 4 - if ((*taskit)->isObservation() || ((dit->first != DP_INSTRUMENT_MODEL) && (dit->first != DP_CORRELATED_UV))) { // skip this for instrument model and correlated, which is already set above - dfit = combinedOutput.find(dit->first); - if (dfit != combinedOutput.end()) { - dfit->second.first += dit->second.first; - } - else { - combinedOutput[dit->first] = std::pair<double, unsigned>(dit->second.first, dit->second.second); - } - //determine the maximum single file bandwidth for each data product type in the group of tasks - double currentBW = dit->second.first / taskDuration; // kbit/sec - bwit = maxSingleFileBW.find(dit->first); - if (bwit != maxSingleFileBW.end()) { - bwit->second = std::max(bwit->second, currentBW); - } - else { - maxSingleFileBW[dit->first] = currentBW; - } - } - } - // determining the overall minimum number of nodes needed for each data product type - std::map<dataProductTypes, int> minimumNrnodes = task_storage->getMinimumNrOfStorageNodes(); - for (std::map<dataProductTypes, int>::const_iterator minit = minimumNrnodes.begin(); minit != minimumNrnodes.end(); ++minit) { - if ((*taskit)->isObservation() || ((minit->first != DP_INSTRUMENT_MODEL) && (minit->first != DP_CORRELATED_UV))) { // skip for instrument model and correlated,they have been set - if (minit->second == -1) { - (*taskit)->setConflict(CONFLICT_STORAGE_SINGLE_FILE_BW_TOO_HIGH); - itsConflictDialog->addStorageConflict((*taskit), minit->first, CONFLICT_STORAGE_SINGLE_FILE_BW_TOO_HIGH); - bResult = false; - } - else { - mit = combinedMinimumNrNodes.find(minit->first); - if (mit != combinedMinimumNrNodes.end()) { - mit->second = std::max(mit->second, minit->second); - } - else { - combinedMinimumNrNodes[minit->first] = minit->second; - } - } - } - } - } - else { - itsConflictDialog->addConflict(*taskit, CONFLICT_STORAGE_NO_DATA); - bResult = false; - } - } // END 3 - } - else { - std::cout << "Controller::assignGroupedStorage: Warning: trying to assign storage to group:" << groupIt->first << " in which there are no tasks" << std::endl; - continue; - } + //determine grouped tasks combined storage needs and settings + dataFileMap combinedOutput; + storage_selection_mode mode; + std::map<dataProductTypes, int> combinedMinimumNrNodes; + std::map<dataProductTypes, double> maxSingleFileBW; + std::map<dataProductTypes, double>::iterator bwit; + dataFileMap::iterator dfit; + std::map<dataProductTypes, int>::iterator mit; + int project_id(0); + std::pair<unscheduled_reasons, QString> error; + for (std::map<unsigned, std::vector<Task *> >::const_iterator groupIt = groupedTasks.begin(); groupIt != groupedTasks.end(); ++groupIt) { // 2 + combinedOutput.clear(); + combinedMinimumNrNodes.clear(); + if (!groupIt->second.empty()) { + mode = groupIt->second.front()->storage()->getStorageSelectionMode(); + project_id = theSchedulerSettings.getCampaignInfo(groupIt->second.front()->getProjectName()).id; // should be the same for all tasks within group (no check done) + for (std::vector<Task *>::const_iterator taskit = groupIt->second.begin(); taskit != groupIt->second.end(); ++taskit) { // 3 + TaskStorage *task_storage((*taskit)->storage()); + if ((*taskit)->isPipeline()) { + Pipeline *pipe(static_cast<Pipeline *>(*taskit)); + error = setInputFilesForPipeline(pipe); + if (error.first == NO_ERROR) { + pipe->calculateDataFiles(); + const storageMap &inputStorageLocations(task_storage->getInputStorageLocations()); + storageMap::const_iterator inpcorit = inputStorageLocations.find(DP_CORRELATED_UV); + if (inpcorit != inputStorageLocations.end()) { + if (task_storage->isOutputDataProduktEnabled(DP_INSTRUMENT_MODEL)) { // set output storage nodes equal to input storage nodes for these type of data products + std::vector<storageResult> result = data.addStorageToTask(pipe, DP_INSTRUMENT_MODEL, inpcorit->second, false); + if (!result.empty()) { + for (std::vector<storageResult>::const_iterator sit = result.begin(); sit != result.end(); ++sit) { + if (sit->conflict != CONFLICT_NO_CONFLICT) { + itsConflictDialog->addStorageConflict(pipe, sit->dataProductType, sit->conflict); + } + pipe->setConflict(sit->conflict); + } + bResult = false; + } + } + if (task_storage->isOutputDataProduktEnabled(DP_CORRELATED_UV)) { + + // inpcorit->second bevat alle storage nodes, dus ook die van de eerdere SAPs waardoor + // dit dataprodukt de storage nodes van SAP000 van de input krijgt en niet die van SAP001 + std::vector<storageResult> result = data.addStorageToTask(pipe, DP_CORRELATED_UV, inpcorit->second, false); + if (!result.empty()) { + for (std::vector<storageResult>::const_iterator sit = result.begin(); sit != result.end(); ++sit) { + if (sit->conflict != CONFLICT_NO_CONFLICT) { + itsConflictDialog->addStorageConflict(pipe, sit->dataProductType, sit->conflict); + } + pipe->setConflict(sit->conflict); + } + bResult = false; + } + } - if (bResult) { // only do the actual group storage assignment if no conflicts are found + task_storage->generateFileList(); + } + } + } - if (!combinedOutput.empty()) { + double taskDuration((*taskit)->getDuration().totalSeconds() * 8); + const dataFileMap &dataFileSizes = task_storage->getOutputFileSizes(); + if (task_storage->getStorageSelectionMode() != mode) { + itsConflictDialog->addConflict(*taskit, CONFLICT_GROUP_STORAGE_MODE_DIFFERENT); + bResult = false; + } + if (!dataFileSizes.empty()) { + // summing individual file size for same data product types + for (dataFileMap::const_iterator dit = dataFileSizes.begin(); dit != dataFileSizes.end(); ++dit) { // 4 + if ((*taskit)->isObservation() || ((dit->first != DP_INSTRUMENT_MODEL) && (dit->first != DP_CORRELATED_UV))) { // skip this for instrument model and correlated, which is already set above + dfit = combinedOutput.find(dit->first); + if (dfit != combinedOutput.end()) { + dfit->second.first += dit->second.first; - // storage node selection by preferred project nodes? - // storage_selection_mode mode(pTask->getStorageSelectionMode()); - bool project_preferred_nodes; - if ((mode == STORAGE_MODE_MAXIMUM_PROJECT_PREFERRED) || (mode == STORAGE_MODE_MINIMUM_PROJECT_PREFERRED)) project_preferred_nodes = true; - else project_preferred_nodes = false; + } + else { + combinedOutput[dit->first] = std::pair<double, unsigned>(dit->second.first, dit->second.second); + } + //determine the maximum single file bandwidth for each data product type in the group of tasks + double currentBW = dit->second.first / taskDuration; // kbit/sec + bwit = maxSingleFileBW.find(dit->first); + if (bwit != maxSingleFileBW.end()) { + bwit->second = std::max(bwit->second, currentBW); + } + else { + maxSingleFileBW[dit->first] = currentBW; + } + } + } + // determining the overall minimum number of nodes needed for each data product type + std::map<dataProductTypes, int> minimumNrnodes = task_storage->getMinimumNrOfStorageNodes(); + for (std::map<dataProductTypes, int>::const_iterator minit = minimumNrnodes.begin(); minit != minimumNrnodes.end(); ++minit) { + if ((*taskit)->isObservation() || ((minit->first != DP_INSTRUMENT_MODEL) && (minit->first != DP_CORRELATED_UV))) { // skip for instrument model and correlated,they have been set + if (minit->second == -1) { + if ((*taskit)->getOutputDataproductCluster() != "CEP4") { + (*taskit)->setConflict(CONFLICT_STORAGE_SINGLE_FILE_BW_TOO_HIGH); + itsConflictDialog->addStorageConflict((*taskit), minit->first, CONFLICT_STORAGE_SINGLE_FILE_BW_TOO_HIGH); + bResult = false; + } + } + else { + mit = combinedMinimumNrNodes.find(minit->first); + if (mit != combinedMinimumNrNodes.end()) { + mit->second = std::max(mit->second, minit->second); + } + else { + combinedMinimumNrNodes[minit->first] = minit->second; + } + } + } + } + } + else { + itsConflictDialog->addConflict(*taskit, CONFLICT_STORAGE_NO_DATA); + bResult = false; + } + } // END 3 + } + else { + std::cout << "Controller::assignGroupedStorage: Warning: trying to assign storage to group:" << groupIt->first << " in which there are no tasks" << std::endl; + continue; + } + if (bResult) { // only do the actual group storage assignment if no conflicts are found -// double singleFileBW; - unsigned nrFiles(0), minNrOfNodes(0), nrFilesPerNode(0); + if (!combinedOutput.empty()) { - std::vector<int> preferred_nodes;// = emptyVec; + // storage node selection by preferred project nodes? + // storage_selection_mode mode(pTask->getStorageSelectionMode()); + bool project_preferred_nodes; + if ((mode == STORAGE_MODE_MAXIMUM_PROJECT_PREFERRED) || (mode == STORAGE_MODE_MINIMUM_PROJECT_PREFERRED)) project_preferred_nodes = true; + else project_preferred_nodes = false; - // *********************************************************************** - // ******************* SEARCH FOR SUITABLE LOCATIONS ********************* - // *********************************************************************** + // double singleFileBW; + unsigned nrFiles(0), minNrOfNodes(0), nrFilesPerNode(0); - // STEP2: search storageLocations for all dataProducts in sequence according to individual file size (large to small) of the dataProduct (i.e. sequence of sortedDataFiles) + std::vector<int> preferred_nodes;// = emptyVec; - preferredDataProductStorageMap::const_iterator pnit; - preferredProjectStorageMap::const_iterator ppit; - if (project_preferred_nodes) { - ppit = pps.find(project_id); - if (ppit != pps.end()) { - if (ppit->second.empty()) { - preferred_nodes = usable_storage_nodes; - } - else { - preferred_nodes = ppit->second; - for (std::vector<int>::const_iterator asit = usable_storage_nodes.begin(); asit != usable_storage_nodes.end(); ++asit) { - if (std::find(preferred_nodes.begin(), preferred_nodes.end(), *asit) == preferred_nodes.end()) { - extra_nodes.push_back(*asit); - } - } - } - } - else { - preferred_nodes = usable_storage_nodes; - } - } + // *********************************************************************** + // ******************* SEARCH FOR SUITABLE LOCATIONS ********************* + // *********************************************************************** - storageLocationOptions preferred_locations, extra_locations, common_pref_locations, common_extra_locations; - for (std::map<dataProductTypes, int>::const_iterator dpit = combinedMinimumNrNodes.begin(); dpit != combinedMinimumNrNodes.end(); ++dpit) { // 5 - for (std::vector<Task *>::const_iterator taskit = groupIt->second.begin(); taskit != groupIt->second.end(); ++taskit) { //6 + // STEP2: search storageLocations for all dataProducts in sequence according to individual file size (large to small) of the dataProduct (i.e. sequence of sortedDataFiles) - if (!project_preferred_nodes) { - pnit = pdps.find(dpit->first); // are there preferred storage nodes for this data product type specified? - if (pnit != pdps.end()) { - if (pnit->second.empty()) { - preferred_nodes = usable_storage_nodes; - } - else { - preferred_nodes = pnit->second; - for (std::vector<int>::const_iterator asit = usable_storage_nodes.begin(); asit != usable_storage_nodes.end(); ++asit) { - if (std::find(preferred_nodes.begin(), preferred_nodes.end(), *asit) == preferred_nodes.end()) { - extra_nodes.push_back(*asit); - } - } - } - } - else { - preferred_nodes = usable_storage_nodes; - } - } + preferredDataProductStorageMap::const_iterator pnit; + preferredProjectStorageMap::const_iterator ppit; - // now search for storage locations using the combined storage requirements (only search ones, not for all tasks separately, - // final check will be done when really assigning the storage to each grouped task - minNrOfNodes = dpit->second; // the minimum number of storage nodes REQUIRED for this data product - dfit = combinedOutput.find(dpit->first); - bwit = maxSingleFileBW.find(dpit->first); - if (dfit != combinedOutput.end()) { - nrFiles = dfit->second.second; - unsigned minNrFilesPerNode = static_cast<unsigned>(ceil((float)nrFiles / nrOfAvailableNodes)); // the minimum number of files the selected nodes should be able to hold - unsigned maxNrFilesPerNode = static_cast<unsigned>(floor((float)nrFiles / minNrOfNodes)); // the maximum number of files that can be written to one storage node according to bandwidth limitations - nrFilesPerNode = minNrFilesPerNode; + if (project_preferred_nodes) { + ppit = pps.find(project_id); + if (ppit != pps.end()) { + if (ppit->second.empty()) { + preferred_nodes = usable_storage_nodes; + } + else { + preferred_nodes = ppit->second; + for (std::vector<int>::const_iterator asit = usable_storage_nodes.begin(); asit != usable_storage_nodes.end(); ++asit) { + if (std::find(preferred_nodes.begin(), preferred_nodes.end(), *asit) == preferred_nodes.end()) { + extra_nodes.push_back(*asit); + } + } + } + } + else { + preferred_nodes = usable_storage_nodes; + } + } - // calculate the minimum number of files that have to fit on one storage node - if (nrOfAvailableNodes >= minNrOfNodes) { - if (minNrFilesPerNode <= maxNrFilesPerNode) { - preferred_locations = data.getStorageLocationOptions(dpit->first, (*taskit)->getScheduledStart(), (*taskit)->getScheduledEnd(), dfit->second.first, bwit->second, nrFilesPerNode, sort_mode, preferred_nodes); - extra_locations = data.getStorageLocationOptions(dpit->first, (*taskit)->getScheduledStart(), (*taskit)->getScheduledEnd(), dfit->second.first, bwit->second, nrFilesPerNode, sort_mode, extra_nodes); + storageLocationOptions preferred_locations, extra_locations, common_pref_locations, common_extra_locations; + for (std::map<dataProductTypes, int>::const_iterator dpit = combinedMinimumNrNodes.begin(); dpit != combinedMinimumNrNodes.end(); ++dpit) { // 5 + for (std::vector<Task *>::const_iterator taskit = groupIt->second.begin(); taskit != groupIt->second.end(); ++taskit) { //6 - if (preferred_locations.size() + extra_locations.size() >= minNrOfNodes) { + if (!project_preferred_nodes) { + pnit = pdps.find(dpit->first); // are there preferred storage nodes for this data product type specified? + if (pnit != pdps.end()) { + if (pnit->second.empty()) { + preferred_nodes = usable_storage_nodes; + } + else { + preferred_nodes = pnit->second; + for (std::vector<int>::const_iterator asit = usable_storage_nodes.begin(); asit != usable_storage_nodes.end(); ++asit) { + if (std::find(preferred_nodes.begin(), preferred_nodes.end(), *asit) == preferred_nodes.end()) { + extra_nodes.push_back(*asit); + } + } + } + } + else { + preferred_nodes = usable_storage_nodes; + } + } - preferred_nodes.clear(); - extra_nodes.clear(); - for (storageLocationOptions::const_iterator cpit = preferred_locations.begin(); cpit != preferred_locations.end(); ++cpit) { - preferred_nodes.push_back(cpit->first); - } - for (storageLocationOptions::const_iterator cpit = extra_locations.begin(); cpit != extra_locations.end(); ++cpit) { - extra_nodes.push_back(cpit->first); - } + // now search for storage locations using the combined storage requirements (only search ones, not for all tasks separately, + // final check will be done when really assigning the storage to each grouped task + minNrOfNodes = dpit->second; // the minimum number of storage nodes REQUIRED for this data product + dfit = combinedOutput.find(dpit->first); + bwit = maxSingleFileBW.find(dpit->first); + if (dfit != combinedOutput.end()) { + nrFiles = dfit->second.second; + unsigned minNrFilesPerNode = static_cast<unsigned>(ceil((float)nrFiles / nrOfAvailableNodes)); // the minimum number of files the selected nodes should be able to hold + unsigned maxNrFilesPerNode = static_cast<unsigned>(floor((float)nrFiles / minNrOfNodes)); // the maximum number of files that can be written to one storage node according to bandwidth limitations + nrFilesPerNode = minNrFilesPerNode; + + // calculate the minimum number of files that have to fit on one storage node + if (nrOfAvailableNodes >= minNrOfNodes) { + if (minNrFilesPerNode <= maxNrFilesPerNode) { + preferred_locations = data.getStorageLocationOptions(dpit->first, (*taskit)->getScheduledStart(), (*taskit)->getScheduledEnd(), dfit->second.first, bwit->second, nrFilesPerNode, sort_mode, preferred_nodes); + extra_locations = data.getStorageLocationOptions(dpit->first, (*taskit)->getScheduledStart(), (*taskit)->getScheduledEnd(), dfit->second.first, bwit->second, nrFilesPerNode, sort_mode, extra_nodes); + + if (preferred_locations.size() + extra_locations.size() >= minNrOfNodes) { + + preferred_nodes.clear(); + extra_nodes.clear(); + for (storageLocationOptions::const_iterator cpit = preferred_locations.begin(); cpit != preferred_locations.end(); ++cpit) { + preferred_nodes.push_back(cpit->first); + } + for (storageLocationOptions::const_iterator cpit = extra_locations.begin(); cpit != extra_locations.end(); ++cpit) { + extra_nodes.push_back(cpit->first); + } - if (preferred_nodes.empty() && extra_nodes.empty()) { - (*taskit)->setConflict(CONFLICT_STORAGE_NO_OPTIONS); - itsConflictDialog->addStorageConflict((*taskit), dpit->first, CONFLICT_STORAGE_NO_OPTIONS); - bResult = false; - break; - } + if (preferred_nodes.empty() && extra_nodes.empty()) { + (*taskit)->setConflict(CONFLICT_STORAGE_NO_OPTIONS); + itsConflictDialog->addStorageConflict((*taskit), dpit->first, CONFLICT_STORAGE_NO_OPTIONS); + bResult = false; + break; + } - if (nrFilesPerNode > maxNrFilesPerNode) { // nr files per node too high will exceed bandwidth to node - (*taskit)->setConflict(CONFLICT_STORAGE_EXCEEDS_BANDWIDTH); - itsConflictDialog->addStorageConflict((*taskit), dpit->first, CONFLICT_STORAGE_EXCEEDS_BANDWIDTH); - bResult = false; - break; - } - } - else { - (*taskit)->setConflict(CONFLICT_STORAGE_MINIMUM_NODES); - itsConflictDialog->addStorageConflict((*taskit), dpit->first, CONFLICT_STORAGE_MINIMUM_NODES); - bResult = false; - break; - } - } - else { - (*taskit)->setConflict(CONFLICT_STORAGE_EXCEEDS_BANDWIDTH); - itsConflictDialog->addStorageConflict((*taskit), dpit->first, CONFLICT_STORAGE_EXCEEDS_BANDWIDTH); - bResult = false; - break; - } - } - else { - (*taskit)->setConflict(CONFLICT_STORAGE_TOO_FEW_NODES); - itsConflictDialog->addStorageConflict((*taskit), dpit->first, CONFLICT_STORAGE_TOO_FEW_NODES); - bResult = false; - break; - } - } + if (nrFilesPerNode > maxNrFilesPerNode) { // nr files per node too high will exceed bandwidth to node + if ((*taskit)->getOutputDataproductCluster() != "CEP4") { + (*taskit)->setConflict(CONFLICT_STORAGE_EXCEEDS_BANDWIDTH); + itsConflictDialog->addStorageConflict((*taskit), dpit->first, CONFLICT_STORAGE_EXCEEDS_BANDWIDTH); + bResult = false; + break; + } + } + } + else { + if ((*taskit)->getOutputDataproductCluster() != "CEP4") { + (*taskit)->setConflict(CONFLICT_STORAGE_MINIMUM_NODES); + itsConflictDialog->addStorageConflict((*taskit), dpit->first, CONFLICT_STORAGE_MINIMUM_NODES); + bResult = false; + break; + } + } + } + else { + if ((*taskit)->getOutputDataproductCluster() != "CEP4") { + (*taskit)->setConflict(CONFLICT_STORAGE_EXCEEDS_BANDWIDTH); + itsConflictDialog->addStorageConflict((*taskit), dpit->first, CONFLICT_STORAGE_EXCEEDS_BANDWIDTH); + bResult = false; + break; + } + } + } + else { + if ((*taskit)->getOutputDataproductCluster() != "CEP4") { + (*taskit)->setConflict(CONFLICT_STORAGE_TOO_FEW_NODES); + itsConflictDialog->addStorageConflict((*taskit), dpit->first, CONFLICT_STORAGE_TOO_FEW_NODES); + bResult = false; + break; + } + } + } - if (bResult) { - // check which nodes they have in common, assign common nodes with checking! - // *********************************************************************** - // ************* DISTRIBUTION OF DATA OVER STORAGE NODES ***************** - // *********************************************************************** - // use the maximum number of suitable storage nodes (= suitable_locations.size()) - // only keep the locations that are common to all tasks for this data product - if (taskit == groupIt->second.begin()) { // for first task search, keep all options - common_pref_locations = preferred_locations; - common_extra_locations = extra_locations; - } - else { - storageLocationOptions new_common_pref_locations, new_common_extra_locations; - for (storageLocationOptions::const_iterator sit = common_pref_locations.begin(); sit != common_pref_locations.end(); ++sit) { - for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { // iterates over the raids - if (storageLocationsContains(preferred_locations, sit->first, nsit->raidID)) { - new_common_pref_locations.push_back(*sit); - } - } - } - common_pref_locations = new_common_pref_locations; - for (storageLocationOptions::const_iterator sit = common_extra_locations.begin(); sit != common_extra_locations.end(); ++sit) { - for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { // iterates over the raids - if (storageLocationsContains(extra_locations, sit->first, nsit->raidID)) { - new_common_extra_locations.push_back(*sit); - } - } - } - common_extra_locations = new_common_extra_locations; - } - } - } // 6, end of search for all task in this group for the current data product + if (bResult) { + // check which nodes they have in common, assign common nodes with checking! + // *********************************************************************** + // ************* DISTRIBUTION OF DATA OVER STORAGE NODES ***************** + // *********************************************************************** + // use the maximum number of suitable storage nodes (= suitable_locations.size()) + // only keep the locations that are common to all tasks for this data product + if (taskit == groupIt->second.begin()) { // for first task search, keep all options + common_pref_locations = preferred_locations; + common_extra_locations = extra_locations; + } + else { + storageLocationOptions new_common_pref_locations, new_common_extra_locations; + for (storageLocationOptions::const_iterator sit = common_pref_locations.begin(); sit != common_pref_locations.end(); ++sit) { + for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { // iterates over the raids + if (storageLocationsContains(preferred_locations, sit->first, nsit->raidID)) { + new_common_pref_locations.push_back(*sit); + } + } + } + common_pref_locations = new_common_pref_locations; + for (storageLocationOptions::const_iterator sit = common_extra_locations.begin(); sit != common_extra_locations.end(); ++sit) { + for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { // iterates over the raids + if (storageLocationsContains(extra_locations, sit->first, nsit->raidID)) { + new_common_extra_locations.push_back(*sit); + } + } + } + common_extra_locations = new_common_extra_locations; + } + } + } // 6, end of search for all task in this group for the current data product - unsigned maxFilesToNodes(MAX_UNSIGNED); - storageVector locations; + unsigned maxFilesToNodes(MAX_UNSIGNED); + storageVector locations; - bool sufficient_locations(false); - // select enough locations from the common location solutions - if ((mode == STORAGE_MODE_MAXIMUM_DATA_TYPE_PREFERRED) || (mode == STORAGE_MODE_MAXIMUM_PROJECT_PREFERRED)) { - for (storageLocationOptions::const_iterator sit = common_pref_locations.begin(); sit != common_pref_locations.end(); ++sit) { // iterate over available locations - for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { // iterates over the raids - locations.push_back(storageVector::value_type(sit->first, nsit->raidID)); // use only the first raid option of each storage node available - // also determine the maximum number of files that can be written to a single suitable node, needed to determine the minimum number of extra (non-preferred) nodes needed in addition to the preferred nodes - maxFilesToNodes = std::min(maxFilesToNodes, nsit->nrUnits); - if (locations.size() * nrFilesPerNode >= nrFiles) { - sufficient_locations = true; -// break; // don't assign more storage nodes than the number of files written - } + bool sufficient_locations(false); + // select enough locations from the common location solutions + if ((mode == STORAGE_MODE_MAXIMUM_DATA_TYPE_PREFERRED) || (mode == STORAGE_MODE_MAXIMUM_PROJECT_PREFERRED)) { + for (storageLocationOptions::const_iterator sit = common_pref_locations.begin(); sit != common_pref_locations.end(); ++sit) { // iterate over available locations + for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { // iterates over the raids + locations.push_back(storageVector::value_type(sit->first, nsit->raidID)); // use only the first raid option of each storage node available + // also determine the maximum number of files that can be written to a single suitable node, needed to determine the minimum number of extra (non-preferred) nodes needed in addition to the preferred nodes + maxFilesToNodes = std::min(maxFilesToNodes, nsit->nrUnits); + if (locations.size() * nrFilesPerNode >= nrFiles) { + sufficient_locations = true; + // break; // don't assign more storage nodes than the number of files written + } - } -// if (sufficient_locations) break; - } - if (!sufficient_locations && (locations.size() < minNrOfNodes)) { // do we need extra locations (non preferred nodes)? If so, use as few as possible of these - for (storageLocationOptions::const_iterator sit = common_extra_locations.begin(); sit != common_extra_locations.end(); ++sit) { - for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { - locations.push_back(storageVector::value_type(sit->first, nsit->raidID)); - maxFilesToNodes = std::min(maxFilesToNodes, nsit->nrUnits); - if ((locations.size() * maxFilesToNodes >= nrFiles) && (locations.size() >= minNrOfNodes)) { - sufficient_locations = true; - break; // don't assign more storage nodes than the number of files written - } - } - if (sufficient_locations) break; - } - } - } - else if ((mode == STORAGE_MODE_MINIMUM_DATA_TYPE_PREFERRED) || (mode == STORAGE_MODE_MINIMUM_PROJECT_PREFERRED)) { - bool inserted(false); - vector<std::pair<int, storageOption> > smallest_vec; // first = node ID - for (storageLocationOptions::const_iterator sit = common_pref_locations.begin(); sit != common_pref_locations.end(); ++sit) { - for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { - maxFilesToNodes = std::min(maxFilesToNodes, nsit->nrUnits); // determine the maximum number of files that can be written to a single suitable node - for (vector<std::pair<int, storageOption> >::iterator ssit = smallest_vec.begin(); ssit != smallest_vec.end(); ++ssit) { - if (nsit->remainingSpacekB < ssit->second.remainingSpacekB) { // sort according to free space in smallest_vec - smallest_vec.insert(ssit, std::pair<int, storageOption>(sit->first, *nsit)); // insert the smallest free space raid arrays in smallest_vec - inserted = true; - break; - } - } - if (!inserted) { - smallest_vec.push_back(std::pair<int, storageOption>(sit->first, *nsit)); // put at the end (it's has the largest free space up to now) - break; // only one raid array per node here - } - } - if (smallest_vec.size() * nrFilesPerNode >= nrFiles) { - sufficient_locations = true; - break; // don't assign more storage nodes than the number of files written - } - } + } + // if (sufficient_locations) break; + } + if (!sufficient_locations && (locations.size() < minNrOfNodes)) { // do we need extra locations (non preferred nodes)? If so, use as few as possible of these + for (storageLocationOptions::const_iterator sit = common_extra_locations.begin(); sit != common_extra_locations.end(); ++sit) { + for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { + locations.push_back(storageVector::value_type(sit->first, nsit->raidID)); + maxFilesToNodes = std::min(maxFilesToNodes, nsit->nrUnits); + if ((locations.size() * maxFilesToNodes >= nrFiles) && (locations.size() >= minNrOfNodes)) { + sufficient_locations = true; + break; // don't assign more storage nodes than the number of files written + } + } + if (sufficient_locations) break; + } + } + } + else if ((mode == STORAGE_MODE_MINIMUM_DATA_TYPE_PREFERRED) || (mode == STORAGE_MODE_MINIMUM_PROJECT_PREFERRED)) { + bool inserted(false); + vector<std::pair<int, storageOption> > smallest_vec; // first = node ID + for (storageLocationOptions::const_iterator sit = common_pref_locations.begin(); sit != common_pref_locations.end(); ++sit) { + for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { + maxFilesToNodes = std::min(maxFilesToNodes, nsit->nrUnits); // determine the maximum number of files that can be written to a single suitable node + for (vector<std::pair<int, storageOption> >::iterator ssit = smallest_vec.begin(); ssit != smallest_vec.end(); ++ssit) { + if (nsit->remainingSpacekB < ssit->second.remainingSpacekB) { // sort according to free space in smallest_vec + smallest_vec.insert(ssit, std::pair<int, storageOption>(sit->first, *nsit)); // insert the smallest free space raid arrays in smallest_vec + inserted = true; + break; + } + } + if (!inserted) { + smallest_vec.push_back(std::pair<int, storageOption>(sit->first, *nsit)); // put at the end (it's has the largest free space up to now) + break; // only one raid array per node here + } + } + if (smallest_vec.size() * nrFilesPerNode >= nrFiles) { + sufficient_locations = true; + break; // don't assign more storage nodes than the number of files written + } + } - if (!sufficient_locations && (locations.size() < minNrOfNodes)) { // do we need extra locations (non preferred nodes)? If so, use as few as possible of these - for (storageLocationOptions::const_iterator sit = common_extra_locations.begin(); sit != common_extra_locations.end(); ++sit) { - for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { - maxFilesToNodes = std::min(maxFilesToNodes, nsit->nrUnits); - for (vector<std::pair<int, storageOption> >::iterator ssit = smallest_vec.begin(); ssit != smallest_vec.end(); ++ssit) { - if (nsit->remainingSpacekB < ssit->second.remainingSpacekB) { // sort according to free space in smallest_vec - smallest_vec.insert(ssit, std::pair<int, storageOption>(sit->first, *nsit)); // insert the smallest free space raid arrays in smallest_vec - inserted = true; - break; - } - } - if (!inserted) { - smallest_vec.push_back(std::pair<int, storageOption>(sit->first, *nsit)); // put at the end (it has the largest free space up to now) - break; // only one raid array per node here - } - } - if ((smallest_vec.size() * maxFilesToNodes >= nrFiles) & (smallest_vec.size() >= minNrOfNodes)) { - break; // don't assign more storage nodes than the number of files written - } - } - } - for (unsigned i = 0; i < minNrOfNodes; ++i) { - locations.push_back(pair<int,int>(smallest_vec.at(i).first, smallest_vec.at(i).second.raidID)); - } - } + if (!sufficient_locations && (locations.size() < minNrOfNodes)) { // do we need extra locations (non preferred nodes)? If so, use as few as possible of these + for (storageLocationOptions::const_iterator sit = common_extra_locations.begin(); sit != common_extra_locations.end(); ++sit) { + for (nodeStorageOptions::const_iterator nsit = sit->second.begin(); nsit != sit->second.end(); ++nsit) { + maxFilesToNodes = std::min(maxFilesToNodes, nsit->nrUnits); + for (vector<std::pair<int, storageOption> >::iterator ssit = smallest_vec.begin(); ssit != smallest_vec.end(); ++ssit) { + if (nsit->remainingSpacekB < ssit->second.remainingSpacekB) { // sort according to free space in smallest_vec + smallest_vec.insert(ssit, std::pair<int, storageOption>(sit->first, *nsit)); // insert the smallest free space raid arrays in smallest_vec + inserted = true; + break; + } + } + if (!inserted) { + smallest_vec.push_back(std::pair<int, storageOption>(sit->first, *nsit)); // put at the end (it has the largest free space up to now) + break; // only one raid array per node here + } + } + if ((smallest_vec.size() * maxFilesToNodes >= nrFiles) & (smallest_vec.size() >= minNrOfNodes)) { + break; // don't assign more storage nodes than the number of files written + } + } + } + for (unsigned i = 0; i < minNrOfNodes; ++i) { + locations.push_back(pair<int,int>(smallest_vec.at(i).first, smallest_vec.at(i).second.raidID)); + } + } - // finally assign the common storage locations for this data product to all tasks in the group and check if the result is ok (i.e. no conflicts) - for (std::vector<Task *>::const_iterator taskit = groupIt->second.begin(); taskit != groupIt->second.end(); ++taskit) { // 7 - if ((*taskit)->storage()->isOutputDataProduktEnabled(dpit->first)) { - std::vector<storageResult> result = data.addStorageToTask(*taskit, dpit->first, locations, false); - if (!result.empty()) { - for (std::vector<storageResult>::const_iterator sit = result.begin(); sit != result.end(); ++sit) { - if (sit->conflict != CONFLICT_NO_CONFLICT) { - itsConflictDialog->addStorageConflict(*taskit, sit->dataProductType, sit->conflict); - } - (*taskit)->setConflict(sit->conflict); - } - bResult = false; - } - } - } // END 7 - } // 5 - for (std::vector<Task *>::const_iterator taskit = groupIt->second.begin(); taskit != groupIt->second.end(); ++taskit) { - if ((*taskit)->isPipeline()) { - Pipeline *pipe(static_cast<Pipeline *>(*taskit)); - setInputFilesForPipeline(pipe); - // we have to re-assign the storage for the pipeline because the predecessor (observation or pipeline) might have changed in the previous loop - for (dataProductTypes dp = _BEGIN_DATA_PRODUCTS_ENUM_; dp < _END_DATA_PRODUCTS_ENUM_; dp = dataProductTypes(dp + 1)) { - if (dp != DP_SKY_IMAGE && pipe->storage()->isOutputDataProduktEnabled(dp)) { // for SKY_IMAGE input nodes are not equal to output nodes - const storageMap &inputStorageLocations(pipe->storage()->getInputStorageLocations()); - storageMap::const_iterator inpcorit = inputStorageLocations.find(DP_CORRELATED_UV); - if (inpcorit != inputStorageLocations.end()) { - data.addStorageToTask(*taskit, dp, inpcorit->second, false); - } - } - } - } - (*taskit)->storage()->generateFileList(); - } + // finally assign the common storage locations for this data product to all tasks in the group and check if the result is ok (i.e. no conflicts) + for (std::vector<Task *>::const_iterator taskit = groupIt->second.begin(); taskit != groupIt->second.end(); ++taskit) { // 7 + if ((*taskit)->storage()->isOutputDataProduktEnabled(dpit->first)) { + std::vector<storageResult> result = data.addStorageToTask(*taskit, dpit->first, locations, false); + if (!result.empty()) { + for (std::vector<storageResult>::const_iterator sit = result.begin(); sit != result.end(); ++sit) { + if (sit->conflict != CONFLICT_NO_CONFLICT) { + itsConflictDialog->addStorageConflict(*taskit, sit->dataProductType, sit->conflict); + } + (*taskit)->setConflict(sit->conflict); + } + bResult = false; + } + } + } // END 7 + } // 5 + for (std::vector<Task *>::const_iterator taskit = groupIt->second.begin(); taskit != groupIt->second.end(); ++taskit) { + if ((*taskit)->isPipeline()) { + Pipeline *pipe(static_cast<Pipeline *>(*taskit)); + setInputFilesForPipeline(pipe); + // we have to re-assign the storage for the pipeline because the predecessor (observation or pipeline) might have changed in the previous loop + for (dataProductTypes dp = _BEGIN_DATA_PRODUCTS_ENUM_; dp < _END_DATA_PRODUCTS_ENUM_; dp = dataProductTypes(dp + 1)) { + if (dp != DP_SKY_IMAGE && pipe->storage()->isOutputDataProduktEnabled(dp)) { // for SKY_IMAGE input nodes are not equal to output nodes + const storageMap &inputStorageLocations(pipe->storage()->getInputStorageLocations()); + storageMap::const_iterator inpcorit = inputStorageLocations.find(DP_CORRELATED_UV); + if (inpcorit != inputStorageLocations.end()) { + data.addStorageToTask(*taskit, dp, inpcorit->second, false); + } + } + } + } + (*taskit)->storage()->generateFileList(); + } - } - } - } // END for loop over individual groups - } - else return false; // no storage nodes available - } + } + } + } // END for loop over individual groups + } + else return false; // no storage nodes available + } - return bResult; + return bResult; } diff --git a/SAS/Scheduler/src/Storage.cpp b/SAS/Scheduler/src/Storage.cpp index dc64f36ee7f6f844786820ae0f78b925bc08aec2..126174c33ffd60ac60c19f46ef75d4a3c0481a95 100644 --- a/SAS/Scheduler/src/Storage.cpp +++ b/SAS/Scheduler/src/Storage.cpp @@ -125,63 +125,64 @@ std::vector<storageResult> Storage::addStorageToTask(Task *pTask, const storageM itsLastStorageCheckResult.clear(); if (pTask->getOutputDataproductCluster() == "CEP4") { //Can we just skip this for CEP4 ? /AR debugWarn("sis","Storage::addStorageToTask: Did not check storage for task:", pTask->getID(), " (CEP4 detected)"); - return itsLastStorageCheckResult; } - // check if the total bandwidths for the nodes used do not exceed the nodes their available bandwidths - for (std::map<int, double>::const_iterator nit = totalBWPerNodeMap.begin(); nit != totalBWPerNodeMap.end(); ++nit) { - storageNodesMap::const_iterator nodeit = itsStorageNodes.find(nit->first); - if (nodeit != itsStorageNodes.end()) { - // std::cout << "Total bandwidth required for node:" << nodeit->second.name() << " = " << nit->second << " kb/s" << std::endl; - res = nodeit->second.checkBandWidth(start, end, nit->second); - if (res != CONFLICT_NO_CONFLICT) { - itsLastStorageCheckResult.push_back(storageResult(_END_DATA_PRODUCTS_ENUM_, nit->first, -1, res)); + else { + // check if the total bandwidths for the nodes used do not exceed the nodes their available bandwidths + for (std::map<int, double>::const_iterator nit = totalBWPerNodeMap.begin(); nit != totalBWPerNodeMap.end(); ++nit) { + storageNodesMap::const_iterator nodeit = itsStorageNodes.find(nit->first); + if (nodeit != itsStorageNodes.end()) { + // std::cout << "Total bandwidth required for node:" << nodeit->second.name() << " = " << nit->second << " kb/s" << std::endl; + res = nodeit->second.checkBandWidth(start, end, nit->second); + if (res != CONFLICT_NO_CONFLICT) { + itsLastStorageCheckResult.push_back(storageResult(_END_DATA_PRODUCTS_ENUM_, nit->first, -1, res)); + } } } - } - if (itsLastStorageCheckResult.empty()) { // if no total bandwidth error for any node then start the rest of the checks - for (dataFileMap::const_iterator dfit = dataFiles.begin(); dfit != dataFiles.end(); ++dfit) { - storageMap::const_iterator stit = storageLocations.find(dfit->first); - if (stit != storageLocations.end()) { - if (!stit->second.empty()) { - claimSize = (double) dfit->second.first * dfit->second.second / stit->second.size(); // size per file * nrFiles / nr of raid arrays assigned - bandWidth = (double) claimSize / 1000 / durationSec; // MByte/sec, the required remaining disk write speed (or bandwidth) for this array + if (itsLastStorageCheckResult.empty()) { // if no total bandwidth error for any node then start the rest of the checks + for (dataFileMap::const_iterator dfit = dataFiles.begin(); dfit != dataFiles.end(); ++dfit) { + storageMap::const_iterator stit = storageLocations.find(dfit->first); + if (stit != storageLocations.end()) { + if (!stit->second.empty()) { + claimSize = (double) dfit->second.first * dfit->second.second / stit->second.size(); // size per file * nrFiles / nr of raid arrays assigned + bandWidth = (double) claimSize / 1000 / durationSec; // MByte/sec, the required remaining disk write speed (or bandwidth) for this array - // check requested resources - for (storageVector::const_iterator it = stit->second.begin(); it != stit->second.end(); ++it) { - sit = itsStorageNodes.find(it->first); - if (sit != itsStorageNodes.end()) { - // check size requirements - res = sit->second.checkSpaceAndWriteSpeed(start, end, claimSize, bandWidth, it->second); // check space and write speed for every raid array - if (res != CONFLICT_NO_CONFLICT) { - itsLastStorageCheckResult.push_back(storageResult(dfit->first, it->first, it->second, res)); - // itsLastStorageCheckResult[it->first].push_back(std::pair<int, task_conflict>(it->second, res)); // store the error result - } - else { // add the claim - sit->second.addClaim(pTask->getID(), start, end, dfit->first, claimSize, bandWidth, it->second); - } - } - } - // if there were conflicts then remove the claim again from the storage nodes - if (!itsLastStorageCheckResult.empty()) { - std::vector<int> snd; + // check requested resources for (storageVector::const_iterator it = stit->second.begin(); it != stit->second.end(); ++it) { sit = itsStorageNodes.find(it->first); if (sit != itsStorageNodes.end()) { - if (std::find(snd.begin(), snd.end(), stit->first) == snd.end()) { - sit->second.removeClaim(pTask->getID()); // only call removeClaim one time for every storage node (it removes all claims found for the task ID) - snd.push_back(stit->first); + // check size requirements + res = sit->second.checkSpaceAndWriteSpeed(start, end, claimSize, bandWidth, it->second); // check space and write speed for every raid array + if (res != CONFLICT_NO_CONFLICT) { + itsLastStorageCheckResult.push_back(storageResult(dfit->first, it->first, it->second, res)); + // itsLastStorageCheckResult[it->first].push_back(std::pair<int, task_conflict>(it->second, res)); // store the error result + } + else { // add the claim + sit->second.addClaim(pTask->getID(), start, end, dfit->first, claimSize, bandWidth, it->second); + } + } + } + // if there were conflicts then remove the claim again from the storage nodes + if (!itsLastStorageCheckResult.empty()) { + std::vector<int> snd; + for (storageVector::const_iterator it = stit->second.begin(); it != stit->second.end(); ++it) { + sit = itsStorageNodes.find(it->first); + if (sit != itsStorageNodes.end()) { + if (std::find(snd.begin(), snd.end(), stit->first) == snd.end()) { + sit->second.removeClaim(pTask->getID()); // only call removeClaim one time for every storage node (it removes all claims found for the task ID) + snd.push_back(stit->first); + } } } } } + else { // no storage has been assigned to this data product type + itsLastStorageCheckResult.push_back(storageResult(dfit->first, -1, -1, CONFLICT_NO_STORAGE_ASSIGNED)); + } } else { // no storage has been assigned to this data product type itsLastStorageCheckResult.push_back(storageResult(dfit->first, -1, -1, CONFLICT_NO_STORAGE_ASSIGNED)); } } - else { // no storage has been assigned to this data product type - itsLastStorageCheckResult.push_back(storageResult(dfit->first, -1, -1, CONFLICT_NO_STORAGE_ASSIGNED)); - } } } if (itsLastStorageCheckResult.empty()) {