Skip to content
Snippets Groups Projects
Commit fb9c838c authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #9304: Add module to determine Cobalt block configuration (cherry-picked...

Task #9304: Add module to determine Cobalt block configuration (cherry-picked files from task branch to avoid svn hell)
parent 62ff0d4c
No related branches found
No related tags found
No related merge requests found
......@@ -2082,6 +2082,7 @@ LCS/PyCommon/__init__.py -text
LCS/PyCommon/datetimeutils.py -text
LCS/PyCommon/factory.py -text
LCS/PyCommon/flask_utils.py -text
LCS/PyCommon/math.py -text
LCS/PyCommon/postgres.py -text
LCS/PyCommon/subprocess.py -text
LCS/PyCommon/test/python-coverage.sh eol=lf
......@@ -5062,6 +5063,8 @@ SAS/ResourceAssignment/ResourceAssignmentEditor/test/test_webservice.sh -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/__init__.py -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/config.py -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/lib/CMakeLists.txt -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/lib/cobaltblocksize.py -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/raestimatorservice -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/raestimatorservice.ini -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/CMakeLists.txt -text
......@@ -5095,6 +5098,9 @@ SAS/ResourceAssignment/ResourceAssignmentEstimator/test/data_sets/t_resource_est
SAS/ResourceAssignment/ResourceAssignmentEstimator/test/data_sets/t_resource_estimator.out_long_baseline_pipeline -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/test/data_sets/t_resource_estimator.out_preprocessing_pipeline -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/test/data_sets/t_resource_estimator.out_pulsar_observation -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.py -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.run -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_cobaltblocksize.sh -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_resource_estimator.py -text
SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_resource_estimator.run -text svneol=unset#application/x-shellscript
SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_resource_estimator.sh -text svneol=unset#application/x-shellscript
......
......@@ -9,6 +9,7 @@ set(_py_files
__init__.py
dbcredentials.py
factory.py
math.py
methodtrigger.py
util.py
postgres.py
......
from fractions import gcd
__all__ = ["lcm"]
def lcm(a, b):
    """ Return the Least Common Multiple of a and b.

    The result is 0 if either argument is 0, and is never negative.
    For integer arguments the result is always an integer. """
    if not a or not b:
        return 0

    # Floor division keeps integer inputs integer even when true division is
    # in effect. The abs() around gcd() guards against gcd implementations
    # that return a negative value for negative arguments.
    return abs(a * b) // abs(gcd(a, b))
......@@ -20,5 +20,6 @@ install(FILES
raestimatorservice.ini
DESTINATION etc/supervisord.d)
add_subdirectory(lib)
add_subdirectory(test)
add_subdirectory(resource_estimators)
# $Id: CMakeLists.txt 32905 2015-11-17 15:31:54Z schaap $
# Install the Python modules listed below under the DESTINATION package path.
python_install(
__init__.py
cobaltblocksize.py
DESTINATION lofar/sas/resourceassignment/resourceassignmentestimator)
"""
Code to derive the following parset input parameters for Cobalt. These keys need to be tuned
specifically to make sure all Cobalt processing fits inside a block. Only two processing
kernels can cross block boundaries: the FIR Filter taps, and the integration of multiple
blocks of Correlator output.
Once the following parameters are set, the integration time of the correlator can change
slightly from what was requested. This in turn forces us to derive these keys during resource
estimation.
Cobalt.blockSize
The number of samples in each unit of work. Needs to be a multiple of the working size
of each individual step; for example, a 64-channel FFT requires blockSize to be a multiple
of 64.
Cobalt.Correlator.nrBlocksPerIntegration
The number of blocks that together form one integration period.
Cobalt.Correlator.nrIntegrationsPerBlock
The number of correlator integration periods that fit in one block.
Note that either nrBlocksPerIntegration or nrIntegrationsPerBlock has to be equal to 1.
"""
from fractions import gcd
from math import ceil
from lofar.common.math import lcm
class CorrelatorSettings(object):
    """ Correlator parameters that influence the block size. """

    def __init__(self):
        # Requested integration time, in seconds.
        self.integrationTime = 1.0
        # Number of channels each subband is split into.
        self.nrChannelsPerSubband = 64
class StokesSettings(object):
    """ Beamformer (Stokes) parameters that influence the block size. """

    def __init__(self):
        # Number of samples integrated in time.
        self.timeIntegrationFactor = 1
        # Number of channels each subband is split into.
        self.nrChannelsPerSubband = 1
class BlockConstraints(object):
    """ Provide the constraints for the block size, as derived
        from the correlator and beamformer settings.

        Each of the settings objects may be None, in which case the
        corresponding processing step imposes no constraints. """

    def __init__(self, correlatorSettings=None, coherentStokesSettings=None, incoherentStokesSettings=None, clockMHz=200):
        self.correlator = correlatorSettings
        self.coherentStokes = coherentStokesSettings
        self.incoherentStokes = incoherentStokesSettings
        self.clockMHz = clockMHz

    def minBlockSize(self):
        """ Block size below which the overhead per block becomes unwieldy. """

        # 0.6s is an estimate.
        return self._time2samples(0.6)

    def maxBlockSize(self):
        """ Block size above which the data does not fit on the GPU. """

        # 1.3s is an estimate.
        return self._time2samples(1.3)

    def nrSubblocks(self):
        """ Return the number of correlator integration periods per block.

            This is 1 unless the correlator integration period is shorter
            than the minimum block size. In that case enough integration
            periods are combined to end up roughly halfway between the
            minimum and the maximum block size.

            Raises ValueError if the integration time is too short to
            contain any samples. """

        if self.correlator:
            integrationSamples = self._time2samples(self.correlator.integrationTime)

            if integrationSamples <= 0:
                raise ValueError("Correlator integration time %s is too short to contain any samples"
                                 % (self.correlator.integrationTime,))

            if integrationSamples < self.minBlockSize():
                # Aim for the middle of the valid block size range.
                targetBlockSize = (self.maxBlockSize() + self.minBlockSize()) / 2.0

                return max(1, int(round(targetBlockSize / integrationSamples)))

        return 1

    def idealBlockSize(self):
        """ Return the block size (in samples) that matches the requested
            integration time exactly, disregarding all other constraints.
            An integration time of 1.0s is assumed if there is no correlator. """

        integrationTime = self.correlator.integrationTime if self.correlator else 1.0
        return self.nrSubblocks() * self._time2samples(integrationTime)

    def factor(self):
        """
          Determine common factors needed for the block Size.

          The Cobalt GPU kernels require the Cobalt.blockSize to be a multiple
          of several values in order to:
             1) divide the work evenly over threads and blocks.
             2) prevent integration of samples from crossing blockSize boundaries.

          Returns the least common multiple of all requirements.
        """

        factor = 1

        NR_PPF_TAPS = 16
        MAX_THREADS_PER_BLOCK = 1024
        CORRELATOR_BLOCKSIZE = 16
        BEAMFORMER_NR_DELAYCOMPENSATION_CHANNELS = 64
        BEAMFORMER_DELAYCOMPENSATION_BLOCKSIZE = 16

        # Process correlator settings
        if self.correlator:
            # FIR_Filter.cu
            factor = lcm(factor, NR_PPF_TAPS * self.correlator.nrChannelsPerSubband)

            # Correlator.cu (minimum of 16 samples per channel)
            factor = lcm(factor, CORRELATOR_BLOCKSIZE * self.correlator.nrChannelsPerSubband * self.nrSubblocks())

        if self.coherentStokes:
            # DelayAndBandPass.cu
            factor = lcm(factor, BEAMFORMER_DELAYCOMPENSATION_BLOCKSIZE * BEAMFORMER_NR_DELAYCOMPENSATION_CHANNELS)

            # FIR_Filter.cu
            factor = lcm(factor, NR_PPF_TAPS * self.coherentStokes.nrChannelsPerSubband)

            # CoherentStokesKernel.cc
            factor = lcm(factor, MAX_THREADS_PER_BLOCK * self.coherentStokes.timeIntegrationFactor)

            # CoherentStokes.cu (integration should fit)
            factor = lcm(factor, 1024 * self.coherentStokes.timeIntegrationFactor * self.coherentStokes.nrChannelsPerSubband)

        if self.incoherentStokes:
            # DelayAndBandPass.cu
            factor = lcm(factor, BEAMFORMER_DELAYCOMPENSATION_BLOCKSIZE * BEAMFORMER_NR_DELAYCOMPENSATION_CHANNELS)

            # FIR_Filter.cu
            factor = lcm(factor, NR_PPF_TAPS * self.incoherentStokes.nrChannelsPerSubband)

            # IncoherentStokes.cu (integration should fit)
            factor = lcm(factor, 1024 * self.incoherentStokes.timeIntegrationFactor * self.incoherentStokes.nrChannelsPerSubband)

        return factor

    def __samples_per_second(self):
        """ Return the number of station samples per second for the configured clock. """

        HZ_PER_MHZ = 1e6
        STATION_FFT_LENGTH = 1024

        return self.clockMHz * HZ_PER_MHZ / STATION_FFT_LENGTH

    def _time2samples(self, t):
        """ Convert a time `t' (seconds) into a number of station samples. """
        return int(round(t * self.__samples_per_second()))

    def _samples2time(self, samples):
        """ Return the duration of a number of station samples. """
        return samples / self.__samples_per_second()
class BlockSize(object):
    """ Derive Cobalt specifications given BlockConstraints. Output:

        BlockSize member  | Cobalt parset key
        ---------------------------------------
        blockSize         | Cobalt.blockSize
        nrSubblocks       | Cobalt.Correlator.nrIntegrationsPerBlock
        nrBlocks          | Cobalt.Correlator.nrBlocksPerIntegration
        integrationTime   | Cobalt.Correlator.integrationTime
    """

    def __init__(self, constraints):
        """ Derive all members from `constraints'.

            Raises ValueError if no block size satisfies the constraints. """

        self.constraints = constraints
        self.nrSubblocks = constraints.nrSubblocks()

        idealBlockSize = constraints.idealBlockSize()
        self.blockSize = self._blockSize(idealBlockSize, constraints.factor())

        if self.blockSize is None:
            raise ValueError("No block size between %s and %s samples is a multiple of %s"
                             % (constraints.minBlockSize(), constraints.maxBlockSize(), constraints.factor()))

        self.nrBlocks = self._nrBlocks(idealBlockSize, self.blockSize)

        if self.nrSubblocks > 1:
            # Multiple integration periods per block. Floor division keeps
            # the number of samples an integer.
            self.integrationSamples = self.blockSize // self.nrSubblocks
        else:
            # One integration period spans one or more whole blocks.
            self.integrationSamples = self.blockSize * self.nrBlocks

        self.integrationTime = constraints._samples2time(self.integrationSamples)

    def _nrBlocks(self, integrationSamples, blockSize):
        """ Return the number of blocks of `blockSize' samples that best
            approximates `integrationSamples' (at least 1). """

        # Force a floating-point division: with integer arguments the
        # quotient would otherwise be truncated before it is rounded.
        return max(1, int(round(float(integrationSamples) / blockSize)))

    def _blockSize(self, integrationSamples, factor):
        """ Return the multiple of `factor' within the allowed block size
            range for which a whole number of blocks best approximates
            `integrationSamples'. Returns None if no multiple of `factor'
            lies within the allowed range. """

        minBlockSize = self.constraints.minBlockSize()
        maxBlockSize = self.constraints.maxBlockSize()

        bestBlockSize = None
        bestNrBlocks = None
        bestError = None

        # Create a comfortable range to search in for possible fits.
        maxFactorPerBlock = int(ceil(float(integrationSamples) / factor)) * 2

        for factorsPerBlock in range(1, maxFactorPerBlock):
            blockSize = factorsPerBlock * factor

            # Discard invalid block sizes
            if blockSize < minBlockSize:
                continue

            if blockSize > maxBlockSize:
                # Candidates only grow from here on.
                break

            # Calculate the number of blocks we'd use
            nrBlocks = self._nrBlocks(integrationSamples, blockSize)

            # Calculate error for this solution
            error = abs(integrationSamples - nrBlocks * blockSize)

            # Accept this candidate if best so far. Prefer
            # fewer blocks if candidates are (nearly) equal in their error.
            if bestBlockSize is None \
               or error < bestError \
               or (error < 0.01 * integrationSamples and nrBlocks < bestNrBlocks):
                bestBlockSize = blockSize
                bestNrBlocks = nrBlocks
                bestError = error

        return bestBlockSize
# $Id: CMakeLists.txt $
include(LofarCTest)
# Add the t_cobaltblocksize test (lofar_add_test is presumably provided by LofarCTest).
lofar_add_test(t_cobaltblocksize)
#lofar_add_test(t_resource_estimator) # to be re-enabled when estimator pipeline code is fixed by Alexander
set(_py_files
......
import unittest
from lofar.sas.resourceassignment.resourceassignmentestimator import cobaltblocksize
def frange(x, y, jump):
    """ Generate x, x + jump, x + 2 * jump, ... for as long as the value
        stays below y.

        Every value is computed as x + n * jump, rather than by repeated
        addition, to avoid accumulating rounding errors.

        Raises ValueError (on first iteration) if jump is not positive. """

    if jump <= 0:
        raise ValueError("jump must be positive, got %r" % (jump,))

    n = 0
    while x + n * jump < y:
        yield x + n * jump
        n += 1
class TestCorrelatorIntegrationTimeApproximation(unittest.TestCase):
    """
    Test whether the requested correlator integration time can be sufficiently approximated.
    """

    def test_correlator_subsecond(self):
        """ Construct the block constraints for a range of clocks, channel
            counts, and sub-second integration times. """

        correlator_settings = cobaltblocksize.CorrelatorSettings()

        for clockMHz in [160, 200]:
            for nrChannelsPerSubband in [16, 64, 256]:
                correlator_settings.nrChannelsPerSubband = nrChannelsPerSubband

                for integrationTime in frange(0.1, 1.0, 0.05):
                    correlator_settings.integrationTime = integrationTime

                    # BlockConstraints lives in the cobaltblocksize module, and
                    # needs to be told which clock is being tested.
                    c = cobaltblocksize.BlockConstraints(correlator_settings, clockMHz=clockMHz)

                    self.assertIs(c.correlator, correlator_settings)
                    self.assertEqual(c.clockMHz, clockMHz)

                    # NOTE(review): the approximation of the integration time
                    # itself is not verified yet -- TODO compare the derived
                    # integration time against the requested one.
# Run all tests in this file when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
#!/bin/bash
# Run t_cobaltblocksize.py through python_coverage_test (presumably defined in
# the sourced python-coverage.sh), passing "cobaltblocksize*" as its first argument.
source python-coverage.sh
python_coverage_test "cobaltblocksize*" t_cobaltblocksize.py
#!/bin/sh
# Hand the t_cobaltblocksize test over to the runctest.sh wrapper script.
./runctest.sh t_cobaltblocksize
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment