Skip to content
Snippets Groups Projects
Commit 9d4ee0ea authored by John Romein
Browse files

bug 225:

Use assembly code to copy data from RSP packet into BeamletBuffer.
nrTimesPerPacket must be 16.
parent e69ce3e7
No related branches found
No related tags found
No related merge requests found
......@@ -16,6 +16,7 @@ AC_PROG_YACC
AC_PROG_CC
AC_PROG_CXX
AM_PROG_LEX
AM_PROG_AS
AC_PROG_INSTALL
AC_PROG_LN_S
AC_DISABLE_STATIC
......
......@@ -23,32 +23,46 @@
//# Always #include <lofar_config.h> first!
#include <lofar_config.h>
#include <CS1_Interface/Align.h>
#include <BeamletBuffer.h>
#include <ION_Allocator.h>
#include <InputThreadAsm.h>
#include <boost/lexical_cast.hpp>
#include <stdexcept>
namespace LOFAR {
namespace CS1 {
// Constructor of BeamletBuffer, shown as a diff listing without +/- markers.
// NOTE(review): wherever two near-identical lines follow each other below
// (signature, itsSize, itsSBBuffers, SynchronizedReaderAndWriter, and the
// reserve()/resize() groups), the first presumably is the removed pre-commit
// form and the second its replacement -- confirm against the repository.
BeamletBuffer::BeamletBuffer(unsigned bufferSize, unsigned nrSubbands, unsigned history, bool isSynchronous, unsigned maxNetworkDelay)
// The buffer size is a multiple of the input packet size. By setting
// itsOffset to a proper value, we can ensure that input packets never
// wrap around the circular buffer.
BeamletBuffer::BeamletBuffer(unsigned bufferSize, unsigned nrTimesPerPacket, unsigned nrSubbands, unsigned nrBeams, unsigned history, bool isSynchronous, unsigned maxNetworkDelay)
:
itsNSubbands(nrSubbands),
// Buffer length in time samples. NOTE(review): align() is presumably the
// round-up-to-multiple helper from CS1_Interface/Align.h -- confirm.
itsSize(bufferSize),
itsSize(align(bufferSize, nrTimesPerPacket)),
itsHistorySize(history),
// Sample storage viewed as [subband][time][polarization]; the memory is
// obtained from ION_Allocator with 32 passed as second argument.
itsSBBuffers(reinterpret_cast<SampleType *>(ION_Allocator().allocate(nrSubbands * bufferSize * NR_POLARIZATIONS * sizeof(SampleType), 32)), boost::extents[nrSubbands][bufferSize][NR_POLARIZATIONS]),
itsSBBuffers(reinterpret_cast<SampleType *>(ION_Allocator().allocate(nrSubbands * itsSize * NR_POLARIZATIONS * sizeof(SampleType), 32)), boost::extents[nrSubbands][itsSize][NR_POLARIZATIONS]),
itsOffset(0),
// Distance in bytes between the start of subband 0 and the start of subband 1.
// NOTE(review): this indexes subband 1, so it presumably requires
// nrSubbands >= 2 -- confirm.
itsStride(reinterpret_cast<char *>(itsSBBuffers[1].origin()) - reinterpret_cast<char *>(itsSBBuffers[0].origin())),
itsReadTimer("buffer read", true),
itsWriteTimer("buffer write", true)
{
// Reject a configured packet length that differs from the class constant.
// NOTE(review): the unqualified nrTimesPerPacket in the message is the
// constructor argument (the rejected value), not this->nrTimesPerPacket
// (the required value) -- confirm which one the text should print.
if (nrTimesPerPacket != this->nrTimesPerPacket)
throw std::runtime_error(std::string("OLAP.nrTimesInFrame should be ") + boost::lexical_cast<std::string>(nrTimesPerPacket));
pthread_mutex_init(&itsValidDataMutex, 0);
// Choose the reader/writer synchronization object based on isSynchronous.
if (isSynchronous)
itsSynchronizedReaderWriter = new SynchronizedReaderAndWriter(bufferSize);
itsSynchronizedReaderWriter = new SynchronizedReaderAndWriter(itsSize);
else
itsSynchronizedReaderWriter = new TimeSynchronizedReader(maxNetworkDelay);
itsEnd.reserve(MAX_BEAMLETS);
itsStartI.reserve(MAX_BEAMLETS);
itsEndI.reserve(MAX_BEAMLETS);
// Give the per-beam vectors nrBeams elements up front; startReadTransaction
// in this listing assigns to them by beam index.
itsEnd.resize(nrBeams);
itsStartI.resize(nrBeams);
itsEndI.resize(nrBeams);
}
......@@ -60,14 +74,33 @@ BeamletBuffer::~BeamletBuffer()
}
void BeamletBuffer::writeElements(Beamlet *data, const TimeStamp &begin, unsigned nrElements)
void BeamletBuffer::writePacketData(Beamlet *data, const TimeStamp &begin)
{
TimeStamp end = begin + nrElements;
TimeStamp end = begin + nrTimesPerPacket;
itsWriteTimer.start();
// cache previous index, to avoid expensive mapTime2Index()
unsigned startI = (begin == itsPreviousTimeStamp) ? itsPreviousI : mapTime2Index(begin);
unsigned endI = startI + nrElements;
unsigned startI;
if (begin == itsPreviousTimeStamp) {
startI = itsPreviousI;
} else {
startI = mapTime2Index(begin);
if (!aligned(startI, nrTimesPerPacket)) {
// RSP board reset? Recompute itsOffset and clear the entire buffer.
itsOffset = - (startI % nrTimesPerPacket);
startI = mapTime2Index(begin);
pthread_mutex_lock(&itsValidDataMutex);
itsValidData.reset();
pthread_mutex_unlock(&itsValidDataMutex);
}
//std::clog << "timestamp = " << (uint64_t) begin << ", itsOffset = " << itsOffset << std::endl;
}
unsigned endI = startI + nrTimesPerPacket;
if (endI >= itsSize)
endI -= itsSize;
......@@ -80,30 +113,29 @@ void BeamletBuffer::writeElements(Beamlet *data, const TimeStamp &begin, unsigne
// do not write in circular buffer section that is being read
itsLockedRanges.lock(startI, endI, itsSize);
if (endI < startI) {
// the data wraps around the allocated memory, so do it in two parts
unsigned chunk1 = itsSize - startI;
for (unsigned sb = 0; sb < itsNSubbands; sb ++) {
memcpy(itsSBBuffers[sb][startI].origin(), &data[0] , sizeof(SampleType[chunk1][NR_POLARIZATIONS]));
memcpy(itsSBBuffers[sb][0].origin() , &data[chunk1], sizeof(SampleType[endI][NR_POLARIZATIONS]));
data += nrElements;
}
} else {
for (unsigned sb = 0; sb < itsNSubbands; sb ++) {
if (sizeof(SampleType[NR_POLARIZATIONS]) == sizeof(double)) {
double *dst = reinterpret_cast<double *>(itsSBBuffers[sb][startI].origin());
const double *src = reinterpret_cast<const double *>(data);
for (unsigned time = 0; time < nrElements; time ++)
dst[time] = src[time];
} else {
memcpy(itsSBBuffers[sb][startI].origin(), data, sizeof(SampleType[endI - startI][NR_POLARIZATIONS]));
}
data += nrElements;
}
#if defined HAVE_BGP
void *dst = itsSBBuffers[0][startI].origin();
#if NR_BITS_PER_SAMPLE == 16
_copy_pkt_to_bbuffer_128_bytes(dst, itsStride, data, itsNSubbands);
#elif NR_BITS_PER_SAMPLE == 8
_copy_pkt_to_bbuffer_64_bytes(dst, itsStride, data, itsNSubbands);
#elif NR_BITS_PER_SAMPLE == 4
_copy_pkt_to_bbuffer_32_bytes(dst, itsStride, data, itsNSubbands);
#else
#error Not implemented
#endif
#else
Beamlet *dst = reinterpret_cast<Beamlet *>(itsSBBuffers[0][startI].origin());
size_t stride = reinterpret_cast<Beamlet *>(itsSBBuffers[1][startI].origin()) - dst;
for (unsigned sb = 0; sb < itsNSubbands; sb ++) {
for (unsigned time = 0; time < nrTimesPerPacket; time ++)
dst[time] = *data ++;
dst += stride;
}
#endif
// forget old ValidData
pthread_mutex_lock(&itsValidDataMutex);
......@@ -131,9 +163,9 @@ void BeamletBuffer::startReadTransaction(const std::vector<TimeStamp> &begin, un
itsBegin = begin;
for (unsigned beam = 0; beam < begin.size(); beam++) {
itsEnd.push_back(begin[beam] + nrElements);
itsStartI.push_back(mapTime2Index(begin[beam]));
itsEndI.push_back(mapTime2Index(itsEnd[beam]));
itsEnd[beam] = begin[beam] + nrElements;
itsStartI[beam] = mapTime2Index(begin[beam]);
itsEndI[beam] = mapTime2Index(itsEnd[beam]);
}
TimeStamp minBegin = *std::min_element(itsBegin.begin(), itsBegin.end());
......@@ -153,8 +185,8 @@ void BeamletBuffer::sendSubband(Stream *str, unsigned subband, unsigned beam) co
{
// Align to 32 bytes and make multiple of 32 bytes by prepending/appending
// extra data. Always send 32 bytes extra, even if data was already aligned.
unsigned startI = itsStartI[beam] & ~(32 / sizeof(Beamlet) - 1); // round down
unsigned endI = (itsEndI[beam] + 32 / sizeof(Beamlet)) & ~(32 / sizeof(Beamlet) - 1); // round up, possibly adding 32 bytes
unsigned startI = align(itsStartI[beam] - itsAlignment + 1, itsAlignment); // round down
unsigned endI = align(itsEndI[beam] + 1, itsAlignment); // round up, possibly adding 32 bytes
if (endI < startI) {
// the data wraps around the allocated memory, so copy in two parts
......@@ -204,10 +236,6 @@ void BeamletBuffer::stopReadTransaction()
// subtract 16 extra; due to alignment restrictions and the changing delays,
// it is hard to predict where the next read will begin.
itsStartI.clear();
itsEndI.clear();
itsEnd.clear();
itsReadTimer.stop();
}
......
......@@ -48,17 +48,24 @@ namespace CS1 {
typedef INPUT_SAMPLE_TYPE SampleType;
// NOTE(review): this listing shows two definitions of Beamlet, a struct and a
// typedef chain; presumably the struct is the removed pre-commit form and the
// typedefs replace it -- confirm against the repository.
struct Beamlet {
SampleType Xpol, Ypol;
};
// define a "simple" type of which the size equals the size of two samples
// (X and Y polarizations)
// Only the sample widths 16, 8 and 4 are handled; there is no #else branch,
// so any other NR_BITS_PER_SAMPLE value leaves the typedef undefined.
#if NR_BITS_PER_SAMPLE == 16
typedef double Beamlet;
#elif NR_BITS_PER_SAMPLE == 8
typedef int32_t Beamlet;
#elif NR_BITS_PER_SAMPLE == 4
typedef int16_t Beamlet;
#endif
class BeamletBuffer
{
public:
BeamletBuffer(unsigned bufferSize, unsigned nrSubbands, unsigned history, bool isSynchronous, unsigned maxNetworkDelay);
BeamletBuffer(unsigned bufferSize, unsigned nrTimesPerPacket, unsigned nrSubbands, unsigned nrBeams, unsigned history, bool isSynchronous, unsigned maxNetworkDelay);
~BeamletBuffer();
void writeElements(Beamlet *data, const TimeStamp &begin, unsigned nrElements);
void writePacketData(Beamlet *data, const TimeStamp &begin);
void startReadTransaction(const std::vector<TimeStamp> &begin, unsigned nrElements);
void sendSubband(Stream *, unsigned subband, unsigned currentBeam) const;
......@@ -67,7 +74,7 @@ class BeamletBuffer
SparseSet<unsigned> readFlags(unsigned beam);
void stopReadTransaction();
static const unsigned MAX_BEAMLETS = 8;
const static unsigned nrTimesPerPacket = 16;
private:
unsigned mapTime2Index(TimeStamp time) const;
......@@ -79,6 +86,8 @@ class BeamletBuffer
ReaderAndWriterSynchronization *itsSynchronizedReaderWriter;
LockedRanges itsLockedRanges;
boost::multi_array_ref<SampleType, 3> itsSBBuffers;
int itsOffset;
const static unsigned itsAlignment = 32 / sizeof(Beamlet);
// read internals
std::vector<TimeStamp> itsBegin, itsEnd;
......@@ -89,6 +98,7 @@ class BeamletBuffer
// write internals
TimeStamp itsPreviousTimeStamp;
unsigned itsPreviousI;
size_t itsStride;
NSTimer itsReadTimer, itsWriteTimer;
};
......@@ -96,13 +106,13 @@ class BeamletBuffer
// Returns the start index of the given beam modulo the number of Beamlets
// that fit in 32 bytes.
// NOTE(review): two return lines appear here because this is a diff listing;
// presumably the first is the pre-commit form. Both compute the same value,
// since itsAlignment is declared as 32 / sizeof(Beamlet).
inline unsigned BeamletBuffer::alignmentShift(unsigned beam) const
{
return itsStartI[beam] % (32 / sizeof(Beamlet));
return itsStartI[beam] % itsAlignment;
}
// Maps a timestamp to an index into the circular buffer: the timestamp modulo
// itsSize, to which the second return line adds itsOffset.
// NOTE(review): two return lines appear here because this is a diff listing;
// presumably the first is the pre-commit form.
// NOTE(review): itsOffset is a signed int that writePacketData sets to a
// value <= 0, and the sum is returned as unsigned without a further reduction
// modulo itsSize. For a timestamp with time % itsSize < -itsOffset the result
// would presumably wrap to a very large index -- confirm that callers can
// never pass such a timestamp.
inline unsigned BeamletBuffer::mapTime2Index(TimeStamp time) const
{
// TODO: this is very slow because of the %
return time % itsSize;
return time % itsSize + itsOffset;
}
} // namespace CS1
......
......@@ -200,7 +200,7 @@ void InputSection::preprocess(const CS1_Parset *ps)
itsBBuffers.resize(itsNrInputs);
for (unsigned rsp = 0; rsp < itsNrInputs; rsp ++)
itsBBuffers[rsp] = new BeamletBuffer(ps->inputBufferSize(), ps->nrSubbandsPerFrame(), itsNHistorySamples, !itsIsRealTime, itsMaxNetworkDelay);
itsBBuffers[rsp] = new BeamletBuffer(ps->inputBufferSize(), ps->getUint32("OLAP.nrTimesInFrame"), ps->nrSubbandsPerFrame(), itsNrBeams, itsNHistorySamples, !itsIsRealTime, itsMaxNetworkDelay);
#if defined DUMP_RAW_DATA
vector<string> rawDataServers = ps->getStringVector("OLAP.OLAP_Conn.rawDataServers");
......
......@@ -203,7 +203,7 @@ void InputThread::mainLoop()
}
// expected packet received so write data into corresponding buffer
itsArgs.BBuffer->writeElements(reinterpret_cast<Beamlet *>(packet + 16), actualstamp, itsArgs.nrTimesPerPacket);
itsArgs.BBuffer->writePacketData(reinterpret_cast<Beamlet *>(packet + 16), actualstamp);
}
std::clog << "InputThread::mainLoop() exiting loop" << std::endl;
......
# InputThreadAsm.S: fast packet->BBuffer copy routines
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# $Id$
#if defined HAVE_BGL || defined HAVE_BGP
# _copy_pkt_to_bbuffer_32_bytes(dst, stride, src, nrSubbands)
#
# Prototype: see InputThreadAsm.h.  Assuming the standard PowerPC calling
# convention, the arguments arrive as r3 = dst, r4 = stride (in bytes),
# r5 = src and r6 = nrSubbands.
#
# Every pass of the loop reads the next 32 contiguous bytes from src (two
# quad-word lfpdux loads) and stores them at dst (two quad-word stfpdux
# stores); dst then moves on by stride bytes.  The loop runs nrSubbands
# times.  Returns immediately when nrSubbands is zero.
#
# NOTE(review): the quad-word loads and stores presumably need src, dst and
# stride to be multiples of 16 bytes -- confirm against the callers.
.global _copy_pkt_to_bbuffer_32_bytes
_copy_pkt_to_bbuffer_32_bytes:
        cmpwi   6,0             # zero subbands: bdnz would wrap ctr from 0
        beqlr                   #   and loop 2^32 times, so return at once
        addi    4,4,-1*16       # r4 = stride - 16: the first store of a pass
                                #   jumps to the next subband, the second
                                #   store adds the remaining 16 (r8)
        mtctr   6               # loop counter = nrSubbands
        li      8,16            # r8 = step between consecutive quad words
        addi    5,5,-16         # pre-bias src: update-form loads add r8 first
        sub     3,3,4           # pre-bias dst: first store adds r4 back
1:      lfpdux  0,5,8           # load bytes  0..15 of this subband
        lfpdux  1,5,8           # load bytes 16..31
        stfpdux 0,3,4           # store at the start of the next dst row
        stfpdux 1,3,8           # store 16 bytes further
        bdnz    1b              # next subband
        blr
# _copy_pkt_to_bbuffer_64_bytes(dst, stride, src, nrSubbands)
#
# Prototype: see InputThreadAsm.h.  Assuming the standard PowerPC calling
# convention, the arguments arrive as r3 = dst, r4 = stride (in bytes),
# r5 = src and r6 = nrSubbands.
#
# Every pass of the loop reads the next 64 contiguous bytes from src (four
# quad-word lfpdux loads) and stores them at dst (four quad-word stfpdux
# stores); dst then moves on by stride bytes.  The loop runs nrSubbands
# times.  Returns immediately when nrSubbands is zero.
#
# NOTE(review): the quad-word loads and stores presumably need src, dst and
# stride to be multiples of 16 bytes -- confirm against the callers.
.global _copy_pkt_to_bbuffer_64_bytes
_copy_pkt_to_bbuffer_64_bytes:
        cmpwi   6,0             # zero subbands: bdnz would wrap ctr from 0
        beqlr                   #   and loop 2^32 times, so return at once
        addi    4,4,-3*16       # r4 = stride - 48: the first store of a pass
                                #   jumps to the next subband, the other
                                #   three stores add the remaining 3 * 16
        mtctr   6               # loop counter = nrSubbands
        li      8,16            # r8 = step between consecutive quad words
        addi    5,5,-16         # pre-bias src: update-form loads add r8 first
        sub     3,3,4           # pre-bias dst: first store adds r4 back
1:      lfpdux  0,5,8           # load the four quad words of this subband
        lfpdux  1,5,8
        lfpdux  2,5,8
        lfpdux  3,5,8
        stfpdux 0,3,4           # store at the start of the next dst row
        stfpdux 1,3,8           # remaining stores advance by 16 bytes each
        stfpdux 2,3,8
        stfpdux 3,3,8
        bdnz    1b              # next subband
        blr
# _copy_pkt_to_bbuffer_128_bytes(dst, stride, src, nrSubbands)
#
# Prototype: see InputThreadAsm.h.  Assuming the standard PowerPC calling
# convention, the arguments arrive as r3 = dst, r4 = stride (in bytes),
# r5 = src and r6 = nrSubbands.
#
# Every pass of the loop reads the next 128 contiguous bytes from src (eight
# quad-word lfpdux loads) and stores them at dst (eight quad-word stfpdux
# stores); dst then moves on by stride bytes.  Loads and stores are issued in
# interleaved groups of 3, 3 and 2.  The loop runs nrSubbands times.
# Returns immediately when nrSubbands is zero.
#
# NOTE(review): the quad-word loads and stores presumably need src, dst and
# stride to be multiples of 16 bytes -- confirm against the callers.
.global _copy_pkt_to_bbuffer_128_bytes
_copy_pkt_to_bbuffer_128_bytes:
        cmpwi   6,0             # zero subbands: bdnz would wrap ctr from 0
        beqlr                   #   and loop 2^32 times, so return at once
        addi    4,4,-7*16       # r4 = stride - 112: the first store of a pass
                                #   jumps to the next subband, the other
                                #   seven stores add the remaining 7 * 16
        mtctr   6               # loop counter = nrSubbands
        li      8,16            # r8 = step between consecutive quad words
        addi    5,5,-16         # pre-bias src: update-form loads add r8 first
        sub     3,3,4           # pre-bias dst: first store adds r4 back
1:      lfpdux  0,5,8           # group 1: quad words 0..2
        lfpdux  1,5,8
        lfpdux  2,5,8
        stfpdux 0,3,4           # store at the start of the next dst row
        stfpdux 1,3,8
        stfpdux 2,3,8
        lfpdux  3,5,8           # group 2: quad words 3..5
        lfpdux  4,5,8
        lfpdux  5,5,8
        stfpdux 3,3,8
        stfpdux 4,3,8
        stfpdux 5,3,8
        lfpdux  6,5,8           # group 3: quad words 6..7
        lfpdux  7,5,8
        stfpdux 6,3,8
        stfpdux 7,3,8
        bdnz    1b              # next subband
        blr
#endif
//# InputThreadAsm.h: fast packet->BBuffer copy routines
//#
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#ifndef LOFAR_APPL_CEP_CS1_CS1_ION_PROC_INPUT_THREAD_ASM_H
#define LOFAR_APPL_CEP_CS1_CS1_ION_PROC_INPUT_THREAD_ASM_H

// size_t appears in the prototypes below; declare it here so this header does
// not rely on what the including file happened to include before it.
#include <stddef.h>

#if 1 /*defined HAVE_BGL || defined HAVE_BGP*/

// Copy routines implemented in InputThreadAsm.S.  The declarations are
// visible unconditionally, but the definitions are only assembled when
// HAVE_BGL or HAVE_BGP is defined, so calling them elsewhere fails at link
// time.
//
// Each routine performs nrSubbands passes.  A pass copies the next N
// contiguous bytes from src (N = 32, 64 or 128, as in the routine name) to
// dst, after which dst advances by stride bytes.
//
//   dst         address that receives the first subband
//   stride      distance in bytes between consecutive subbands at dst
//   src         start of the contiguous source data
//   nrSubbands  number of N-byte pieces to copy
extern "C" {
  void _copy_pkt_to_bbuffer_32_bytes(void *dst, size_t stride, const void *src, unsigned nrSubbands);
  void _copy_pkt_to_bbuffer_64_bytes(void *dst, size_t stride, const void *src, unsigned nrSubbands);
  void _copy_pkt_to_bbuffer_128_bytes(void *dst, size_t stride, const void *src, unsigned nrSubbands);
}
#endif
#endif
......@@ -3,6 +3,7 @@ BeamletBuffer.h \
BGL_Personality.h \
InputSection.h \
InputThread.h \
InputThreadAsm.h \
ION_Allocator.h \
LockedRanges.h \
LogThread.h \
......@@ -19,6 +20,7 @@ BGL_Personality.cc \
CS1_ION_main.cc \
InputSection.cc \
InputThread.cc \
InputThreadAsm.S \
ION_Allocator.cc \
LogThread.cc \
OutputSection.cc \
......
#gnubgp.compiler.conf: CC=/bgsys/drivers/ppcfloor/gnu-linux/bin/powerpc-bgp-linux-gcc CXX=/bgsys/drivers/ppcfloor/gnu-linux/bin/powerpc-bgp-linux-g++ CCAS=/bgsys/drivers/ppcfloor/gnu-linux/bin/powerpc-bgp-linux-gcc
gnu.compiler.conf: CC=/usr/bin/gcc CXX=/usr/bin/g++ CCAS=/bgsys/drivers/ppcfloor/gnu-linux/bin/powerpc-bgp-linux-gcc CCASFLAGS="-D HAVE_BGP" --with-cppflags="-D HAVE_BGP"
gnubgp.compiler.conf: CC=/usr/bin/gcc CXX=/usr/bin/g++ CCAS=/bgsys/drivers/ppcfloor/gnu-linux/bin/powerpc-bgp-linux-gcc
gnu.compiler.aipspp.var: --with-casacore=/cephome/romein/packages/casacore-0.3.0/stage --without-wcs
bgp.variant.conf: $(lofar_root) $(debugopt) $(nothreads) $(noshmem) $(bgp_cpp) $(bgp_ldd) --without-tinycep --without-log4cplus --with-bglmpich='/bgsys/drivers/ppcfloor/comm' --with-fftw2='/cephome/romein/packages/fftw-2.1.5-single-precision' --with-cppflags='-DHAVE_BGP -I/bgsys/drivers/ppcfloor/comm/include -I/bgsys/drivers/ppcfloor/arch/include'
fpic.variant.conf: $(debugopt) $(threads) $(aipspp) --without-log4cplus --with-cppflags='-fPIC' $(ion_searchpath) --with-ldflags='-L/bgl/lofar-utils/mass/lib' --with-libs='-lmass'
bgp.variant.conf: $(lofar_root) $(debugopt) $(nothreads) $(noshmem) $(bgp_cpp) $(bgp_ldd) --without-log4cplus --with-bglmpich='/bgsys/drivers/ppcfloor/comm' --with-fftw2='/cephome/romein/packages/fftw-2.1.5-single-precision' --with-cppflags='-DHAVE_BGP -I/bgsys/drivers/ppcfloor/comm/include -I/bgsys/drivers/ppcfloor/arch/include'
#fpic.variant.conf: $(debugopt) $(threads) $(aipspp) --without-log4cplus --with-cppflags='-fPIC' $(ion_searchpath) --with-ldflags='-L/bgl/lofar-utils/mass/lib' --with-libs='-lmass'
bgp_cpp.var: --with-cppflags='-DHAVE_BGP'
bgp_ldd.var: --with-ldflags='-L/bgsys/drivers/ppcfloor/comm/lib -L/bgsys/drivers/ppcfloor/runtime/SPI' --with-libs='-lcxxmpich.cnk -lmpich.cnk -ldcmfcoll.cnk -ldcmf.cnk -lpthread -lrt -lSPI.cna -lm'
#bgp_ldd.var: --with-ldflags='-L/bgsys/drivers/ppcfloor/comm/lib -L/bgsys/drivers/ppcfloor/runtime/SPI -L/opt/ibmcmp/vac/bg/9.0/bglib' --with-libs=' -lcxxmpich.cnk -lmpich.cnk -ldcmfcoll.cnk -ldcmf.cnk -lpthread -lrt -lSPI.cna -lxl -lm'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment