Skip to content
Snippets Groups Projects
Commit e146811b authored by John Romein's avatar John Romein
Browse files

bug 225:

* Use separate Input/OutputThread per Stream.
* Use one stream per subband (no more multiplexing).
* Buffering is done per stream.
parent f71303f8
Branches
Tags
No related merge requests found
......@@ -9,6 +9,7 @@ InputThreadAsm.h \
ION_Allocator.h \
LockedRanges.h \
LogThread.h \
OutputThread.h \
OutputSection.h \
ReaderWriterSynchronization.h \
RSP.h \
......@@ -26,6 +27,7 @@ InputThread.cc \
InputThreadAsm.S \
ION_Allocator.cc \
LogThread.cc \
OutputThread.cc \
OutputSection.cc \
ReaderWriterSynchronization.cc \
WH_DelayCompensation.cc \
......
......@@ -56,69 +56,42 @@ OutputSection::OutputSection(unsigned psetNumber, const std::vector<Stream *> &s
}
OutputSection::~OutputSection()
{
}
void *OutputSection::sendThreadStub(void *arg)
{
std::clog << "sendThread started" << std::endl;
try {
static_cast<OutputSection *>(arg)->sendThread();
} catch (std::exception &ex) {
std::cerr << "Caught std::exception: " << ex.what() << std::endl;
} catch (...) {
std::cerr << "Caught non-std::exception" << std::endl;
}
std::clog << "sendThread finished" << std::endl;
return 0;
}
void OutputSection::sendThread()
{
CorrelatedData *data;
while ((data = itsSendQueue.remove()) != 0) {
data->write(itsStreamToStorage);
itsFreeQueue.append(data);
}
}
void OutputSection::connectToStorage(const Parset *ps)
{
unsigned myPsetIndex = ps->outputPsetIndex(itsPsetNumber);
unsigned nrPsetsPerStorage = ps->nrPsetsPerStorage();
unsigned storageHostIndex = myPsetIndex / nrPsetsPerStorage;
unsigned storagePortIndex = myPsetIndex % nrPsetsPerStorage;
//unsigned storagePortIndex = myPsetIndex % nrPsetsPerStorage;
string prefix = "OLAP.OLAP_Conn.IONProc_Storage";
string connectionType = ps->getString(prefix + "_Transport");
for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) {
unsigned subbandNumber = myPsetIndex * itsNrSubbandsPerPset + subband;
if (connectionType == "NULL") {
std::clog << "output section discards data to null:" << std::endl;
itsStreamToStorage = new NullStream;
std::clog << "subband " << subbandNumber << " written to null:" << std::endl;
itsStreamsToStorage.push_back(new NullStream);
} else if (connectionType == "TCP") {
std::string server = ps->getStringVector(prefix + "_ServerHosts")[storageHostIndex];
unsigned short port = boost::lexical_cast<unsigned short>(ps->getPortsOf(prefix)[storagePortIndex]);
//unsigned short port = boost::lexical_cast<unsigned short>(ps->getPortsOf(prefix)[storagePortIndex]);
unsigned short port = boost::lexical_cast<unsigned short>(ps->getPortsOf(prefix)[subbandNumber]);
std::clog << "output section connects to tcp:" << server << ':' << port << std::endl;
itsStreamToStorage = new SocketStream(server.c_str(), port, SocketStream::TCP, SocketStream::Client);
std::clog << "subband " << subbandNumber << " written to tcp:" << server << ':' << port << std::endl;
itsStreamsToStorage.push_back(new SocketStream(server.c_str(), port, SocketStream::TCP, SocketStream::Client));
} else if (connectionType == "FILE") {
std::string filename = ps->getString(prefix + "_BaseFileName") + '.' +
boost::lexical_cast<std::string>(storageHostIndex) + '.' +
boost::lexical_cast<std::string>(storagePortIndex);
boost::lexical_cast<std::string>(subbandNumber);
//boost::lexical_cast<std::string>(storagePortIndex);
std::clog << "output section write to file:" << filename << std::endl;
itsStreamToStorage = new FileStream(filename.c_str(), 0666);
std::clog << "subband " << subbandNumber << " written to file:" << filename << std::endl;
itsStreamsToStorage.push_back(new FileStream(filename.c_str(), 0666));
} else {
THROW(IONProcException, "unsupported ION->Storage stream type");
}
}
}
void OutputSection::preprocess(const Parset *ps)
......@@ -139,11 +112,8 @@ void OutputSection::preprocess(const Parset *ps)
for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++)
itsVisibilitySums.push_back(new CorrelatedData(nrBaselines, nrChannels, hugeMemoryAllocator));
for (unsigned i = 0; i < maxSendQueueSize; i ++)
itsFreeQueue.append(new CorrelatedData(nrBaselines, nrChannels, hugeMemoryAllocator));
if (pthread_create(&itsSendThread, 0, sendThreadStub, this) != 0)
THROW(IONProcException, "could not create send thread");
for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++)
itsOutputThreads.push_back(new OutputThread(itsStreamsToStorage[subband], nrBaselines, nrChannels));
}
......@@ -155,25 +125,22 @@ void OutputSection::process()
for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) {
// TODO: make sure that there are more free buffers than subbandsPerPset
CorrelatedData *data = lastTime ? itsFreeQueue.remove() : firstTime ? itsVisibilitySums[subband] : itsTmpSum;
unsigned inputChannel = CN_Mapping::mapCoreOnPset(itsCurrentComputeCore, itsPsetNumber);
unsigned channel = CN_Mapping::mapCoreOnPset(itsCurrentComputeCore, itsPsetNumber);
if (lastTime) {
CorrelatedData *data = itsOutputThreads[subband]->itsFreeQueue.remove();
data->read(itsStreamsFromCNs[channel]);
data->read(itsStreamsFromCNs[inputChannel]);
if (!firstTime)
if (lastTime)
*data += *itsVisibilitySums[subband];
else
*itsVisibilitySums[subband] += *data;
if (lastTime) {
#if 0
for (unsigned ch = 1; ch < 256; ch ++)
std::clog << std::setprecision(7) << ch << ' ' << abs(data->visibilities[0][ch][0][0]) << std::endl;
#endif
itsSendQueue.append(data);
itsOutputThreads[subband]->itsSendQueue.append(data);
} else if (firstTime) {
itsVisibilitySums[subband]->read(itsStreamsFromCNs[inputChannel]);
} else {
itsTmpSum->read(itsStreamsFromCNs[inputChannel]);
*itsVisibilitySums[subband] += *itsTmpSum;
}
if (++ itsCurrentComputeCore == itsNrComputeCores)
......@@ -187,24 +154,19 @@ void OutputSection::process()
void OutputSection::postprocess()
{
itsSendQueue.append(0); // 0 indicates that no more messages will be sent
if (pthread_join(itsSendThread, 0) != 0)
THROW(IONProcException, "could not join send thread");
delete itsStreamToStorage; // closes stream, stopping the Storage section
itsStreamToStorage = 0;
for (unsigned subband = 0; subband < itsVisibilitySums.size(); subband ++)
for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) {
itsOutputThreads[subband]->itsSendQueue.append(0); // 0 indicates that no more messages will be sent
delete itsOutputThreads[subband];
delete itsVisibilitySums[subband];
delete itsStreamsToStorage[subband];
}
itsVisibilitySums.resize(0);
itsOutputThreads.clear();
itsVisibilitySums.clear();
itsStreamsToStorage.clear();
delete itsTmpSum;
itsTmpSum = 0;
for (unsigned i = 0; i < maxSendQueueSize; i ++)
delete itsFreeQueue.remove();
}
}
......
......@@ -23,11 +23,9 @@
#include <Interface/CorrelatedData.h>
#include <Interface/Parset.h>
#include <Interface/Queue.h>
#include <IONProc/OutputThread.h>
#include <Stream/Stream.h>
#include <pthread.h>
#include <vector>
......@@ -38,31 +36,24 @@ class OutputSection
{
public:
OutputSection(unsigned psetNumber, const std::vector<Stream *> &streamsFromCNs);
~OutputSection();
void preprocess(const Parset *);
void process();
void postprocess();
private:
static void *sendThreadStub(void *);
void sendThread();
void connectToStorage(const Parset *);
static const unsigned maxSendQueueSize = 3;
std::vector<CorrelatedData *> itsVisibilitySums;
CorrelatedData *itsTmpSum;
Queue<CorrelatedData *> itsFreeQueue, itsSendQueue;
std::vector<OutputThread *> itsOutputThreads;
unsigned itsPsetNumber, itsNrComputeCores, itsCurrentComputeCore;
unsigned itsNrSubbandsPerPset;
unsigned itsNrIntegrationSteps, itsCurrentIntegrationStep;
const std::vector<Stream *> &itsStreamsFromCNs;
Stream *itsStreamToStorage;
pthread_t itsSendThread;
std::vector<Stream *> itsStreamsToStorage;
};
}
......
//# OutputThread.cc:
//#
//# Copyright (C) 2008
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
//# Always #include <lofar_config.h> first!
#include <lofar_config.h>
#include <IONProc/OutputThread.h>
#include <IONProc/ION_Allocator.h>
namespace LOFAR {
namespace RTCP {
OutputThread::OutputThread(Stream *streamToStorage, unsigned nrBaselines, unsigned nrChannels)
:
itsStreamToStorage(streamToStorage)
{
for (unsigned i = 0; i < maxSendQueueSize; i ++)
itsFreeQueue.append(new CorrelatedData(nrBaselines, nrChannels, hugeMemoryAllocator));
if (pthread_create(&thread, 0, mainLoopStub, this) != 0) {
std::cerr << "could not create output thread" << std::endl;
exit(1);
}
}
OutputThread::~OutputThread()
{
if (pthread_join(thread, 0) != 0) {
std::cerr << "could not join output thread" << std::endl;
exit(1);
}
for (unsigned i = 0; i < maxSendQueueSize; i ++)
delete itsFreeQueue.remove();
}
void OutputThread::mainLoop()
{
CorrelatedData *data;
while ((data = itsSendQueue.remove()) != 0) {
data->write(itsStreamToStorage);
itsFreeQueue.append(data);
}
}
void *OutputThread::mainLoopStub(void *outputThread)
{
try {
static_cast<OutputThread *>(outputThread)->mainLoop();
} catch (Exception &ex) {
std::cerr << "caught Exception: " << ex.what() << std::endl;
} catch (std::exception &ex) {
std::cerr << "caught std::exception: " << ex.what() << std::endl;
} catch (...) {
std::cerr << "caught non-std:exception" << std::endl;
}
//static_cast<OutputThread *>(outputThread)->stopped = true;
return 0;
}
} // namespace RTCP
} // namespace LOFAR
//# OutputThread.h
//#
//# Copyright (C) 2006
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#ifndef LOFAR_IONPROC_OUTPUT_THREAD_H
#define LOFAR_IONPROC_OUTPUT_THREAD_H
//# Never #include <config.h> or #include <lofar_config.h> in a header file!
#include <Interface/CorrelatedData.h>
#include <Interface/Queue.h>
#include <Stream/Stream.h>
#include <pthread.h>
namespace LOFAR {
namespace RTCP {
class OutputThread
{
public:
OutputThread(Stream *streamToStorage, unsigned nrBaselines, unsigned nrChannels);
~OutputThread();
static const unsigned maxSendQueueSize = 3;
Queue<CorrelatedData *> itsFreeQueue, itsSendQueue;
private:
static void *mainLoopStub(void *outputThread);
void mainLoop();
Stream *itsStreamToStorage;
pthread_t thread;
};
} // namespace RTCP
} // namespace LOFAR
#endif
//# InputThread.h
//#
//# Copyright (C) 2008
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#ifndef LOFAR_RTCP_STORAGE_INPUT_THREAD_H
#define LOFAR_RTCP_STORAGE_INPUT_THREAD_H
//# Never #include <config.h> or #include <lofar_config.h> in a header file!
#include <Interface/CorrelatedData.h>
#include <Interface/Queue.h>
#include <Stream/Stream.h>
#include <pthread.h>
namespace LOFAR {
namespace RTCP {
class InputThread
{
public:
InputThread(Stream *streamFromION, unsigned nrBaselines, unsigned nrChannels);
~InputThread();
static const unsigned maxReceiveQueueSize = 3;
Queue<CorrelatedData *> itsFreeQueue, itsReceiveQueue;
private:
static void *mainLoopStub(void *inputThread);
void mainLoop();
Stream *itsStreamFromION;
pthread_t thread;
};
} // namespace RTCP
} // namespace LOFAR
#endif
pkginclude_HEADERS = Package__Version.h
noinst_HEADERS = \
InputThread.h \
SubbandWriter.h \
MSWriter.h \
MSWriterCasa.h \
......
......@@ -37,10 +37,9 @@
#include <Interface/Parset.h>
#include <Interface/CorrelatedData.h>
#include <Interface/Queue.h>
#include <Storage/InputThread.h>
#include <Stream/Stream.h>
#include <pthread.h>
namespace LOFAR {
namespace RTCP {
......@@ -63,20 +62,11 @@ class SubbandWriter
void writeLogMessage();
bool processSubband(unsigned sb);
void createInputThread();
void stopInputThread();
static void *inputThreadStub(void *);
void inputThread();
const Parset *itsPS;
unsigned itsRank;
std::vector<Stream *> itsInputStreams;
static const unsigned nrInputBuffers = 10;
Queue<CorrelatedData *> itsFreeQueue, itsReceiveQueue;
pthread_t itsInputThread;
std::vector<InputThread *> itsInputThreads;
unsigned itsNStations;
unsigned itsNBaselines;
......
//# InputThread.cc:
//#
//# Copyright (C) 2008
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
//# Always #include <lofar_config.h> first!
#include <lofar_config.h>
#include <Storage/InputThread.h>
#include <Stream/NullStream.h>
namespace LOFAR {
namespace RTCP {
InputThread::InputThread(Stream *streamFromION, unsigned nrBaselines, unsigned nrChannels)
:
itsStreamFromION(streamFromION)
{
for (unsigned i = 0; i < maxReceiveQueueSize; i ++)
itsFreeQueue.append(new CorrelatedData(nrBaselines, nrChannels));
if (pthread_create(&thread, 0, mainLoopStub, this) != 0) {
std::cerr << "could not create input thread" << std::endl;
exit(1);
}
}
InputThread::~InputThread()
{
if (pthread_join(thread, 0) != 0) {
std::cerr << "could not join input thread" << std::endl;
exit(1);
}
for (unsigned i = 0; i < maxReceiveQueueSize; i ++)
delete itsFreeQueue.remove();
}
void InputThread::mainLoop()
{
// limit reads from NullStream to 10 blocks; otherwise unlimited
bool nullInput = dynamic_cast<NullStream *>(itsStreamFromION) != 0;
unsigned increment = nullInput ? 1 : 0;
CorrelatedData *data = 0;
try {
for (unsigned count = 0; count < 10; count += increment) {
data = itsFreeQueue.remove();
data->read(itsStreamFromION);
itsReceiveQueue.append(data);
}
} catch (Stream::EndOfStreamException &) {
itsFreeQueue.append(data);
}
itsReceiveQueue.append(0); // no more data
}
void *InputThread::mainLoopStub(void *inputThread)
{
try {
static_cast<InputThread *>(inputThread)->mainLoop();
} catch (Exception &ex) {
std::cerr << "caught Exception: " << ex.what() << std::endl;
} catch (std::exception &ex) {
std::cerr << "caught std::exception: " << ex.what() << std::endl;
} catch (...) {
std::cerr << "caught non-std:exception" << std::endl;
}
//static_cast<InputThread *>(inputThread)->stopped = true;
return 0;
}
} // namespace RTCP
} // namespace LOFAR
lib_LTLIBRARIES = libstorage.la
libstorage_la_SOURCES = Package__Version.cc \
InputThread.cc \
SubbandWriter.cc \
MSWriter.cc \
MSWriterCasa.cc \
......
......@@ -107,86 +107,29 @@ void SubbandWriter::createInputStreams()
string prefix = "OLAP.OLAP_Conn.IONProc_Storage";
string connectionType = itsPS->getString(prefix + "_Transport");
itsInputStreams.resize(itsPS->nrPsetsPerStorage());
for (unsigned subband = 0; subband < itsNrSubbandsPerStorage; subband ++) {
unsigned subbandNumber = itsRank * itsNrSubbandsPerStorage + subband;
for (unsigned i = 0; i < itsPS->nrPsetsPerStorage(); i ++)
if (connectionType == "NULL") {
std::cout << "input " << i << ": null stream" << std::endl;
itsInputStreams[i] = new NullStream;
std::cout << "subband " << subbandNumber << " read from null stream" << std::endl;
itsInputStreams.push_back(new NullStream);
} else if (connectionType == "TCP") {
std::string server = itsPS->getStringVector(prefix + "_ServerHosts")[itsRank];
unsigned short port = boost::lexical_cast<unsigned short>(itsPS->getPortsOf(prefix)[i]);
unsigned short port = boost::lexical_cast<unsigned short>(itsPS->getPortsOf(prefix)[subbandNumber]);
std::cout << "input " << i << ": tcp:" << server << ':' << port << std::endl;
itsInputStreams[i] = new SocketStream(server.c_str(), port, SocketStream::TCP, SocketStream::Server);
std::cout << "subband " << subbandNumber << " read from tcp:" << server << ':' << port << std::endl;
itsInputStreams.push_back(new SocketStream(server.c_str(), port, SocketStream::TCP, SocketStream::Server));
} else if (connectionType == "FILE") {
std::string filename = itsPS->getString(prefix + "_BaseFileName") + '.' +
boost::lexical_cast<std::string>(itsRank) + '.' +
boost::lexical_cast<std::string>(i);
boost::lexical_cast<std::string>(subbandNumber);
std::cout << "input " << i << ": file:" << filename << std::endl;
itsInputStreams[i] = new FileStream(filename.c_str());
std::cout << "subband " << subbandNumber << " read from file:" << filename << std::endl;
itsInputStreams.push_back(new FileStream(filename.c_str()));
} else {
THROW(StorageException, "unsupported ION->Storage stream type");
}
}
void *SubbandWriter::inputThreadStub(void *arg)
{
try {
static_cast<SubbandWriter *>(arg)->inputThread();
} catch (Exception &ex) {
std::cerr << "caught Exception: " << ex.what() << std::endl;
} catch (std::exception &ex) {
std::cerr << "caught std::exception: " << ex.what() << std::endl;
} catch (...) {
std::cerr << "caught non-std::exception" << std::endl;
}
return 0;
}
void SubbandWriter::inputThread()
{
bool nullInput = dynamic_cast<NullStream *>(itsInputStreams[0]) != 0;
do {
for (unsigned sb = 0; sb < itsNrSubbandsPerStorage; ++ sb) {
// find out from which input channel we should read
unsigned pset = sb / itsNrSubbandsPerPset;
CorrelatedData *data = itsFreeQueue.remove();
try {
data->read(itsInputStreams[pset]);
} catch (Stream::EndOfStreamException &) {
itsFreeQueue.append(data);
goto end; // nested loop; cannot use "break"
}
itsReceiveQueue.append(data);
}
} while (!nullInput); // prevent infinite loop when using NullStream
end:
itsReceiveQueue.append(0); // signal main thread that this was the last
}
void SubbandWriter::createInputThread()
{
if (pthread_create(&itsInputThread, 0, inputThreadStub, this) != 0) {
std::cerr << "could not create input thread" << std::endl;
exit(1);
}
}
void SubbandWriter::stopInputThread()
{
if (pthread_join(itsInputThread, 0) != 0)
std::cerr << "could not join input thread";
}
......@@ -204,9 +147,6 @@ void SubbandWriter::preprocess()
};
#endif
for (unsigned i = 0; i < nrInputBuffers; i ++)
itsFreeQueue.append(new CorrelatedData(itsNBaselines, itsNChannels));
double startTime = itsPS->startTime();
LOG_TRACE_VAR_STR("startTime = " << startTime);
......@@ -278,7 +218,9 @@ void SubbandWriter::preprocess()
#endif // defined HAVE_AIPSPP
createInputStreams();
createInputThread();
for (unsigned sb = 0; sb < itsNrSubbandsPerStorage; sb ++)
itsInputThreads.push_back(new InputThread(itsInputStreams[sb], itsNBaselines, itsNChannels));
}
......@@ -312,7 +254,7 @@ void SubbandWriter::writeLogMessage()
bool SubbandWriter::processSubband(unsigned sb)
{
CorrelatedData *data = itsReceiveQueue.remove();
CorrelatedData *data = itsInputThreads[sb]->itsReceiveQueue.remove();
if (data == 0)
return false;
......@@ -367,7 +309,7 @@ bool SubbandWriter::processSubband(unsigned sb)
}
#endif
itsFreeQueue.append(data);
itsInputThreads[sb]->itsFreeQueue.append(data);
return true;
}
......@@ -393,11 +335,12 @@ void SubbandWriter::process()
void SubbandWriter::postprocess()
{
stopInputThread();
for (unsigned i = 0; i < itsInputStreams.size(); i ++)
delete itsInputStreams[i];
for (unsigned sb = 0; sb < itsNrSubbandsPerStorage; sb ++) {
delete itsInputThreads[sb];
delete itsInputStreams[sb];
}
itsInputThreads.clear();
itsInputStreams.clear();
delete [] itsFlagsBuffers; itsFlagsBuffers = 0;
......@@ -410,9 +353,6 @@ void SubbandWriter::postprocess()
itsWriters.clear();
#endif
for (unsigned i = 0; i < nrInputBuffers; i ++)
delete itsFreeQueue.remove();
delete itsVisibilities; itsVisibilities = 0;
cout<<itsWriteTimer<<endl;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment