diff --git a/.gitattributes b/.gitattributes index d19e47c19ef048f266e3ff9637468ed3032a5fa4..9efb6da3e097f04f7be9ff72b8a6ee7e2ffb3ca1 100644 --- a/.gitattributes +++ b/.gitattributes @@ -2082,6 +2082,7 @@ LCS/PyCommon/__init__.py -text LCS/PyCommon/datetimeutils.py -text LCS/PyCommon/factory.py -text LCS/PyCommon/flask_utils.py -text +LCS/PyCommon/math.py -text LCS/PyCommon/postgres.py -text LCS/PyCommon/subprocess.py -text LCS/PyCommon/test/python-coverage.sh eol=lf @@ -5062,6 +5063,8 @@ SAS/ResourceAssignment/ResourceAssignmentEditor/test/test_webservice.sh -text SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt -text SAS/ResourceAssignment/ResourceAssignmentEstimator/__init__.py -text SAS/ResourceAssignment/ResourceAssignmentEstimator/config.py -text +SAS/ResourceAssignment/ResourceAssignmentEstimator/lib/CMakeLists.txt -text +SAS/ResourceAssignment/ResourceAssignmentEstimator/lib/cobaltblocksize.py -text SAS/ResourceAssignment/ResourceAssignmentEstimator/raestimatorservice -text SAS/ResourceAssignment/ResourceAssignmentEstimator/raestimatorservice.ini -text SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/CMakeLists.txt -text @@ -5095,6 +5098,9 @@ SAS/ResourceAssignment/ResourceAssignmentEstimator/test/data_sets/t_resource_est SAS/ResourceAssignment/ResourceAssignmentEstimator/test/data_sets/t_resource_estimator.out_long_baseline_pipeline -text SAS/ResourceAssignment/ResourceAssignmentEstimator/test/data_sets/t_resource_estimator.out_preprocessing_pipeline -text SAS/ResourceAssignment/ResourceAssignmentEstimator/test/data_sets/t_resource_estimator.out_pulsar_observation -text +SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.py -text +SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.run -text +SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.sh -text SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_resource_estimator.py -text SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_resource_estimator.run -text svneol=unset#application/x-shellscript SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_resource_estimator.sh -text svneol=unset#application/x-shellscript diff --git a/LCS/PyCommon/CMakeLists.txt b/LCS/PyCommon/CMakeLists.txt index 9a2343ae01b52c48b894fa33e8ec397959d9a7f4..2dc07661ee1f50d610516c89d7221579abdc43b2 100644 --- a/LCS/PyCommon/CMakeLists.txt +++ b/LCS/PyCommon/CMakeLists.txt @@ -9,6 +9,7 @@ set(_py_files __init__.py dbcredentials.py factory.py + math.py methodtrigger.py util.py postgres.py diff --git a/LCS/PyCommon/math.py b/LCS/PyCommon/math.py new file mode 100644 index 0000000000000000000000000000000000000000..b461a4202bbd588b5184647ce0cb7210317444ec --- /dev/null +++ b/LCS/PyCommon/math.py @@ -0,0 +1,7 @@ +from fractions import gcd + +__all__ = ["lcm"] + +def lcm(a, b): + """ Return the Least Common Multiple of a and b. """ + return abs(a * b) / gcd(a, b) if a and b else 0 diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt b/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt index ea2d8d4c8ddff636568439f305c1cf1967cd31de..ea07c75bb19b5ec6d6f9a0969cceee692fa081b4 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt @@ -20,5 +20,6 @@ install(FILES raestimatorservice.ini DESTINATION etc/supervisord.d) +add_subdirectory(lib) add_subdirectory(test) add_subdirectory(resource_estimators) diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/lib/CMakeLists.txt b/SAS/ResourceAssignment/ResourceAssignmentEstimator/lib/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..d2cac297f54d39c22b1601fba27f27ef03bf9c98 --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/lib/CMakeLists.txt @@ -0,0 +1,7 @@ +# $Id: CMakeLists.txt 32905 2015-11-17 15:31:54Z schaap $ + +python_install( + __init__.py + cobaltblocksize.py + DESTINATION lofar/sas/resourceassignment/resourceassignmentestimator) + diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/lib/cobaltblocksize.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/lib/cobaltblocksize.py new file mode 100644 index 0000000000000000000000000000000000000000..4e5ce72369ef48603c2ce62605f636a792f3f514 --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/lib/cobaltblocksize.py @@ -0,0 +1,209 @@ +""" + Code to derive the following parset input parameters for Cobalt. These keys need to be tuned + specifically to make sure all Cobalt processing fits inside a block. Only two processing + kernels can cross block boundaries: the FIR Filter taps, and the integration of multiple + blocks of Correlator output. + + Once the following parameters are set, the integration time of the correlator can change + slightly from what was requested. This in turn forces us to derive these keys during resource + estimation. + + Cobalt.blockSize + + The number of samples in each unit of work. Needs to be a multiple of the working size + of each individual step, for example, an 64-channel FFT requires blockSize to be a multiple + of 64. + + Cobalt.Correlator.nrBlocksPerIntegration + + The number of correlator integration periods that fit in one block. + + Cobalt.Correlator.nrIntegrationsPerBlock + + The number of blocks that together form one integration period. + + Note that either nrBlocksPerIntegration or nrIntegrationsPerBlock has to be equal to 1. +""" + +from fractions import gcd +from math import ceil +from lofar.common.math import lcm + +class CorrelatorSettings(object): + """ Settings for the Correlator. """ + + def __init__(self): + self.nrChannelsPerSubband = 64 + self.integrationTime = 1.0 + +class StokesSettings(object): + """ Settings for the Beamformer. """ + + def __init__(self): + self.nrChannelsPerSubband = 1 + self.timeIntegrationFactor = 1 + +class BlockConstraints(object): + """ Provide the constraints for the block size, as derived + from the correlator and beamformer settings. """ + + def __init__(self, correlatorSettings=None, coherentStokesSettings=None, incoherentStokesSettings=None, clockMHz=200): + self.correlator = correlatorSettings + self.coherentStokes = coherentStokesSettings + self.incoherentStokes = incoherentStokesSettings + + self.clockMHz = clockMHz + + def minBlockSize(self): + """ Block size below which the overhead per block becomes unwieldy. """ + + # 0.6s is an estimate. + return int(round(self._time2samples(0.6))) + + def maxBlockSize(self): + """ Block size above which the data does not fit on the GPU. """ + + # 1.3s is an estimate. + return int(round(self._time2samples(1.3))) + + def nrSubblocks(self): + if self.correlator: + integrationSamples = self._time2samples(self.correlator.integrationTime) + if integrationSamples < self.minBlockSize(): + def average(x, y): + return (x + y) / 2.0 + + return max(1, int(round(average(self.maxBlockSize() + self.minBlockSize()) / integrationSamples))) + + return 1 + + def idealBlockSize(self): + integrationTime = self.correlator.integrationTime if self.correlator else 1.0 + return self.nrSubblocks() * self._time2samples(integrationTime) + + def factor(self): + """ + Determine common factors needed for the block Size. + + The Cobalt GPU kernels require the Cobalt.blockSize to be a multiple + of several values in order to: + 1) divide the work evenly over threads and blocks. + 2) prevent integration of samples from crossing blockSize boundaries. + """ + + factor = 1 + + NR_PPF_TAPS = 16 + MAX_THREADS_PER_BLOCK = 1024 + CORRELATOR_BLOCKSIZE = 16 + BEAMFORMER_NR_DELAYCOMPENSATION_CHANNELS = 64 + BEAMFORMER_DELAYCOMPENSATION_BLOCKSIZE = 16 + + # Process correlator settings + if self.correlator: + # FIR_Filter.cu + factor = lcm(factor, NR_PPF_TAPS * self.correlator.nrChannelsPerSubband) + + # Correlator.cu (minimum of 16 samples per channel) + factor = lcm(factor, CORRELATOR_BLOCKSIZE * self.correlator.nrChannelsPerSubband * self.nrSubblocks()) + + if self.coherentStokes: + # DelayAndBandPass.cu + factor = lcm(factor, BEAMFORMER_DELAYCOMPENSATION_BLOCKSIZE * BEAMFORMER_NR_DELAYCOMPENSATION_CHANNELS) + + # FIR_Filter.cu + factor = lcm(factor, NR_PPF_TAPS * self.coherentStokes.nrChannelsPerSubband) + + # CoherentStokesKernel.cc + factor = lcm(factor, MAX_THREADS_PER_BLOCK * self.coherentStokes.timeIntegrationFactor) + + #CoherentStokes.cu (integration should fit) + factor = lcm(factor, 1024 * self.coherentStokes.timeIntegrationFactor * self.coherentStokes.nrChannelsPerSubband) + + if self.incoherentStokes: + # DelayAndBandPass.cu + factor = lcm(factor, BEAMFORMER_DELAYCOMPENSATION_BLOCKSIZE * BEAMFORMER_NR_DELAYCOMPENSATION_CHANNELS) + + # FIR_Filter.cu + factor = lcm(factor, NR_PPF_TAPS * self.incoherentStokes.nrChannelsPerSubband) + + # IncoherentStokes.cu (integration should fit) + factor = lcm(factor, 1024 * self.incoherentStokes.timeIntegrationFactor * self.incoherentStokes.nrChannelsPerSubband) + + return factor + + def __samples_per_second(self): + MHZ_PER_HZ = 1e6 + STATION_FFT_LENGTH = 1024 + + return self.clockMHz * MHZ_PER_HZ / STATION_FFT_LENGTH + + def _time2samples(self, t): + """ Convert a time `t' (seconds) into a number of station samples. """ + return int(round(t * self.__samples_per_second())) + + def _samples2time(self, samples): + """ Return the duration of a number of station samples. """ + return samples / self.__samples_per_second() + +class BlockSize(object): + """ Derive Cobalt specifications given BlockConstraints. Output: + + BlockSize member | Cobalt parset key + --------------------------------------- + blockSize | Cobalt.blockSize + nrSubblocks | Cobalt.Correlator.nrIntegrationsPerBlock + nrBlocks | Cobalt.Correlator.nrBlocksPerIntegration + integrationTime | Cobalt.Correlator.integrationTime + """ + def __init__(self, constraints): + self.constraints = constraints + self.nrSubblocks = constraints.nrSubblocks() + self.blockSize = self._blockSize(constraints.idealBlockSize(), constraints.factor()) + self.nrBlocks = self._nrBlocks(constraints.idealBlockSize(), self.blockSize) + + if self.nrSubblocks > 1: + self.integrationSamples = self.blockSize / self.nrSubblocks + else: + self.integrationSamples = self.blockSize * self.nrBlocks + + self.integrationTime = constraints._samples2time(self.integrationSamples) + + def _nrBlocks(self, integrationSamples, blockSize): + return max(1, int(round(integrationSamples / blockSize))) + + def _blockSize(self, integrationSamples, factor): + bestBlockSize = None + bestNrBlocks = None + bestError = None + + # Create a comfortable range to search in for possible fits. + maxFactorPerBlock = int(ceil(integrationSamples / factor)) * 2 + + for factorsPerBlock in xrange(1, maxFactorPerBlock): + blockSize = factorsPerBlock * factor; + + # Discard invalid block sizes + if blockSize < self.constraints.minBlockSize(): + continue + + if blockSize > self.constraints.maxBlockSize(): + continue + + # Calculate the number of blocks we'd use + nrBlocks = self._nrBlocks(integrationSamples, blockSize) + + # Calculate error for this solution + diff = lambda a,b: max(a,b) - min(a,b) + error = diff(integrationSamples, nrBlocks * blockSize) + + # Accept this candidate if best so far. Prefer + # fewer blocks if candidates are (nearly) equal in their error. + if not bestBlockSize \ + or error < bestError \ + or (error < 0.01 * integrationSamples and nrBlocks < bestNrBlocks): + bestBlockSize = blockSize + bestNrBlocks = nrBlocks + bestError = error + + return bestBlockSize diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/CMakeLists.txt b/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/CMakeLists.txt index 933573b01b0ef2842f7ce39bc9ea98e0b68da095..822554ff7096f07ea5404025e9a8f93fc76bc1d4 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/CMakeLists.txt +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/CMakeLists.txt @@ -1,6 +1,7 @@ # $Id: CMakeLists.txt $ include(LofarCTest) +lofar_add_test(t_cobaltblocksize) #lofar_add_test(t_resource_estimator) # to be re-enabled when estimator pipeline code is fixed by Alexander set(_py_files diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.py new file mode 100644 index 0000000000000000000000000000000000000000..fbccde57279a2fce1c0d6cd8d3040b8376495821 --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.py @@ -0,0 +1,25 @@ +import unittest +from lofar.sas.resourceassignment.resourceassignmentestimator import cobaltblocksize + +def frange(x, y, jump): + n = 0 + while x + n * jump < y: + yield x + n * jump + +class TestCorrelatorIntegrationTimeApproximation(unittest.TestCase): + """ + Test whether the requested correlator integration time can be sufficiently approximated. + """ + + def test_correlator_subsecond(self): + correlator_settings = cobaltblocksize.CorrelatorSettings() + + for clockMHz in [160,200]: + for correlator_settings.nrChannelsPerSubband in [16, 64, 256]: + for correlator_settings.integrationTime in frange(0.1, 1.0, 0.05): + c = BlockConstraints(correlator_settings) + + + +if __name__ == '__main__': + unittest.main() diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.run b/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.run new file mode 100755 index 0000000000000000000000000000000000000000..4841812eadce96cde245480acb160b6f1b89a1fa --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.run @@ -0,0 +1,4 @@ +#!/bin/bash + +source python-coverage.sh +python_coverage_test "cobaltblocksize*" t_cobaltblocksize.py diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.sh b/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.sh new file mode 100755 index 0000000000000000000000000000000000000000..de952a2129968e918dfcacd0d15dde06a7e84c8e --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh t_cobaltblocksize