Skip to content
Snippets Groups Projects
Commit f35d0738 authored by John Romein's avatar John Romein
Browse files

bug 225:

More cleanups.
parent 3a369adc
No related branches found
No related tags found
No related merge requests found
......@@ -292,9 +292,6 @@ void BGL_Processing::preprocess(BGL_Configuration &configuration)
}
if (itsIsTransposeOutput) {
// TODO: !useGather not implemented
//ASSERT(parset->getBool("OLAP.IONProc.useGather"));
unsigned nrSubbandsPerPset = configuration.nrSubbandsPerPset();
unsigned logicalNode = usedCoresPerPset * (outputPsetIndex - outputPsets.begin()) + myCore;
// TODO: logicalNode assumes output psets are consecutively numbered
......
......@@ -314,7 +314,7 @@ void *master_thread(void *)
#endif
}
if (parset.useGather() && parset.outputPsetIndex(myPsetNumber) >= 0) {
if (parset.outputPsetIndex(myPsetNumber) >= 0) {
static char nrRuns[16], *argv[] = {
global_argv[0],
global_argv[1],
......
......@@ -63,6 +63,8 @@ public:
itsObservation(aParSet)
{}
void check() const;
double startTime() const;
double stopTime() const;
uint32 nrStations() const;
......@@ -87,16 +89,11 @@ public:
uint32 nrChannelsPerSubband() const;
uint32 nrPsets() const;
uint32 nrCoresPerPset() const;
double chanWidth() const;
double channelWidth() const;
vector<string> getPortsOf(const string& aKey) const;
void IONodeRSPDestPorts(uint32 pset, vector<pair<string, string> > &RSPDestPort) const;
string stationName(const int index) const;
static string expandedArrayString(const string& orgStr);
bool useScatter() const;
bool useGather() const;
uint32 nrPsetsPerStorage() const;
uint32 nrOutputsPerInputNode() const;
uint32 nrInputsPerStorageNode() const;
vector<uint32> inputPsets() const;
vector<uint32> outputPsets() const;
vector<uint32> psetDimensions() const;
......@@ -138,6 +135,7 @@ private:
void addPosition(string stName);
double getTime(const char *name) const;
static int findIndex(uint32 pset, const vector<uint32> &psets);
void checkSubbandCount(const char *key) const;
Observation itsObservation;
};
......@@ -266,7 +264,7 @@ inline uint32 CS1_Parset::nrCoresPerPset() const
inline unsigned CS1_Parset::nrSubbands() const
{
return subbandToBeamMapping().size();
return getUint32Vector("Observation.subbandList").size();
}
inline vector<unsigned> CS1_Parset::subbandToBeamMapping() const
......@@ -286,36 +284,16 @@ inline vector<unsigned> CS1_Parset::subbandToRSPslotMapping() const
}
inline double CS1_Parset::chanWidth() const
inline double CS1_Parset::channelWidth() const
{
return sampleRate() / nrChannelsPerSubband();
}
inline bool CS1_Parset::useScatter() const
{
return getBool("OLAP.IONProc.useScatter");
}
inline bool CS1_Parset::useGather() const
{
return getBool("OLAP.IONProc.useGather");
}
inline uint32 CS1_Parset::nrPsetsPerStorage() const
{
return getUint32("OLAP.psetsPerStorage");
}
inline uint32 CS1_Parset::nrOutputsPerInputNode() const
{
return useScatter() ? 1 : nrCoresPerPset();
}
inline uint32 CS1_Parset::nrInputsPerStorageNode() const
{
return (useGather() ? 1 : nrCoresPerPset()) * nrPsetsPerStorage();
}
inline vector<uint32> CS1_Parset::inputPsets() const
{
return getUint32Vector("OLAP.BGLProc.inputPsets");
......
......@@ -50,38 +50,26 @@ using namespace ACC::APS;
CS1_Parset::CS1_Parset() :
name()
{
check();
}
CS1_Parset::~CS1_Parset()
{
}
void CS1_Parset::IONodeRSPDestPorts(uint32 pset, vector<pair<string, string> > &RSPDestPort) const
void CS1_Parset::checkSubbandCount(const char *key) const
{
char buf[50];
sprintf(buf,"PIC.Core.IONode[%d].RSP",pset);
vector<string> snames = getStringVector(buf);
string rspdest_ports;
for (uint i = 0; i < snames.size(); i++) {
rspdest_ports = getStringVector("PIC.Core." + snames[i].substr(0, snames[i].length()-1) + ".dest.ports")[atoi(snames[i].substr(snames[i].length()-1,snames[i].length()).c_str())];
if (rspdest_ports.substr(0,5) == "file:" || rspdest_ports.substr(0,5) == "FILE:") {
RSPDestPort.push_back(pair<string, string>(rspdest_ports.substr(rspdest_ports.find(":")+1),""));
}
else if (rspdest_ports.substr(0,4) == "udp:" || rspdest_ports.substr(0,4) == "UDP:" ||
rspdest_ports.substr(0,4) == "tcp:" || rspdest_ports.substr(0,4) == "TCP:") {
RSPDestPort.push_back(pair<string, string>(StringUtil::split(rspdest_ports, ':')[1], rspdest_ports.substr(rspdest_ports.rfind(":")+1)));
}
else if (rspdest_ports.find(":") != string::npos){
// udp
RSPDestPort.push_back(pair<string, string>(StringUtil::split(rspdest_ports, ':')[0], StringUtil::split(rspdest_ports, ':')[1]));
}
else {
// file
RSPDestPort.push_back(pair<string, string>(rspdest_ports,""));
}
}
if (getUint32Vector(key).size() != nrSubbands())
throw std::runtime_error(string(key) + " contains wrong number (" + boost::lexical_cast<string>(getUint32Vector(key).size()) + ") of subbands (expected " + boost::lexical_cast<string>(nrSubbands()) + ')');
}
void CS1_Parset::check() const
{
checkSubbandCount("Observation.beamList");
checkSubbandCount("Observation.rspBoardList");
checkSubbandCount("Observation.rspSlotList");
}
......@@ -247,9 +235,9 @@ vector<double> CS1_Parset::getBeamDirection(const unsigned beam) const
char buf[50];
std::vector<double> beamDirs(2);
sprintf(buf,"Observation.Beam[%d].angle1", beam);
sprintf(buf, "Observation.Beam[%d].angle1", beam);
beamDirs[0] = getDouble(buf);
sprintf(buf,"Observation.Beam[%d].angle2", beam);
sprintf(buf, "Observation.Beam[%d].angle2", beam);
beamDirs[1] = getDouble(buf);
return beamDirs;
......
......@@ -12,9 +12,6 @@
OLAP.OLAP_Conn.BGLProc_Storage_Ports = [8300..8363]
# should be one of NULL(inputFromMemory), FILE, TCP, UDP, ETHERNET
OLAP.OLAP_Conn.station_Input_Transport = NULL
# should be one of ZOID, FCNP, TCP, NULL
# BGLProc doesn't open the parset file!
OLAP.OLAP_Conn.IONProc_BGLProc_Transport = FCNP
......@@ -31,7 +28,6 @@ OLAP.BGLProc.nrPPFTaps=16
OLAP.BGLProc.coresPerPset = 64
OLAP.BGLProc.maxConcurrentComm = 1
OLAP.IONProc.useGather = T
OLAP.IONProc.integrationSteps = 1
OLAP.StorageProc.integrationSteps = 60
......@@ -49,7 +45,7 @@ OLAP.nrSubbandsPerFrame = 36
OLAP.nrBitsPerSample=16
OLAP.nrSecondsOfBuffer = 4
OLAP.maxNetworkDelay = 0.5 # 1 second extra added to compensate for timestamp bug
OLAP.delayCompensation = F
OLAP.delayCompensation = T
Observation.sampleClock = 160
......@@ -61,15 +57,15 @@ Observation.bandFilter = LBL_30_80
#Observation.bandFilter = HB_170_230
#Observation.bandFilter = HB_210_240
Observation.Beam[0].angle1 = 0 # NCP
Observation.Beam[0].angle2 = 1.570796327
Observation.Beam[1].angle1 = 5.2336866848083394 # Cygnus
Observation.Beam[1].angle2 = 0.71094251447010637
Observation.Beam[2].angle1 = 6.1234876806221052 # Cas A
Observation.Beam[2].angle2 = 1.0265153995604648
#Observation.Beam[1].angle1 = 0 # NCP
#Observation.Beam[1].angle2 = 1.570796327
Observation.Beam[0].angle1 = 5.2336866848083394 # Cygnus
Observation.Beam[0].angle2 = 0.71094251447010637
#Observation.Beam[1].angle1 = 6.1234876806221052 # Cas A
#Observation.Beam[1].angle2 = 1.0265153995604648
#Observation.Beam[1].angle1 = 0 # NCP
Observation.Beam[1].angle2 = 1.570796327
Observation.Beam[1].angle1 = 0.9293405574 # pulsar
#Observation.Beam[1].angle1 = 0.9293405574 # pulsar
#Observation.Beam[1].angle2 = 0.9525774347
#Observation.Beam[1].angle1 = 4.5192832066722115 # Jupiter
#Observation.Beam[1].angle2 = 5.893698795
......@@ -81,12 +77,18 @@ Observation.Beam[1].angle1 = 0.9293405574 # pulsar
#Observation.nrBeams = 1
Observation.Beam[0].directionTypes = J2000
Observation.Beam[1].directionTypes = J2000
Observation.Beam[2].directionTypes = J2000
#Observation.Beam[0].subbandList = [300..335]
#Observation.Beam[1].beamletList = [0..1, 54..55, 108..109, 162..163]
#Observation.Beam[0].beamletList = [0..35]
#Observation.subbandList = [300]
#Observation.beamList = [0]
#Observation.rspBoardList = [0]
#Observation.rspSlotList = [0]
Observation.subbandList = [300,301,302,303,304,305,306,307,308,309,310,311,312,313,314,315,316,317,318,319,320,321,322,323,324,325,326,327,328,329,330,331,332,333,334,335]
Observation.beamList = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
Observation.beamList = [0,1,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
Observation.rspBoardList = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
Observation.rspSlotList = [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35]
......
......@@ -71,12 +71,8 @@ class CS1_Parset(LOFAR_Parset.Parset):
self['Observation.stopTime'] = datetime.datetime.fromtimestamp(start + duration)
def setIntegrationTime(self, integrationTime):
if self.getBool('OLAP.IONProc.useGather'):
self['OLAP.IONProc.integrationSteps'] = integrationTime
self['OLAP.StorageProc.integrationSteps'] = 1
else:
self['OLAP.IONProc.integrationSteps'] = 1
self['OLAP.StorageProc.integrationSteps'] = integrationTime
self['OLAP.IONProc.integrationSteps'] = integrationTime
self['OLAP.StorageProc.integrationSteps'] = 1
def setMSName(self, msName):
self['Observation.MSNameMask'] = msName
......
......@@ -128,8 +128,7 @@ class StorageSection(Section):
self.parset['OLAP.OLAP_Conn.BGLProc_Storage_ServerHosts'] = '[' + ','.join(storageIPs) + ']'
def run(self, runlog, noRuns, runCmd = None):
if self.parset.getBool("OLAP.IONProc.useGather"):
noRuns = noRuns / self.parset.getInt32("OLAP.IONProc.integrationSteps");
noRuns = noRuns / self.parset.getInt32("OLAP.IONProc.integrationSteps");
Section.run(self, runlog, noRuns, runCmd)
class IONProcSection(Section):
......
......@@ -484,10 +484,16 @@ PIC.Core.B03_RSP.dest.ports = [10.170.0.13:4346,10.170.0.14:4347]
PIC.Core.B03_0.RSP = 0
PIC.Core.B03_1.RSP = 1
PIC.Core.Station.B00_0.RSP.ports = [10.170.0.1:4346,10.170.0.1:4347,10.170.0.1:4348,10.170.0.1:4349]
#PIC.Core.Station.B00_0.RSP.ports = [10.170.0.1:4346,10.170.0.1:4347,10.170.0.1:4348,10.170.0.1:4349]
#PIC.Core.Station.B00_1.RSP.ports = [10.170.0.2:4346,10.170.0.2:4347,10.170.0.2:4348,10.170.0.2:4349]
#PIC.Core.IONProc.R000-B00[0].inputs = [B00_0/RSP0,B00_0/RSP1,B00_0/RSP2,B00_0/RSP3]
#PIC.Core.IONProc.R000-B00[1].inputs = [B00_1/RSP0,B00_1/RSP1,B00_1/RSP2,B00_1/RSP3]
#PIC.Core.Station.B00_0.RSP.ports = [tcp:10.170.0.1:4346]
PIC.Core.Station.B00_0.RSP.ports = [10.170.0.1:4346]
PIC.Core.Station.B00_1.RSP.ports = [10.170.0.2:4346,10.170.0.2:4347,10.170.0.2:4348,10.170.0.2:4349]
PIC.Core.IONProc.R000-B00[0].inputs = [B00_0/RSP0,B00_0/RSP1,B00_0/RSP2,B00_0/RSP3]
PIC.Core.IONProc.R000-B00[1].inputs = [B00_1/RSP0,B00_1/RSP1,B00_1/RSP2,B00_1/RSP3]
PIC.Core.IONProc.R000-B00[0].inputs = [B00_0/RSP0]
PIC.Core.IONProc.R000-B00[1].inputs = []
PIC.Core.Station.B01_0.RSP.ports = [10.170.0.5:4346,10.170.0.5:4347,10.170.0.5:4348,10.170.0.5:4349]
PIC.Core.Station.B01_1.RSP.ports = [10.170.0.6:4346,10.170.0.6:4347,10.170.0.6:4348,10.170.0.6:4349]
......
......@@ -87,9 +87,7 @@ namespace LOFAR
uint itsNrSubbandsPerPset;
uint itsNrSubbandsPerStorage;
uint itsNrInputChannelsPerPset;
vector<uint> itsCurrentInputs;
vector<uint> itsBandIDs; ///< MS IDs of the frequency bands
uint itsTimeCounter; ///< Counts the time
uint itsTimesToIntegrate; ///< Number of timeSteps to integrate
......
......@@ -69,8 +69,6 @@ namespace LOFAR
ASSERT(nrSubbands > 0);
uint nrSubbandsPerPset = itsCS1PS->nrSubbandsPerPset();
ASSERT(nrSubbandsPerPset > 0);
uint nrInputChannels = itsCS1PS->useGather() ? 1 : itsCS1PS->nrCoresPerPset();
ASSERT(nrInputChannels > 0);
uint nrPsetsPerStorage = itsParamSet.getUint32("OLAP.psetsPerStorage");
ASSERT(nrSubbands % nrSubbandsPerPset == 0);
ASSERT(nrSubbands / nrSubbandsPerPset % nrPsetsPerStorage == 0);
......@@ -79,7 +77,6 @@ namespace LOFAR
// create. Each WH_SubbandWriter will write up to \a nrSubbandsPerPset
// to an AIPS++ Measurement Set.
uint nrWriters = nrSubbands / nrSubbandsPerPset / nrPsetsPerStorage;
uint maxConcurrent = itsCS1PS->getInt32("OLAP.BGLProc.maxConcurrentComm");
LOG_TRACE_VAR_STR("Creating " << nrWriters << " subband writers ...");
for (unsigned nw = 0; nw < nrWriters; ++nw)
......@@ -106,19 +103,9 @@ namespace LOFAR
// Each writer will run on a separate node.
step.runOnNode(nw);
// Connect to BG output
for (unsigned pset = 0; pset < nrPsetsPerStorage; pset ++) {
vector<int> channels;
for (unsigned core = 0; core < nrInputChannels; core++) {
int channel = pset * nrInputChannels + core;
step.getInDataManager(channel).setInBuffer(channel, false, 10);
itsStub->connect(nw, channel, step.getInDataManager(channel), channel);
channels.push_back(channel);
}
// limit the number of concurrent incoming connections
// actually, we would like to set the number of concurrent
// communications for all psets together, but we cannot express this
// thus we do this for each pset
step.getInDataManager(0).setInRoundRobinPolicy(channels, maxConcurrent);
for (unsigned channel = 0; channel < nrPsetsPerStorage; channel ++) {
step.getInDataManager(channel).setInBuffer(channel, false, 10);
itsStub->connect(nw, channel, step.getInDataManager(channel), channel);
}
}
#ifdef HAVE_MPI
......
......@@ -32,7 +32,6 @@
// Application specific includes
#include <CS1_Storage/WH_SubbandWriter.h>
#include <CS1_Interface/DH_Visibilities.h>
#include <CS1_Interface/BGL_Mapping.h>
#include <CS1_Storage/MSWriter.h>
#include <tinyCEP/Sel_RoundRobin.h>
#include <Transport/TH_MPI.h>
......@@ -52,7 +51,7 @@ namespace LOFAR
WH_SubbandWriter::WH_SubbandWriter(const string& name,
const vector<uint>& subbandID,
CS1_Parset *pset)
: WorkHolder(pset->nrInputsPerStorageNode(), 0, name, "WH_SubbandWriter"),
: WorkHolder(pset->nrPsetsPerStorage(), 0, name, "WH_SubbandWriter"),
itsCS1PS(pset),
itsSubbandIDs(subbandID),
itsTimeCounter(0),
......@@ -142,8 +141,6 @@ namespace LOFAR
antPos.size() << " == " << 3 * itsNStations);
itsNrSubbandsPerPset = itsCS1PS->nrSubbandsPerPset();
itsNrSubbandsPerStorage = itsNrSubbandsPerPset * itsCS1PS->nrPsetsPerStorage();
itsNrInputChannelsPerPset = itsCS1PS->useGather() ? 1 : itsCS1PS->nrCoresPerPset();
itsCurrentInputs.resize(itsNrSubbandsPerStorage / itsNrSubbandsPerPset, 0);
LOG_TRACE_VAR_STR("SubbandsPerStorage = " << itsNrSubbandsPerStorage);
vector<string> storageStationNames = itsCS1PS->getStringVector("OLAP.storageStationNames");
......@@ -185,7 +182,7 @@ namespace LOFAR
// correct indices for the reference frequencies are in the vector of
// subbandIDs.
itsBandIDs.resize(itsNrSubbandsPerStorage);
double chanWidth = itsCS1PS->chanWidth();
double chanWidth = itsCS1PS->channelWidth();
LOG_TRACE_VAR_STR("chanWidth = " << chanWidth);
for (uint sb = 0; sb < itsNrSubbandsPerStorage; ++sb) {
......@@ -241,14 +238,8 @@ namespace LOFAR
for (uint sb = 0; sb < itsNrSubbandsPerStorage; ++ sb) {
// find out from which input channel we should read
unsigned pset = sb / itsNrSubbandsPerPset;
unsigned core = itsCurrentInputs[pset];
if (!itsCS1PS->useGather())
core = BGL_Mapping::mapCoreOnPset(core, pset);
unsigned inputChannel = core + pset * itsNrInputChannelsPerPset;
DH_Visibilities *inputDH = static_cast<DH_Visibilities *>(getDataManager().getInHolder(inputChannel));
DH_Visibilities *inputDH = static_cast<DH_Visibilities *>(getDataManager().getInHolder(pset));
DH_Visibilities::NrValidSamplesType *valSamples = &inputDH->getNrValidSamples(0, 0);
DH_Visibilities::VisibilityType *newVis = &inputDH->getVisibility(0, 0, 0, 0);
......@@ -299,11 +290,7 @@ namespace LOFAR
}
#endif
getDataManager().readyWithInHolder(inputChannel);
// select next channel
if (++ itsCurrentInputs[pset] == itsNrInputChannelsPerPset)
itsCurrentInputs[pset] = 0;
getDataManager().readyWithInHolder(pset);
}
// Update the time counter.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment