diff --git a/LCS/PyCommon/CMakeLists.txt b/LCS/PyCommon/CMakeLists.txt index f5040f3b74119e61788d2cf3793230496216397b..9213dccaa2fabf10cf84383d4eab1e391b5de8b4 100644 --- a/LCS/PyCommon/CMakeLists.txt +++ b/LCS/PyCommon/CMakeLists.txt @@ -14,6 +14,7 @@ set(_py_files __init__.py ssh_utils.py cep4_utils.py + cobaltblocksize.py threading_utils.py lcu_utils.py cache.py diff --git a/LCS/PyCommon/test/CMakeLists.txt b/LCS/PyCommon/test/CMakeLists.txt index 624f130d2336df98ce720564ea572792c39bc894..f2b9b9ca9a12c8e03a9b1b7a1e0cf0fc5f702041 100644 --- a/LCS/PyCommon/test/CMakeLists.txt +++ b/LCS/PyCommon/test/CMakeLists.txt @@ -22,6 +22,7 @@ IF(BUILD_TESTING) DESTINATION ${CMAKE_BINARY_DIR}/bin) lofar_add_test(t_cache) + lofar_add_test(t_cobaltblocksize) lofar_add_test(t_dbcredentials) lofar_add_test(t_defaultmailaddresses) lofar_add_test(t_methodtrigger) diff --git a/SAS/ResourceAssignment/TaskPrescheduler/test/t_cobaltblocksize.py b/LCS/PyCommon/test/t_cobaltblocksize.py similarity index 96% rename from SAS/ResourceAssignment/TaskPrescheduler/test/t_cobaltblocksize.py rename to LCS/PyCommon/test/t_cobaltblocksize.py index 29799a8f64d7a3843fd2e616cdcf6418ca675f54..62931b92c6f4e28c05d0a9d9bd7418ae3c5c5b41 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/test/t_cobaltblocksize.py +++ b/LCS/PyCommon/test/t_cobaltblocksize.py @@ -1,5 +1,5 @@ import unittest -from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import StokesSettings, CorrelatorSettings, BlockConstraints, BlockSize +from lofar.common.cobaltblocksize import StokesSettings, CorrelatorSettings, BlockConstraints, BlockSize import logging from lofar.common.test_utils import unit_test diff --git a/SAS/ResourceAssignment/TaskPrescheduler/test/t_cobaltblocksize.run b/LCS/PyCommon/test/t_cobaltblocksize.run similarity index 100% rename from SAS/ResourceAssignment/TaskPrescheduler/test/t_cobaltblocksize.run rename to LCS/PyCommon/test/t_cobaltblocksize.run diff --git a/SAS/ResourceAssignment/TaskPrescheduler/test/t_cobaltblocksize.sh b/LCS/PyCommon/test/t_cobaltblocksize.sh similarity index 100% rename from SAS/ResourceAssignment/TaskPrescheduler/test/t_cobaltblocksize.sh rename to LCS/PyCommon/test/t_cobaltblocksize.sh diff --git a/SAS/ResourceAssignment/TaskPrescheduler/CMakeLists.txt b/SAS/ResourceAssignment/TaskPrescheduler/CMakeLists.txt index 53c7782ec1ec2f51e8ad67c7a9fd4a362c0d04d6..d9fd6cf5575e7b72fdb4cc21762274ab861e3012 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/CMakeLists.txt +++ b/SAS/ResourceAssignment/TaskPrescheduler/CMakeLists.txt @@ -1,6 +1,6 @@ # $Id$ -lofar_package(TaskPrescheduler 1.0 DEPENDS PyMessaging ResourceAssignmentService ResourceAssigner OTDB_Services MoMQueryServiceClient pyparameterset RACommon) +lofar_package(TaskPrescheduler 1.0 DEPENDS PyCommon PyMessaging ResourceAssignmentService ResourceAssigner OTDB_Services MoMQueryServiceClient pyparameterset RACommon) lofar_find_package(Python 3.4 REQUIRED) include(PythonInstall) diff --git a/SAS/ResourceAssignment/TaskPrescheduler/lib/CMakeLists.txt b/SAS/ResourceAssignment/TaskPrescheduler/lib/CMakeLists.txt index cbd3592991f8d395c54b704155782d47d533d933..ef28edec5f953e94db7558959c5bd794f551c09d 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/lib/CMakeLists.txt +++ b/SAS/ResourceAssignment/TaskPrescheduler/lib/CMakeLists.txt @@ -2,6 +2,5 @@ python_install( __init__.py - cobaltblocksize.py DESTINATION lofar/sas/resourceassignment/taskprescheduler) diff --git a/SAS/ResourceAssignment/TaskPrescheduler/lib/cobaltblocksize.py b/SAS/ResourceAssignment/TaskPrescheduler/lib/cobaltblocksize.py deleted file mode 100644 index 38e9dd38f9f4df373218763944c6aa8e37d57951..0000000000000000000000000000000000000000 --- a/SAS/ResourceAssignment/TaskPrescheduler/lib/cobaltblocksize.py +++ /dev/null @@ -1,209 +0,0 @@ -""" - Code to derive the following parset input parameters for Cobalt. These keys need to be tuned - specifically to make sure all Cobalt processing fits inside a block. Only two processing - kernels can cross block boundaries: the FIR Filter taps, and the integration of multiple - blocks of Correlator output. - - Once the following parameters are set, the integration time of the correlator can change - slightly from what was requested. This in turn forces us to derive these keys during resource - estimation. - - Cobalt.blockSize - - The number of samples in each unit of work. Needs to be a multiple of the working size - of each individual step, for example, an 64-channel FFT requires blockSize to be a multiple - of 64. - - Cobalt.Correlator.nrBlocksPerIntegration - - The number of correlator integration periods that fit in one block. - - Cobalt.Correlator.nrIntegrationsPerBlock - - The number of blocks that together form one integration period. - - Note that either nrBlocksPerIntegration or nrIntegrationsPerBlock has to be equal to 1. -""" - -from math import ceil -from lofar.common.math import lcm - -class CorrelatorSettings(object): - """ Settings for the Correlator. """ - - def __init__(self): - self.nrChannelsPerSubband = 64 - self.integrationTime = 1.0 - -class StokesSettings(object): - """ Settings for the Beamformer. """ - - def __init__(self): - self.nrChannelsPerSubband = 1 - self.timeIntegrationFactor = 1 - -class BlockConstraints(object): - """ Provide the constraints for the block size, as derived - from the correlator and beamformer settings. """ - - def __init__(self, correlatorSettings=None, coherentStokesSettings=[], incoherentStokesSettings=[], clockMHz=200): - self.correlator = correlatorSettings - self.coherentStokes = coherentStokesSettings - self.incoherentStokes = incoherentStokesSettings - - self.clockMHz = clockMHz - - def minBlockSize(self): - """ Block size below which the overhead per block becomes unwieldy. """ - - # 0.6s is an estimate. - return int(round(self._time2samples(0.6))) - - def maxBlockSize(self): - """ Block size above which the data does not fit on the GPU. """ - - # 1.3s is an estimate. - return int(round(self._time2samples(1.3))) - - def nrSubblocks(self): - if self.correlator: - integrationSamples = self._time2samples(self.correlator.integrationTime) - if integrationSamples < self.minBlockSize(): - def average(x, y): - return (x + y) / 2.0 - - return max(1, int(round(average(self.maxBlockSize(), self.minBlockSize()) / integrationSamples))) - - return 1 - - def idealBlockSize(self): - integrationTime = self.correlator.integrationTime if self.correlator else 1.0 - return self.nrSubblocks() * self._time2samples(integrationTime) - - def factor(self): - """ - Determine common factors needed for the block Size. - - The Cobalt GPU kernels require the Cobalt.blockSize to be a multiple - of several values in order to: - 1) divide the work evenly over threads and blocks. - 2) prevent integration of samples from crossing blockSize boundaries. - """ - - factor = 1 - - NR_PPF_TAPS = 16 - CORRELATOR_BLOCKSIZE = 16 - BEAMFORMER_NR_DELAYCOMPENSATION_CHANNELS = 256 - BEAMFORMER_DELAYCOMPENSATION_BLOCKSIZE = 16 - - # Process correlator settings - if self.correlator: - # FIR_Filter.cu - factor = lcm(factor, NR_PPF_TAPS * self.correlator.nrChannelsPerSubband) - - # Correlator.cu (minimum of 16 samples per channel) - factor = lcm(factor, CORRELATOR_BLOCKSIZE * self.correlator.nrChannelsPerSubband * self.nrSubblocks()) - - for coherentStokes in self.coherentStokes: - # DelayAndBandPass.cu - factor = lcm(factor, BEAMFORMER_DELAYCOMPENSATION_BLOCKSIZE * BEAMFORMER_NR_DELAYCOMPENSATION_CHANNELS) - - # FIR_Filter.cu - factor = lcm(factor, NR_PPF_TAPS * coherentStokes.nrChannelsPerSubband) - - # CoherentStokesKernel.cc searches for the best fit, supporting a wide range of configurations by - # splitting up the work load into passes. There could be a performance impact for some ill-chosen - # values, but that is not something this code is tasked with. - pass - - # CoherentStokes.cu (produce at least one output sample/channel) - factor = lcm(factor, coherentStokes.timeIntegrationFactor * coherentStokes.nrChannelsPerSubband) - - for incoherentStokes in self.incoherentStokes: - # DelayAndBandPass.cu - factor = lcm(factor, BEAMFORMER_DELAYCOMPENSATION_BLOCKSIZE * BEAMFORMER_NR_DELAYCOMPENSATION_CHANNELS) - - # FIR_Filter.cu - factor = lcm(factor, NR_PPF_TAPS * incoherentStokes.nrChannelsPerSubband) - - # IncoherentStokes.cu (produce at least one output sample/channel) - factor = lcm(factor, incoherentStokes.timeIntegrationFactor * incoherentStokes.nrChannelsPerSubband) - - return factor - - def __samples_per_second(self): - MHZ_PER_HZ = 1e6 - STATION_FFT_LENGTH = 1024 - - return self.clockMHz * MHZ_PER_HZ / STATION_FFT_LENGTH - - def _time2samples(self, t): - """ Convert a time `t' (seconds) into a number of station samples. """ - return int(round(t * self.__samples_per_second())) - - def _samples2time(self, samples): - """ Return the duration of a number of station samples. """ - return samples / self.__samples_per_second() - -class BlockSize(object): - """ Derive Cobalt specifications given BlockConstraints. Output: - - BlockSize member | Cobalt parset key - --------------------------------------- - blockSize | Cobalt.blockSize - nrSubblocks | Cobalt.Correlator.nrIntegrationsPerBlock - nrBlocks | Cobalt.Correlator.nrBlocksPerIntegration - integrationTime | Cobalt.Correlator.integrationTime - """ - def __init__(self, constraints): - self.constraints = constraints - self.nrSubblocks = constraints.nrSubblocks() - self.blockSize = self._blockSize(constraints.idealBlockSize(), constraints.factor()) - self.nrBlocks = self._nrBlocks(constraints.idealBlockSize(), self.blockSize) - - if self.nrSubblocks > 1: - self.integrationSamples = self.blockSize / self.nrSubblocks - else: - self.integrationSamples = self.blockSize * self.nrBlocks - - self.integrationTime = constraints._samples2time(self.integrationSamples) - - def _nrBlocks(self, integrationSamples, blockSize): - return max(1, int(round(integrationSamples / blockSize))) - - def _blockSize(self, integrationSamples, factor): - bestBlockSize = None - bestNrBlocks = None - bestError = None - - # Create a comfortable range to search in for possible fits. - maxFactorPerBlock = int(ceil(integrationSamples / factor)) * 2 - - for factorsPerBlock in range(1, maxFactorPerBlock): - blockSize = factorsPerBlock * factor; - - # Discard invalid block sizes - if blockSize < self.constraints.minBlockSize(): - continue - - if blockSize > self.constraints.maxBlockSize(): - continue - - # Calculate the number of blocks we'd use - nrBlocks = self._nrBlocks(integrationSamples, blockSize) - - # Calculate error for this solution - diff = lambda a,b: max(a,b) - min(a,b) - error = diff(integrationSamples, nrBlocks * blockSize) - - # Accept this candidate if best so far. Prefer - # fewer blocks if candidates are (nearly) equal in their error. - if not bestBlockSize \ - or error < bestError \ - or (error < 0.01 * integrationSamples and nrBlocks < bestNrBlocks): - bestBlockSize = blockSize - bestNrBlocks = nrBlocks - bestError = error - - return int(round(bestBlockSize)) diff --git a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py index 5cf07d6b85ab9866355c1a352df47d1a3697e1ab..fcda3724c3e682994505487c5f7d57f2dd282cc4 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py +++ b/SAS/ResourceAssignment/TaskPrescheduler/taskprescheduler.py @@ -31,7 +31,7 @@ from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.sas.resourceassignment.resourceassigner.rabuslistener import RABusListener, RAEventMessageHandler from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC -from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize +from lofar.common.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize from lofar.sas.resourceassignment.database.radb import RADatabase from lofar.sas.resourceassignment.common.specification import Specification from lofar.sas.resourceassignment.common.specification import OUTPUT_PREFIX diff --git a/SAS/ResourceAssignment/TaskPrescheduler/test/CMakeLists.txt b/SAS/ResourceAssignment/TaskPrescheduler/test/CMakeLists.txt index 402d031aeba098b62831c5f885f925f429ee5899..3070b923c32208d8db1276fd8548109b7e977329 100644 --- a/SAS/ResourceAssignment/TaskPrescheduler/test/CMakeLists.txt +++ b/SAS/ResourceAssignment/TaskPrescheduler/test/CMakeLists.txt @@ -3,5 +3,4 @@ include(LofarCTest) include(FindPythonModule) lofar_add_test(test_taskprescheduler) -lofar_add_test(t_cobaltblocksize) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py index ce2d4c74b035198d660acfa6293cf772a453a9fa..d4bb844cf9a778e98f8fa6452ec228169d0a4c41 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py @@ -23,7 +23,7 @@ from lofar.sas.tmss.tmss.tmssapp.models import * from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset_dict -from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize +from lofar.common.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize from lofar.sas.resourceassignment.resourceassigner.schedulers import ScheduleException from lofar.mac.observation_control_rpc import ObservationControlRPCClient from lofar.mac.pipeline_control_rpc import PipelineControlRPCClient