diff --git a/Appl/CEP/CS1/CS1_IONProc/configure.in b/Appl/CEP/CS1/CS1_IONProc/configure.in index f0dfccd5c30d3066761b7b1e476a597d1cf0420b..ac8595516edaac1a82b341c501ffc5bf7829487b 100644 --- a/Appl/CEP/CS1/CS1_IONProc/configure.in +++ b/Appl/CEP/CS1/CS1_IONProc/configure.in @@ -63,7 +63,7 @@ lofar_INTERNAL(CEP/tinyCEP,tinyCEP,,1,tinyCEP/TinyDataManager.h,,) lofar_INTERNAL(CEP/CEPFrame,CEPFrame,,1,CEPFrame/DataManager.h,,) lofar_INTERNAL(Appl/CEP/CS1/CS1_Interface,CS1_Interface,,1,CS1_Interface/CS1_Config.h,,) lofar_INTERNAL(Appl/ApplCommon,ApplCommon,,1,ApplCommon/Observation.h,,) -lofar_EXTERNAL(bglpersonality,1,bglpersonality.h,"",/bgl/BlueLight/ppcfloor/bglsys) +lofar_EXTERNAL(bglpersonality,0,bglpersonality.h,"",/bgl/BlueLight/ppcfloor/bglsys) dnl lofar_EXTERNAL(zoid_api,0,zoid_api.h,"",/cephome/romein/projects/zoid/zoid) dnl lofar_EXTERNAL(zoid,0,lofar.h,"",/cephome/romein/projects/zoid/zoid/lofar) lofar_EXTERNAL(zoid_api,0,zoid_api.h,"") diff --git a/Appl/CEP/CS1/CS1_IONProc/src/AH_ION_Gather.cc b/Appl/CEP/CS1/CS1_IONProc/src/AH_ION_Gather.cc index 01b957351612c843130e470927be68c93c8e0711..e67770b0cacda8fb55dbe82fb4fbfdb6b7d95a74 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/AH_ION_Gather.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/AH_ION_Gather.cc @@ -34,11 +34,12 @@ namespace LOFAR { namespace CS1 { -AH_ION_Gather::AH_ION_Gather() +AH_ION_Gather::AH_ION_Gather(const std::vector<TransportHolder *> &clientTHs) : itsCS1PS(0), itsWH(0), - itsVisibilitiesStub(0) + itsVisibilitiesStub(0), + itsClientTHs(clientTHs) { } @@ -53,9 +54,13 @@ void AH_ION_Gather::define(const KeyValueMap&) { itsCS1PS = new CS1_Parset(&itsParamSet); +#if defined HAVE_BGLPERSONALITY unsigned myPsetNumber = getBGLpersonality()->getPsetNum(); +#else + unsigned myPsetNumber = 0; +#endif - itsWH = new WH_ION_Gather("ION_Gather", myPsetNumber, itsCS1PS); + itsWH = new WH_ION_Gather("ION_Gather", myPsetNumber, itsCS1PS, itsClientTHs); itsWH->runOnNode(0); DataManager *dm = new DataManager(itsWH->getDataManager()); diff --git a/Appl/CEP/CS1/CS1_IONProc/src/AH_ION_Gather.h b/Appl/CEP/CS1/CS1_IONProc/src/AH_ION_Gather.h index d8b1da409c40bc38860d646e605f853e0e2e746d..96c6259bf263d40fd42c636f8527620c021538c1 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/AH_ION_Gather.h +++ b/Appl/CEP/CS1/CS1_IONProc/src/AH_ION_Gather.h @@ -23,6 +23,7 @@ #include <CEPFrame/ApplicationHolder.h> #include <CS1_Interface/Stub_BGL.h> +#include <Transport/TransportHolder.h> #include <WH_ION_Gather.h> @@ -32,7 +33,7 @@ namespace CS1 { class AH_ION_Gather : public ApplicationHolder { public: - AH_ION_Gather(); + AH_ION_Gather(const std::vector<TransportHolder *> &clientTHs); virtual ~AH_ION_Gather(); virtual void undefine(); virtual void define(const KeyValueMap&); @@ -44,6 +45,8 @@ class AH_ION_Gather : public ApplicationHolder CS1_Parset *itsCS1PS; WH_ION_Gather *itsWH; Stub_BGL *itsVisibilitiesStub; + + const std::vector<TransportHolder *> &itsClientTHs; }; } // namespace CS1 diff --git a/Appl/CEP/CS1/CS1_IONProc/src/BGL_Personality.cc b/Appl/CEP/CS1/CS1_IONProc/src/BGL_Personality.cc index ffd77b51ba919dea95b42b9152ab5841caeac3a3..39bae7b6e9785b098e38951962aec2abb12969cc 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/BGL_Personality.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/BGL_Personality.cc @@ -21,6 +21,8 @@ //# Always #include <lofar_config.h> first! #include <lofar_config.h> +#if defined HAVE_BGLPERSONALITY + #include <BGL_Personality.h> #include <cstdio> @@ -72,3 +74,5 @@ struct BGLPersonality *getBGLpersonality() } } + +#endif diff --git a/Appl/CEP/CS1/CS1_IONProc/src/BGL_Personality.h b/Appl/CEP/CS1/CS1_IONProc/src/BGL_Personality.h index 0c9bcc6c3e68721d5e6bf3e56cdb812c1eb735f7..e98f7c4a280d4ad83805cd061d4695fcbd7276cd 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/BGL_Personality.h +++ b/Appl/CEP/CS1/CS1_IONProc/src/BGL_Personality.h @@ -23,6 +23,7 @@ #ifndef LOFAR_CS1_ION_PROC_BGL_PERSONALITY_H #define LOFAR_CS1_ION_PROC_BGL_PERSONALITY_H +#if defined HAVE_BGLPERSONALITY #include <bglpersonality.h> namespace LOFAR { @@ -34,3 +35,4 @@ struct BGLPersonality *getBGLpersonality(); } // end namespace LOFAR #endif +#endif diff --git a/Appl/CEP/CS1/CS1_IONProc/src/CS1_ION_main.cc b/Appl/CEP/CS1/CS1_IONProc/src/CS1_ION_main.cc index 573248e65590e3f624ba045ce8e09ebfcb4eeb6e..17f89cda4d7c3e6eb51e00e6a38b9414f163c173 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/CS1_ION_main.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/CS1_ION_main.cc @@ -29,6 +29,7 @@ #include <InputSection.h> #include <AH_ION_Gather.h> #include <BGL_Personality.h> +#include <Transport/TH_Null.h> #include <TH_ZoidServer.h> #include <Package__Version.h> @@ -40,9 +41,11 @@ #include <cstdlib> #include <cstring> +#if defined HAVE_ZOID extern "C" { #include <lofar.h> } +#endif using namespace LOFAR; @@ -52,14 +55,18 @@ using namespace LOFAR::CS1; static char **global_argv; static unsigned nrCoresPerPset; static unsigned nrInputSectionRuns; +static std::vector<TransportHolder *> clientTHs; static void checkParset(const CS1_Parset &parset) { +#if defined HAVE_BGLPERSONALITY BGLPersonality *personality = getBGLpersonality(); if (parset.nrPsets() > personality->numPsets()) std::cerr << "needs " << parset.nrPsets() << " psets; only " << personality->numPsets() << " available" << std::endl; +#endif + #if 0 if (parset.sizeBeamletAndSubbandList); std::cerr << "size of the beamletlist must be equal to the size of subbandlist." << std::endl; @@ -69,6 +76,26 @@ static void checkParset(const CS1_Parset &parset) } +void createClientTHs(unsigned nrClients) +{ + clientTHs.resize(nrClients); + + for (unsigned core = 0; core < nrClients; core ++) +#if defined HAVE_ZOID + clientTHs[core] = new TH_ZoidServer(core); +#else + clientTHs[core] = new TH_Null; +#endif +} + + +void deleteClientTHs() +{ + for (unsigned core = 0; core < clientTHs.size(); core ++) + delete clientTHs[core]; +} + + static void configureCNs(const CS1_Parset &parset) { BGL_Command command(BGL_Command::PREPROCESS); @@ -90,8 +117,8 @@ static void configureCNs(const CS1_Parset &parset) for (unsigned core = 0; core < parset.nrCoresPerPset(); core ++) { std::clog << "configure core " << core << std::endl; - command.write(TH_ZoidServer::theirTHs[core]); - configuration.write(TH_ZoidServer::theirTHs[core]); + command.write(clientTHs[core]); + configuration.write(clientTHs[core]); } } @@ -102,7 +129,7 @@ static void unconfigureCNs(CS1_Parset &parset) for (unsigned core = 0; core < parset.nrCoresPerPset(); core ++) { std::clog << "unconfigure core " << core << std::endl; - command.write(TH_ZoidServer::theirTHs[core]); + command.write(clientTHs[core]); } } @@ -113,7 +140,7 @@ static void stopCNs() for (unsigned core = 0; core < nrCoresPerPset; core ++) { std::clog << "stopping core " << core << std::endl; - command.write(TH_ZoidServer::theirTHs[core]); + command.write(clientTHs[core]); } } @@ -123,9 +150,9 @@ void *input_thread(void *parset) std::clog << "starting input thread" << std::endl; try { - InputSection inputSection(static_cast<CS1_Parset *>(parset)); + InputSection inputSection(clientTHs); - inputSection.preprocess(); + inputSection.preprocess(static_cast<CS1_Parset *>(parset)); for (unsigned run = 0; run < nrInputSectionRuns; run ++) inputSection.process(); @@ -149,7 +176,7 @@ void *gather_thread(void *argv) std::clog << "starting gather thread, nrRuns = " << ((char **) argv)[2] << std::endl; try { - AH_ION_Gather myAH; + AH_ION_Gather myAH(clientTHs); ApplicationHolderController myAHController(myAH, 1); //listen to ACC every 1 runs ACC::PLC::ACCmain(3, (char **) argv, &myAHController); } catch (Exception &ex) { @@ -181,9 +208,19 @@ void *master_thread(void *) cs1_parset.adoptFile("OLAP.parset"); checkParset(cs1_parset); + +#if !defined HAVE_ZOID + nrCoresPerPset = cs1_parset.nrCoresPerPset(); + createClientTHs(nrCoresPerPset); +#endif + configureCNs(cs1_parset); +#if defined HAVE_BGLPERSONALITY unsigned myPsetNumber = getBGLpersonality()->getPsetNum(); +#else + unsigned myPsetNumber = 0; +#endif if (cs1_parset.inputPsetIndex(myPsetNumber) >= 0) { #if 0 @@ -236,7 +273,7 @@ void *master_thread(void *) for (unsigned run = 0; run < nrRuns; run ++) for (unsigned core = 0; core < nrCores; core ++) - command.write(TH_ZoidServer::theirTHs[BGL_Mapping::mapCoreOnPset(core, myPsetNumber)]); + command.write(clientTHs[BGL_Mapping::mapCoreOnPset(core, myPsetNumber)]); } if (input_thread_id != 0) { @@ -271,12 +308,14 @@ void *master_thread(void *) std::cerr << "could not detach master thread" << std::endl; } +#if defined HAVE_ZOID if (global_argv != 0) { for (char **arg = &global_argv[0]; *arg != 0; arg ++) free(*arg); delete [] global_argv; } +#endif std::clog << "master thread finishes" << std::endl; return 0; @@ -292,6 +331,7 @@ extern "C" inline static void redirect_output() { +#if defined HAVE_BGLPERSONALITY int fd; char file_name[64], block_id[17]; @@ -302,16 +342,19 @@ inline static void redirect_output() if ((fd = open(file_name, O_CREAT | O_TRUNC | O_RDWR, 0666)) < 0 || dup2(fd, 1) < 0 || dup2(fd, 2) < 0) perror("redirecting stdout/stderr"); +#endif } +#if defined HAVE_ZOID + void lofar__init(int nrComputeCores) { nrCoresPerPset = nrComputeCores; redirect_output(); std::clog << "begin of lofar__init" << std::endl; - TH_ZoidServer::createAllTH_ZoidServers(nrComputeCores); + createClientTHs(nrComputeCores); global_argv = 0; } @@ -336,8 +379,10 @@ void lofar_init(char **argv /* in:arr2d:size=+1 */, global_argv[argc] = 0; // terminating zero pointer - if (argc != 3) - std::cerr << "unexpected number of arguments, expect trouble!" << std::endl; + if (argc != 3) { + std::cerr << "unexpected number of arguments" << std::endl; + exit(1); + } pthread_t master_thread_id; @@ -351,8 +396,24 @@ void lofar_init(char **argv /* in:arr2d:size=+1 */, void lofar__fini(void) { std::clog << "begin of lofar__fini" << std::endl; + deleteClientTHs(); + std::clog << "end of lofar__fini" << std::endl; +} + +#else - TH_ZoidServer::deleteAllTH_ZoidServers(); +int main(int argc, char **argv) +{ + global_argv = argv; - std::clog << "end of lofar__fini" << std::endl; + if (argc != 3) { + std::cerr << "unexpected number of arguments" << std::endl; + exit(1); + } + + master_thread(0); + deleteClientTHs(); + return 0; } + +#endif diff --git a/Appl/CEP/CS1/CS1_IONProc/src/ION_Allocator.cc b/Appl/CEP/CS1/CS1_IONProc/src/ION_Allocator.cc index 9e534d7fba260dcf7590b46e31956742fa4a353e..74a3871f9db07b8179acd22b2ff08a3dc4c9ba01 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/ION_Allocator.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/ION_Allocator.cc @@ -42,16 +42,22 @@ namespace LOFAR { namespace CS1 { #if !defined USE_ZOID_ALLOCATOR +#if defined HAVE_ZOID FixedArena ION_Allocator::arena((void *) 0xA4002400, 0xBFFDC00); SparseSetAllocator ION_Allocator::allocator(ION_Allocator::arena); +#else +HeapAllocator ION_Allocator::allocator; +#endif #endif + ION_Allocator *ION_Allocator::clone() const { return new ION_Allocator(); } -void *ION_Allocator::allocate(size_t nbytes, size_t alignment) + +void *ION_Allocator::allocate(size_t nbytes, unsigned alignment) { #if defined USE_ZOID_ALLOCATOR void *ptr = __zoid_alloc(nbytes); @@ -69,6 +75,7 @@ void *ION_Allocator::allocate(size_t nbytes, size_t alignment) return ptr; } + void ION_Allocator::deallocate(void *ptr) { std::clog << "ION_Allocator::deallocate(" << ptr << ")" << std::endl; diff --git a/Appl/CEP/CS1/CS1_IONProc/src/ION_Allocator.h b/Appl/CEP/CS1/CS1_IONProc/src/ION_Allocator.h index c6b77b85bef3051a25399552e99a62931a8d33cf..0646d1fe2442eeeb47b83d978bc504c23a45ff1c 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/ION_Allocator.h +++ b/Appl/CEP/CS1/CS1_IONProc/src/ION_Allocator.h @@ -26,8 +26,11 @@ #include <Common/Allocator.h> #include <CS1_Interface/Allocator.h> +#include <Common/Allocator.h> +#if defined HAVE_ZOID #define USE_ZOID_ALLOCATOR +#endif namespace LOFAR { @@ -36,14 +39,20 @@ namespace CS1 { class ION_Allocator: public Allocator { public: - virtual void *allocate(size_t nbytes, size_t alignment = 1); + virtual void *allocate(size_t nbytes, unsigned alignment = 1); virtual void deallocate(void *); virtual ION_Allocator *clone() const; private: +#if !defined USE_ZOID_ALLOCATOR +#if defined HAVE_ZOID static FixedArena arena; static SparseSetAllocator allocator; +#else + static HeapAllocator allocator; +#endif +#endif }; } // end namespace CS1 diff --git a/Appl/CEP/CS1/CS1_IONProc/src/InputSection.cc b/Appl/CEP/CS1/CS1_IONProc/src/InputSection.cc index 71bca38f954ea3a64c08b794a2ee90131d2ac13d..5375525a31860b51799b9f9d72055a3b31f4a011 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/InputSection.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/InputSection.cc @@ -57,30 +57,19 @@ static TransportHolder *rawDataTH; #endif -InputSection::InputSection(const CS1_Parset *ps) +InputSection::InputSection(const std::vector<TransportHolder *> &clientTHs) : itsInputThread(0), - // itsCS1PS(new CS1_Parset(ps)), - itsCS1PS(ps), + itsInputTH(0), + itsClientTHs(clientTHs), itsBBuffer(0), itsDelayComp(0), - itsSampleRate(ps->sampleRate()), itsDelayTimer("delay") { - TimeStamp::setMaxBlockId(ps->sampleRate()); - - itsStationNr = ps->inputPsetIndex(getBGLpersonality()->getPsetNum()); - std::string stationName = ps->stationName(itsStationNr); - std::clog << "station " << itsStationNr << " = " << stationName << std::endl; - - itsInputTH = Connector::readTH(ps, stationName); - itsNSubbandsPerPset = ps->nrSubbandsPerPset(); - itsNSamplesPerSec = ps->nrSubbandSamples(); - -#if defined DUMP_RAW_DATA - itsNHistorySamples = 0; +#if defined HAVE_BGLPERSONALITY + itsPsetNumber = getBGLpersonality()->getPsetNum(); #else - itsNHistorySamples = ps->nrHistorySamples(); + itsPsetNumber = 0; #endif } @@ -106,49 +95,61 @@ void InputSection::startThread() args.nSubbandsPerFrame = itsCS1PS->nrSubbandsPerFrame(); args.frameSize = args.frameHeaderSize + args.nSubbandsPerFrame * args.nTimesPerFrame * sizeof(Beamlet); -#if 0 - if (itsInputTH->getType() == "TH_File" || itsInputTH->getType() == "TH_Null") { - // if we are reading from file, overwriting the buffer should not be allowed - // this way we can work with smaller files - itsBBuffer->setAllowOverwrite(false); - } -#endif - itsInputThread = new InputThread(args); } -void InputSection::preprocess() +void InputSection::preprocess(const CS1_Parset *ps) { + itsCS1PS = ps; + itsSampleRate = ps->sampleRate(); + TimeStamp::setMaxBlockId(itsSampleRate); + itsStationNr = ps->inputPsetIndex(itsPsetNumber); + + std::string stationName = ps->stationName(itsStationNr); + std::clog << "station " << itsStationNr << " = " << stationName << std::endl; + + itsInputTH = Connector::readTH(ps, stationName); + itsNSubbandsPerPset = ps->nrSubbandsPerPset(); + itsNSamplesPerSec = ps->nrSubbandSamples(); + +#if defined DUMP_RAW_DATA + itsNHistorySamples = 0; +#else + itsNHistorySamples = ps->nrHistorySamples(); +#endif + itsCurrentComputeCore = 0; - itsNrCoresPerPset = itsCS1PS->nrCoresPerPset(); - itsPsetNumber = getBGLpersonality()->getPsetNum(); - itsSampleDuration = itsCS1PS->sampleDuration(); + itsNrCoresPerPset = ps->nrCoresPerPset(); + itsSampleDuration = ps->sampleDuration(); // create the buffer controller. - int subbandsToReadFromFrame = itsCS1PS->subbandsToReadFromFrame(); - ASSERTSTR(subbandsToReadFromFrame <= itsCS1PS->nrSubbandsPerFrame(), subbandsToReadFromFrame << " < " << itsCS1PS->nrSubbandsPerFrame()); - - itsDelayCompensation = itsCS1PS->getBool("OLAP.delayCompensation"); - itsBeamlet2beams = itsCS1PS->beamlet2beams(); - itsSubband2Index = itsCS1PS->subband2Index(); + int subbandsToReadFromFrame = ps->subbandsToReadFromFrame(); + std::clog << "nrSubbands = "<< ps->nrSubbands() << std::endl; + std::clog << "nrStations = "<< ps->nrStations() << std::endl; + std::clog << "nrRSPboards = "<< ps->nrRSPboards() << std::endl; + ASSERTSTR(subbandsToReadFromFrame <= ps->nrSubbandsPerFrame(), subbandsToReadFromFrame << " < " << ps->nrSubbandsPerFrame()); + + itsDelayCompensation = ps->getBool("OLAP.delayCompensation"); + itsBeamlet2beams = ps->beamlet2beams(); + itsSubband2Index = ps->subband2Index(); - itsNrCalcDelays = itsCS1PS->getUint32("OLAP.DelayComp.nrCalcDelays"); + itsNrCalcDelays = ps->getUint32("OLAP.DelayComp.nrCalcDelays"); - double startTime = itsCS1PS->startTime(); // UTC + double startTime = itsInputTH->getType() == "TH_Null" ? 0 : ps->startTime(); // UTC - int sampleFreq = (int) itsCS1PS->sampleRate(); + int sampleFreq = (int) ps->sampleRate(); int seconds = (int) floor(startTime); int samples = (int) ((startTime - floor(startTime)) * sampleFreq); itsSyncedStamp = TimeStamp(seconds, samples); if (itsDelayCompensation) { - itsDelaysAtBegin.resize(itsCS1PS->nrBeams()); - itsDelaysAfterEnd.resize(itsCS1PS->nrBeams()); - itsNrCalcDelaysForEachTimeNrDirections.resize(itsNrCalcDelays * itsCS1PS->nrBeams()); + itsDelaysAtBegin.resize(ps->nrBeams()); + itsDelaysAfterEnd.resize(ps->nrBeams()); + itsNrCalcDelaysForEachTimeNrDirections.resize(itsNrCalcDelays * ps->nrBeams()); - itsDelayComp = new WH_DelayCompensation(itsCS1PS, itsCS1PS->stationName(itsStationNr)); + itsDelayComp = new WH_DelayCompensation(ps, ps->stationName(itsStationNr)); std::vector<double> startTimes(itsNrCalcDelays); @@ -159,21 +160,21 @@ void InputSection::preprocess() itsCounter = 0; - for (unsigned beam = 0; beam < itsCS1PS->nrBeams(); beam ++) + for (unsigned beam = 0; beam < ps->nrBeams(); beam ++) itsDelaysAfterEnd[beam] = itsNrCalcDelaysForEachTimeNrDirections[beam]; itsDelaysAtBegin = itsDelaysAfterEnd; } - unsigned cyclicBufferSize = itsCS1PS->nrSamplesToBuffer(); - itsIsSynchronous = itsCS1PS->getString("OLAP.OLAP_Conn.station_Input_Transport") != "UDP"; - itsMaxNetworkDelay = itsCS1PS->maxNetworkDelay(); + unsigned cyclicBufferSize = ps->nrSamplesToBuffer(); + itsIsSynchronous = ps->getString("OLAP.OLAP_Conn.station_Input_Transport") != "UDP"; + itsMaxNetworkDelay = ps->maxNetworkDelay(); std::clog << "maxNetworkDelay = " << itsMaxNetworkDelay << std::endl; itsBBuffer = new BeamletBuffer(cyclicBufferSize, subbandsToReadFromFrame, itsNHistorySamples, itsIsSynchronous, itsMaxNetworkDelay); #if defined DUMP_RAW_DATA - vector<string> rawDataServers = itsCS1PS->getStringVector("OLAP.OLAP_Conn.rawDataServers"); - vector<string> rawDataPorts = itsCS1PS->getStringVector("OLAP.OLAP_Conn.rawDataPorts"); + vector<string> rawDataServers = ps->getStringVector("OLAP.OLAP_Conn.rawDataServers"); + vector<string> rawDataPorts = ps->getStringVector("OLAP.OLAP_Conn.rawDataPorts"); if (itsStationNr >= rawDataServers.size() || itsStationNr >= rawDataPorts.size()) { std::cerr << "too many stations for too few raw data servers/ports" << std::endl; @@ -372,7 +373,7 @@ void InputSection::process() for (unsigned subbandBase = 0; subbandBase < itsNSubbandsPerPset; subbandBase ++) { unsigned core = BGL_Mapping::mapCoreOnPset(itsCurrentComputeCore, itsPsetNumber); - TransportHolder *th = TH_ZoidServer::theirTHs[core]; + TransportHolder *th = itsClientTHs[core]; command.write(th); itsIONtoCNdata.write(th, itsCS1PS->nrBeams()); @@ -401,10 +402,11 @@ void InputSection::postprocess() std::clog << "InputSection::postprocess" << std::endl; delete itsInputThread; itsInputThread = 0; + delete itsInputTH; itsInputTH = 0; delete itsBBuffer; itsBBuffer = 0; delete itsDelayComp; itsDelayComp = 0; - itsDelayTimer.print(clog); + itsDelayTimer.print(std::clog); } } // namespace CS1 diff --git a/Appl/CEP/CS1/CS1_IONProc/src/InputSection.h b/Appl/CEP/CS1/CS1_IONProc/src/InputSection.h index 69c029e163158636425f63e63cb2265c158f1bfd..a3715749104f96d41a3a1aeb2800961a9f8305e4 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/InputSection.h +++ b/Appl/CEP/CS1/CS1_IONProc/src/InputSection.h @@ -29,10 +29,10 @@ //# Never #include <config.h> or #include <lofar_config.h> in a header file! //# Includes -#include <tinyCEP/WorkHolder.h> #include <CS1_Interface/CS1_Parset.h> #include <CS1_Interface/RSPTimeStamp.h> #include <CS1_Interface/ION_to_CN.h> +#include <Transport/TransportHolder.h> #include <BeamletBuffer.h> #include <WH_DelayCompensation.h> #include <InputThread.h> @@ -48,10 +48,10 @@ namespace CS1 { // and distributes it per subband to the Blue Gene/L class InputSection { public: - InputSection(const CS1_Parset *ps); + InputSection(const std::vector<TransportHolder *> &); ~InputSection(); - void preprocess(); + void preprocess(const CS1_Parset *ps); void process(); void postprocess(); @@ -68,6 +68,7 @@ class InputSection { InputThread *itsInputThread; TransportHolder *itsInputTH; + const std::vector<TransportHolder *> &itsClientTHs; unsigned itsStationNr; const CS1_Parset *itsCS1PS; @@ -93,7 +94,7 @@ class InputSection { BeamletBuffer *itsBBuffer; WH_DelayCompensation *itsDelayComp; - const double itsSampleRate; + double itsSampleRate; NSTimer itsDelayTimer; diff --git a/Appl/CEP/CS1/CS1_IONProc/src/InputThread.cc b/Appl/CEP/CS1/CS1_IONProc/src/InputThread.cc index 8daad650a60fbb8a9176d181b4d5f520f978c38f..2fd85130a0903b9226193c59db2200b64b34e932 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/InputThread.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/InputThread.cc @@ -38,7 +38,9 @@ namespace CS1 { volatile bool InputThread::theirShouldStop = false; volatile unsigned InputThread::nrPacketsReceived, InputThread::nrPacketsRejected; -InputThread::InputThread(const ThreadArgs &args) : itsArgs(args) +InputThread::InputThread(const ThreadArgs &args) +: + itsArgs(args) { std::clog << "InputThread::InputThread(...)" << std::endl; if (pthread_create(&thread, 0, mainLoopStub, this) != 0) { diff --git a/Appl/CEP/CS1/CS1_IONProc/src/Makefile.am b/Appl/CEP/CS1/CS1_IONProc/src/Makefile.am index afc09f3473b5db1e5ce8298d5d1fe5e3007ee573..5f08f06704905046946415dd11797ea4d1e2ce5f 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/Makefile.am +++ b/Appl/CEP/CS1/CS1_IONProc/src/Makefile.am @@ -1,46 +1,50 @@ -pkginclude_HEADERS = Package__Version.h - -lib_LTLIBRARIES = liblofar_impl.la - -liblofar_impl_la_SOURCES = $(DOCHDRS) \ +pkginclude_HEADERS = Package__Version.h \ BeamletBuffer.h \ -BeamletBuffer.cc \ BGL_Personality.h \ -BGL_Personality.cc \ -CS1_ION_main.h \ -CS1_ION_main.cc \ Connector.h \ -Connector.cc \ InputThread.h \ -InputThread.cc \ ION_Allocator.h \ -ION_Allocator.cc \ LockedRanges.h \ ReaderWriterSynchronization.h \ -ReaderWriterSynchronization.cc \ SlidingPointer.h \ WallClockTime.h \ InputSection.h \ -InputSection.cc \ WH_DelayCompensation.h \ -WH_DelayCompensation.cc \ AH_ION_Gather.h \ -AH_ION_Gather.cc \ WH_ION_Gather.h \ +TH_ZoidServer.h + +liblofar_impl_la_SOURCES = $(pkginclude_HEADERS) \ +BeamletBuffer.cc \ +BGL_Personality.cc \ +CS1_ION_main.cc \ +Connector.cc \ +InputThread.cc \ +ION_Allocator.cc \ +ReaderWriterSynchronization.cc \ +InputSection.cc \ +WH_DelayCompensation.cc \ +AH_ION_Gather.cc \ WH_ION_Gather.cc \ -TH_ZoidServer.h \ TH_ZoidServer.cc \ -Package__Version.h \ Package__Version.cc +#if HAVE_ZOID +lib_LTLIBRARIES = liblofar_impl.la +bin_PROGRAMS = versioncs1_ionproc + install-exec-local: ln -sf .libs/liblofar_impl.so.0.0.0 lofar_impl.so cp lofar_impl.so /bgl/lofar-utils/zoid-test/lib uninstall-local: rm lofar_impl.so +#else +#IONProc_SOURCES = $(liblofar_impl_la_SOURCES) +#IONProc_DEPENDENCIES = $(LOFAR_DEPEND) -bin_PROGRAMS = versioncs1_ionproc +#bin_PROGRAMS = versioncs1_ionproc IONProc +#endif versioncs1_ionproc_SOURCES = versioncs1_ionproc.cc Package__Version.cc versioncs1_ionproc_DEPENDENCIES = $(LOFAR_DEPEND) diff --git a/Appl/CEP/CS1/CS1_IONProc/src/TH_ZoidServer.cc b/Appl/CEP/CS1/CS1_IONProc/src/TH_ZoidServer.cc index fcb7ecefbd441b7f2828669cfa5d180de9986086..3a624cb9d9dc5b514db75ee7eb5b09b2e1960bf0 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/TH_ZoidServer.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/TH_ZoidServer.cc @@ -23,10 +23,15 @@ //# Always #include <lofar_config.h> first! #include <lofar_config.h> +#if defined HAVE_ZOID + #include <Common/Timer.h> #include <Transport/DataHolder.h> #include <TH_ZoidServer.h> +#include <algorithm> + + namespace LOFAR { namespace CS1 { @@ -37,6 +42,7 @@ extern "C" } +//std::vector<TH_ZoidServer *> TH_ZoidServer::theirTHs; std::vector<TH_ZoidServer *> TH_ZoidServer::theirTHs; @@ -164,6 +170,11 @@ TH_ZoidServer::TH_ZoidServer(unsigned core) bytesToSend(0), bytesToReceive(0) { + if (theirTHs.size() <= core) + theirTHs.resize(core + 1); + + theirTHs[core] = this; + pthread_cond_init(&newSendDataAvailable, 0); pthread_cond_init(&newReceiveBufferAvailable, 0); pthread_cond_init(&dataSent, 0); @@ -175,6 +186,8 @@ TH_ZoidServer::TH_ZoidServer(unsigned core) TH_ZoidServer::~TH_ZoidServer() { + *std::find(theirTHs.begin(), theirTHs.end(), this) = 0; + pthread_cond_destroy(&newSendDataAvailable); pthread_cond_destroy(&newReceiveBufferAvailable); pthread_cond_destroy(&dataSent); @@ -184,6 +197,7 @@ TH_ZoidServer::~TH_ZoidServer() } +#if 0 void TH_ZoidServer::createAllTH_ZoidServers(unsigned nrCoresPerPset) { theirTHs.resize(nrCoresPerPset); @@ -200,6 +214,7 @@ void TH_ZoidServer::deleteAllTH_ZoidServers() theirTHs.clear(); } +#endif bool TH_ZoidServer::init() @@ -290,3 +305,5 @@ void TH_ZoidServer::reset() } // namespace CS1 } // namespace LOFAR + +#endif diff --git a/Appl/CEP/CS1/CS1_IONProc/src/TH_ZoidServer.h b/Appl/CEP/CS1/CS1_IONProc/src/TH_ZoidServer.h index 4c5226262f792f6def8d76bcfa5f222584fcbf76..fac9fd3f85655a6305cad082f1996e65e1ccf8bf 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/TH_ZoidServer.h +++ b/Appl/CEP/CS1/CS1_IONProc/src/TH_ZoidServer.h @@ -23,6 +23,8 @@ #ifndef LOFAR_TRANSPORTTH_ZOID_SERVER_H #define LOFAR_TRANSPORTTH_ZOID_SERVER_H +#if defined HAVE_ZOID + // \file // TransportHolder that does nothing @@ -39,8 +41,10 @@ namespace CS1 { class TH_ZoidServer : public TransportHolder { public: +#if 0 static void createAllTH_ZoidServers(unsigned nrCoresPerPset); static void deleteAllTH_ZoidServers(); +#endif virtual bool init(); @@ -61,7 +65,7 @@ class TH_ZoidServer : public TransportHolder virtual TransportHolder* clone() const; virtual void reset(); - private: + //private: // create via createAllTH_ZoidServers(...) TH_ZoidServer(unsigned core); virtual ~TH_ZoidServer(); @@ -86,3 +90,4 @@ class TH_ZoidServer : public TransportHolder } // namespace LOFAR #endif +#endif diff --git a/Appl/CEP/CS1/CS1_IONProc/src/WH_ION_Gather.cc b/Appl/CEP/CS1/CS1_IONProc/src/WH_ION_Gather.cc index 4086ad91cbb350f5dc0d7fb6286830b823e8875d..38ff005266be3b3c30d78745746d2c9d931f3544 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/WH_ION_Gather.cc +++ b/Appl/CEP/CS1/CS1_IONProc/src/WH_ION_Gather.cc @@ -39,11 +39,12 @@ namespace LOFAR { namespace CS1 { -WH_ION_Gather::WH_ION_Gather(const string &name, unsigned psetNumber, const CS1_Parset *ps) +WH_ION_Gather::WH_ION_Gather(const string &name, unsigned psetNumber, const CS1_Parset *ps, const std::vector<TransportHolder *> &clientTHs) : WorkHolder(0, 1, name, "WH_ION_Gather"), itsPsetNumber(psetNumber), - itsPS(ps) + itsPS(ps), + itsClientTHs(clientTHs) { itsTmpDH = 0; itsNrComputeCores = ps->nrCoresPerPset(); @@ -83,7 +84,7 @@ WorkHolder* WH_ION_Gather::construct(const string &name, const ACC::APS::Paramet WH_ION_Gather* WH_ION_Gather::make(const string &name) { - return new WH_ION_Gather(name, itsPsetNumber, itsPS); + return new WH_ION_Gather(name, itsPsetNumber, itsPS, itsClientTHs); } @@ -133,9 +134,9 @@ void WH_ION_Gather::process() DH_Visibilities *dh = lastTime ? dynamic_cast<DH_Visibilities *>(getDataManager().getOutHolder(0)) : firstTime ? itsSumDHs[itsCurrentSubband] : itsTmpDH; unsigned channel = BGL_Mapping::mapCoreOnPset(itsCurrentComputeCore, itsPsetNumber); - //TH_ZoidServer::theirTHs[channel]->recvBlocking(dh->getDataPtr(), (dh->getDataSize() + 31) & ~31, 0, 0, dh); - TH_ZoidServer::theirTHs[channel]->recvBlocking(dh->getVisibilities().origin(), dh->getVisibilities().num_elements() * sizeof(fcomplex), 0, 0, 0); - TH_ZoidServer::theirTHs[channel]->recvBlocking(dh->getNrValidSamples().origin(), dh->getNrValidSamples().num_elements() * sizeof(unsigned short), 0, 0, 0); + //itsClientTHs[channel]->recvBlocking(dh->getDataPtr(), (dh->getDataSize() + 31) & ~31, 0, 0, dh); + itsClientTHs[channel]->recvBlocking(dh->getVisibilities().origin(), dh->getVisibilities().num_elements() * sizeof(fcomplex), 0, 0, 0); + itsClientTHs[channel]->recvBlocking(dh->getNrValidSamples().origin(), dh->getNrValidSamples().num_elements() * sizeof(unsigned short), 0, 0, 0); if (!firstTime) if (lastTime) diff --git a/Appl/CEP/CS1/CS1_IONProc/src/WH_ION_Gather.h b/Appl/CEP/CS1/CS1_IONProc/src/WH_ION_Gather.h index e6b3960b46449dd13f4696a645fd7f68a7717d10..8827207ae2dc86adfbf7b1071100242c43e5c860 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/WH_ION_Gather.h +++ b/Appl/CEP/CS1/CS1_IONProc/src/WH_ION_Gather.h @@ -23,6 +23,7 @@ #include <CS1_Interface/DH_Visibilities.h> #include <tinyCEP/WorkHolder.h> +#include <Transport/TransportHolder.h> #include <APS/ParameterSet.h> #include <vector> @@ -34,7 +35,7 @@ namespace CS1 { class WH_ION_Gather : public WorkHolder { public: - explicit WH_ION_Gather(const string &name, unsigned psetNumber, const CS1_Parset *ps); + WH_ION_Gather(const string &name, unsigned psetNumber, const CS1_Parset *ps, const std::vector<TransportHolder *> &clientTHs); virtual ~WH_ION_Gather(); //static WorkHolder *construct(const string &name, const ACC::APS::ParameterSet &); @@ -59,6 +60,7 @@ class WH_ION_Gather : public WorkHolder unsigned itsNrIntegrationSteps, itsCurrentIntegrationStep; const CS1_Parset *itsPS; + const std::vector<TransportHolder *> &itsClientTHs; }; } diff --git a/Appl/CEP/CS1/CS1_IONProc/src/WallClockTime.h b/Appl/CEP/CS1/CS1_IONProc/src/WallClockTime.h index 66dd09a6d46f9dc9cae135d42fb5f392f3ac7c19..8de87cb28aa90a38edd1c32d02ccb338ed20491a 100644 --- a/Appl/CEP/CS1/CS1_IONProc/src/WallClockTime.h +++ b/Appl/CEP/CS1/CS1_IONProc/src/WallClockTime.h @@ -26,6 +26,7 @@ #include <CS1_Interface/RSPTimeStamp.h> #include <pthread.h> +#include <errno.h> namespace LOFAR {