Skip to content
Snippets Groups Projects
Commit 9af385b2 authored by John Romein's avatar John Romein
Browse files

BugID: 225

Added support for adjustable OLAP.StorageProc.subbandsPerMS
parent c4ea00f6
No related branches found
No related tags found
No related merge requests found
......@@ -19,8 +19,8 @@ DOCHDRS = $(INSTHDRS) $(NOINSTHDRS)
bin_PROGRAMS = CS1_BGL_Processing
CCAS = /bgl/BlueLight/ppcfloor/blrts-gnu/powerpc-bgl-blrts-gnu/bin/gcc
#CCAS = gcc
#CCAS = /bgl/BlueLight/ppcfloor/blrts-gnu/powerpc-bgl-blrts-gnu/bin/gcc
CCAS = gcc
CCASFLAGS = $(patsubst -q%,,$(CPPFLAGS)) $(EXTRA_CPPFLAGS)
CS1_BGL_Processing_SOURCES = $(DOCHDRS) \
......
......@@ -18,7 +18,8 @@ tWH_BGL_Processing.cc \
transpose_SOURCES = \
transpose.cc
CCAS = /bgl/BlueLight/ppcfloor/blrts-gnu/powerpc-bgl-blrts-gnu/bin/gcc
#CCAS = /bgl/BlueLight/ppcfloor/blrts-gnu/powerpc-bgl-blrts-gnu/bin/gcc
CCAS = gcc
CCASFLAGS = $(patsubst -q%,,$(CPPFLAGS)) $(EXTRA_CPPFLAGS)
......
......@@ -9,7 +9,7 @@
# Do not change these hosts without changing the machinefile(s). Host on which TFC_DelayCompensation runs.
OLAP.OLAP_Conn.AMCServerHost = localhost
OLAP.DelayComp.hostname = list012
OLAP.DelayComp.hostname = list001
# The ports for all (socket)connections
......@@ -31,7 +31,7 @@ OLAP.OLAP_Conn.station_Input_Transport = UDP
# The format of the data
# Variables for Storage
OLAP.subbandsPerPset = 3
OLAP.subbandsPerPset = 4
OLAP.psetsPerStorage = 2
OLAP.storageIntegrationTime = 60
OLAP.minorIntegrationSteps = 608 #768 at 200MHz
......@@ -39,6 +39,7 @@ OLAP.BGLProc.nrPPFTaps=16
OLAP.BGLProc.psetsPerCell = 1
OLAP.BGLProc.nodesPerPset = 8
OLAP.BGLProc.maxConcurrentComm = 2
OLAP.StorageProc.subbandsPerMS = 1
# Variables for the DelayCompensation
OLAP.DelayComp.converterType = IMPL # should be one of IMPL, CLIENT
......@@ -47,7 +48,7 @@ OLAP.DelayComp.positionType = ITRF # should be ITRF
OLAP.IPHeaderSize = 32
OLAP.EPAHeaderSize = 16
OLAP.nrTimesInFrame = 16
OLAP.nrSubbandsPerFrame = 36
OLAP.nrSubbandsPerFrame = 48
OLAP.nrBitsPerSample=16
OLAP.nrSecondsOfBuffer = 26
OLAP.delayCompensation = T
......@@ -62,7 +63,8 @@ Observation.channelsPerSubband = 256
Observation.nrPolarisations = 2
Observation.year = 2007
Observation.treeID = 00001
Observation.subbandList = [307,310,313,317,320,323,326,330,333,336,339,343,346,349,353,356,359,363,366,369,374,377,380,384]
Observation.subbandList = [65, 73, 81, 89, 98, 106, 114, 122, 130, 138, 146, 155, 163, 171, 179, 187, 195, 204, 212, 220, 228, 236, 244, 252, 261, 269, 277, 285, 293, 301, 309, 318, 326, 334, 342, 350, 358, 367, 375, 383, 391, 399, 407, 415, 424, 432, 440, 448]
Observation.VirtualInstrument.stationList = ['CS010', 'CS008', 'CS001', 'CS016']
PIC.Core.CS010.inputNodeList = [lii001, lii001, lii002, lii003]
......
......@@ -10,10 +10,10 @@ listfen = ClusterFEN(name = 'listfen' ,
listfen.setSlavesByPattern('list%03d', '10.20.170.%d', [1,2,3,4,5,6,7,8,9,10,11,12])
list001 = Host(name = 'list001' , \
address = '10.20.170.1')
list002 = Host(name = 'list002' , \
address = '10.20.170.2')
list012 = Host(name = 'list012' , \
address = '10.20.170.12')
hpclf = Host(name = 'hpclf' , \
address = 'hpclf1.service.rug.nl')
bglfen1 = Host(name = 'bglfen1', \
......@@ -30,6 +30,6 @@ localhost = Host(name = 'localhost', \
address = 'localhost')
gels = UserId(bglfen1,'gels' ,'/cephome/gels/LOFAR/installed/gnu_bgl/bin/')
romein = UserId(bglfen2,'romein' ,'/cephome/romein/projects/cvs/LOFAR/installed/gnu_bgl/bin/')
lofarsystem = UserId(bglfen3,'lofarsystem' ,'/cephome/lofarsystem/LOFAR/installed/gnu_bgl/bin/')
romein = UserId(bglfen2,'romein' ,'/cephome/romein/LOFAR/installed/gnu_bgl/bin/')
broekema = UserId(bglfen2,'broekema' ,'/cephome/broekema/LOFAR/installed/gnu_bgl/bin/')
......@@ -36,7 +36,7 @@ def doObservation(obsID, parset):
sys.exit(0)
sections = [\
DelayCompensationSection(parset, list012),
DelayCompensationSection(parset, list001),
InputSection(parset, liifen),
BGLProcSection(parset, userId.getHost(), 'R000_128_0', userId.getPath()),
StorageSection(parset, listfen)
......@@ -157,17 +157,18 @@ if __name__ == '__main__':
MSNumber = '/data/L' + year + '_' + '%05d' % measurementnumber
MSName = MSNumber + '.MS'
subbandsPerStorage = parset.getInt32('OLAP.subbandsPerPset') * parset.getInt32('OLAP.psetsPerStorage')
subbandsPerMS = parset.getInt32('OLAP.StorageProc.subbandsPerMS')
if (subbandsPerStorage == 1):
if (subbandsPerMS == 1):
MSName = '\'' + MSNumber + '_SB%01d' % 0 + '.MS' + '\''
for i in range(1, len(parset.getInt32Vector('Observation.subbandList'))):
MSName = MSName + ', ' + '\'' + MSNumber + '_SB%01d' % i + '.MS' + '\''
MSName = MSName + ', \\\n' + '\'' + MSNumber + '_SB%01d' % i + '.MS' + '\''
else:
MSName = '\'' + MSNumber + '_SB%01d' % 0 + '-%01d' % (subbandsPerStorage - 1) +'.MS' + '\''
for i in range(1, len(parset.getInt32Vector('Observation.subbandList')) / subbandsPerStorage):
first = i * subbandsPerStorage
last = first + subbandsPerStorage - 1
MSName = MSName + ', ' + '\'' + MSNumber + '_SB%01d' % first + '-%01d' % last +'.MS' + '\''
MSName = '\'' + MSNumber + '_SB%01d' % 0 + '-%01d' % (subbandsPerMS - 1) +'.MS' + '\''
for i in range(1, len(parset.getInt32Vector('Observation.subbandList')) / subbandsPerMS):
first = i * subbandsPerMS
last = first + subbandsPerMS - 1
MSName = MSName + ', \\\n' + '\'' + MSNumber + '_SB%01d' % first + '-%01d' % last +'.MS' + '\''
outf = open(runningNumberFile, 'w')
outf.write(str(measurementnumber + 1) + '\n')
......
......@@ -10,7 +10,7 @@ class Section(object):
Represents a part of the CS1 application
"""
def __init__(self, parset, package, host, buildvar = 'gnu_opt', workingDir = '~/'):
def __init__(self, parset, package, host, buildvar = 'gnu_opt', workingDir = '~/projects/cvs/'):
self.workingDir = workingDir
self.parset = parset
self.package = package
......
......@@ -83,14 +83,15 @@ namespace LOFAR
uint itsNPolSquared;
uint itsNVisibilities;
MSWriter* itsWriter;
vector <MSWriter *> itsWriters;
uint itsNrSubbandsPerCell; ///< Number of subbands per BG/L cell
uint itsNrSubbandsPerStorage;
uint itsNrNodesPerCell;
uint itsNrSubbandsPerMS;
vector<uint> itsCurrentInputs;
vector<uint> itsBandIDs; ///< MS IDs of the frequency bands
uint itsFieldID; ///< MS ID of the field, i.e. the beam.
vector<uint> itsFieldIDs; ///< MS IDs of the field, i.e. the beam.
uint itsTimeCounter; ///< Counts the time
uint itsTimesToIntegrate; ///< Number of timeSteps to integrate
bool *itsFlagsBuffers;//[NR_SUBBANDS][NR_BASELINES][NR_SUBBAND_CHANNELS][NR_POLARIZATIONS][NR_POLARIZATIONS];
......
......@@ -57,7 +57,6 @@ namespace LOFAR
"WH_SubbandWriter"),
itsCS1PS (pset),
itsSubbandIDs (subbandID),
itsWriter (0),
itsTimeCounter(0),
itsTimesToIntegrate(1),
itsFlagsBuffers(0),
......@@ -97,7 +96,11 @@ namespace LOFAR
WH_SubbandWriter::~WH_SubbandWriter()
{
delete itsWriter;
for (unsigned i = 0; i < itsWriters.size(); i ++)
delete itsWriters[i];
itsWriters.clear();
#ifdef USE_MAC_PI
delete itsPropertySet;
......@@ -170,46 +173,54 @@ namespace LOFAR
itsNrSubbandsPerStorage = itsNrSubbandsPerCell * itsCS1PS->getUint32("OLAP.psetsPerStorage");
itsNrNodesPerCell = itsCS1PS->nrBGLNodesPerCell();
itsCurrentInputs.resize(itsNrSubbandsPerStorage / itsNrSubbandsPerCell, 0);
LOG_TRACE_VAR_STR("SubbandsPerStorage = " << itsNrSubbandsPerStorage);
vector<string> storageStationNames = itsCS1PS->getStringVector("OLAP.storageStationNames");
itsNrSubbandsPerMS = itsCS1PS->getUint32("OLAP.StorageProc.subbandsPerMS");
ASSERT(itsCS1PS->getUint32("OLAP.subbandsPerPset") * itsCS1PS->getUint32("OLAP.psetsPerStorage") % itsNrSubbandsPerMS == 0);
itsWriters.resize(itsCS1PS->getUint32("OLAP.subbandsPerPset") * itsCS1PS->getUint32("OLAP.psetsPerStorage") / itsNrSubbandsPerMS);
itsFieldIDs.resize(itsWriters.size());
for (unsigned i = 0; i < itsWriters.size(); i ++) {
itsWriters[i] = new MSWriter(
#if defined HAVE_MPI
itsWriter = new MSWriter(msNames[TH_MPI::getCurrentRank()].c_str(),
msNames[TH_MPI::getCurrentRank() * itsWriters.size() + i].c_str(),
#else
itsWriter = new MSWriter(msNames[0].c_str(),
msNames[0].c_str(),
#endif
startTime, timeStep * itsTimesToIntegrate,
itsNChannels, itsNPolSquared,
itsNBeams,
itsNStations,
antPos, storageStationNames, itsTimesToIntegrate,
itsCS1PS->getUint32("OLAP.subbandsPerPset")* itsCS1PS->getUint32("OLAP.psetsPerStorage"));
startTime, timeStep * itsTimesToIntegrate,
itsNChannels, itsNPolSquared,
itsNBeams,
itsNStations,
antPos, storageStationNames, itsTimesToIntegrate,
itsNrSubbandsPerMS);
double chanWidth = itsCS1PS->chanWidth();
LOG_TRACE_VAR_STR("chanWidth = " << chanWidth);
//## TODO: add support for more than 1 beam ##//
ASSERT(itsCS1PS->getDoubleVector("Observation.Beam.angle1").size() == itsNBeams);
ASSERT(itsCS1PS->getDoubleVector("Observation.Beam.angle2").size() == itsNBeams);
double RA = itsCS1PS->getDoubleVector("Observation.Beam.angle1")[0];
double DEC = itsCS1PS->getDoubleVector("Observation.Beam.angle2")[0];
// For nr of beams
itsFieldIDs[i] = itsWriters[i]->addField(RA, DEC);
}
vector<double> refFreqs= itsCS1PS->refFreqs();
// Now we must add \a itsNrSubbandsPerStorage to the measurement set. The
// correct indices for the reference frequencies are in the vector of
// subbandIDs.
itsBandIDs.resize(itsNrSubbandsPerStorage);
double chanWidth = itsCS1PS->chanWidth();
LOG_TRACE_VAR_STR("chanWidth = " << chanWidth);
for (uint sb = 0; sb < itsNrSubbandsPerStorage; ++sb) {
// compensate for the half-channel shift introduced by the PPF
double refFreq = refFreqs[itsSubbandIDs[sb]] - chanWidth / 2;
itsBandIDs[sb] = itsWriter->addBand (itsNPolSquared, itsNChannels,
refFreq, chanWidth);
itsBandIDs[sb] = itsWriters[sb / itsNrSubbandsPerMS]->addBand(itsNPolSquared, itsNChannels, refFreq, chanWidth);
}
//## TODO: add support for more than 1 beam ##//
ASSERT(itsCS1PS->getDoubleVector("Observation.Beam.angle1").size() == itsNBeams);
ASSERT(itsCS1PS->getDoubleVector("Observation.Beam.angle2").size() == itsNBeams);
double RA = itsCS1PS->getDoubleVector("Observation.Beam.angle1")[0];
double DEC = itsCS1PS->getDoubleVector("Observation.Beam.angle2")[0];
// For nr of beams
itsFieldID = itsWriter->addField (RA, DEC);
// Allocate buffers
itsFlagsBuffers = new bool[itsNrSubbandsPerStorage * itsNVisibilities];
itsWeightsBuffers = new float[itsNrSubbandsPerStorage * itsNBaselines * itsNChannels];
......@@ -275,11 +286,13 @@ namespace LOFAR
if ((itsTimeCounter + 1) % itsTimesToIntegrate == 0) {
itsWriteTimer.start();
itsWriter->write (itsBandIDs[sb], itsFieldID, 0, itsNChannels,
itsTimeCounter, itsNVisibilities,
&(itsVisibilities[sb * itsNVisibilities]),
&(itsFlagsBuffers[sb * itsNVisibilities]),
&(itsWeightsBuffers[sb * itsNBaselines * itsNChannels]));
std::clog << "CPU " << TH_MPI::getCurrentRank() << " writes sb " << sb << " to writer " << (sb / itsNrSubbandsPerMS) << ", band = " << itsBandIDs[sb] << std::endl;
itsWriters[sb / itsNrSubbandsPerMS]->write(itsBandIDs[sb],
itsFieldIDs[sb / itsNrSubbandsPerMS], 0, itsNChannels,
itsTimeCounter, itsNVisibilities,
&itsVisibilities[sb * itsNVisibilities],
&itsFlagsBuffers[sb * itsNVisibilities],
&itsWeightsBuffers[sb * itsNBaselines * itsNChannels]);
itsWriteTimer.stop();
}
......@@ -297,9 +310,13 @@ namespace LOFAR
void WH_SubbandWriter::postprocess()
{
delete [] itsFlagsBuffers; itsFlagsBuffers = 0;
delete [] itsFlagsBuffers; itsFlagsBuffers = 0;
delete [] itsWeightsBuffers; itsWeightsBuffers = 0;
delete itsWriter; itsWriter = 0;
for (unsigned i = 0; i < itsWriters.size(); i ++)
delete itsWriters[i];
itsWriters.clear();
cout<<itsWriteTimer<<endl;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment