diff --git a/.gitattributes b/.gitattributes index a2fdb17a104c1fd58fb273935dc6234724a7b40c..ffc17517be9c87baf02ed010c03e2c019b6758db 100644 --- a/.gitattributes +++ b/.gitattributes @@ -2611,6 +2611,7 @@ RTCP/Run/test/OLAP.parset -text RTCP/Run/test/RTCP-validate.parset -text RTCP/Run/test/test.py -text RTCP/Run/test/validate.py -text +RTCP/Storage/include/Storage/FastFileStream.h -text RTCP/Storage/include/Storage/Format.h -text RTCP/Storage/include/Storage/HDF5Attributes.h -text RTCP/Storage/include/Storage/IOPriority.h -text @@ -2619,6 +2620,7 @@ RTCP/Storage/include/Storage/MSWriterLDA.h -text RTCP/Storage/include/Storage/MSWriterNull.h -text RTCP/Storage/include/Storage/MeasurementSetFormat.h -text RTCP/Storage/include/Storage/OutputThread.h -text +RTCP/Storage/src/FastFileStream.cc -text RTCP/Storage/src/Format.cc -text RTCP/Storage/src/MSWriterFile.cc -text RTCP/Storage/src/MSWriterLDA.cc -text @@ -2627,6 +2629,7 @@ RTCP/Storage/src/MeasurementSetFormat.cc -text RTCP/Storage/src/OutputThread.cc -text RTCP/Storage/src/gnuplotMS.sh -text RTCP/Storage/src/plotMS.cc -text +RTCP/Storage/test/tFastFileStream.cc -text RTCP/Storage/test/tLDA.cc -text RTCP/Storage/test/tMeasurementSetFormat.parset-j2000 -text RTCP/Storage/test/tMeasurementSetFormat.parset-sun -text diff --git a/LCS/Stream/include/Stream/FileStream.h b/LCS/Stream/include/Stream/FileStream.h index 2346e0145faef94ae14a2ce8c0a4ea0e0b0426e5..a45a7990849f5ca164423d7aca72df29c22e823d 100644 --- a/LCS/Stream/include/Stream/FileStream.h +++ b/LCS/Stream/include/Stream/FileStream.h @@ -37,7 +37,7 @@ class FileStream : public FileDescriptorBasedStream virtual ~FileStream(); - void skip( size_t bytes ); // seek ahead + virtual void skip( size_t bytes ); // seek ahead }; } // namespace LOFAR diff --git a/RTCP/Storage/include/Storage/FastFileStream.h b/RTCP/Storage/include/Storage/FastFileStream.h new file mode 100644 index 0000000000000000000000000000000000000000..e2955c541d2e0473ef811f1acd01309d4aacad98 --- /dev/null +++ b/RTCP/Storage/include/Storage/FastFileStream.h @@ -0,0 +1,71 @@ +// FastFileStream.h: a FileStream using O_DIRECT +// +// Copyright (C) 2001 +// ASTRON (Netherlands Foundation for Research in Astronomy) +// P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// $Id: MSWriterImpl.h 11891 2008-10-14 13:43:51Z gels $ +// +////////////////////////////////////////////////////////////////////// + + +#ifndef LOFAR_STORAGE_FASTFILESTREAM_H +#define LOFAR_STORAGE_FASTFILESTREAM_H + + +#include <Stream/FileStream.h> +#include <Interface/SmartPtr.h> +#include <string> + + +namespace LOFAR { +namespace RTCP { + +class FastFileStream : public FileStream +{ + public: + FastFileStream(const std::string &name, int flags, int mode); // rd/wr; create file + + virtual size_t tryWrite(const void *ptr, size_t size); + virtual ~FastFileStream(); + + virtual void skip( size_t bytes ); + + // formally, the required alignment for O_DIRECT is determined by the file system + static const unsigned alignment = 512; + private: + // writes the remainder, padded with zeros if needed. Returns the number of bytes written. + size_t writeRemainder(); + + // we only support writing + virtual size_t tryRead(void *, size_t size) { return size; } + + // enlarge the buffer if needed + void ensureBuffer(size_t newsize); + + // use the FileStream to force these data to disk + void forceWrite(const void *ptr, size_t size); + + size_t bufsize; + SmartPtr<char, SmartPtrFree<char> > buffer; + size_t remainder; +}; + +} +} + +#endif diff --git a/RTCP/Storage/include/Storage/MSWriter.h b/RTCP/Storage/include/Storage/MSWriter.h index fd26533139091b63c6c04ef13a8d1ba325183db4..7325afcd627801b7a57a33da852585813c87935e 100644 --- a/RTCP/Storage/include/Storage/MSWriter.h +++ b/RTCP/Storage/include/Storage/MSWriter.h @@ -25,9 +25,6 @@ #ifndef LOFAR_STORAGE_MSWRITER_H #define LOFAR_STORAGE_MSWRITER_H -#include <Common/LofarTypes.h> -#include <Common/lofar_vector.h> - #include <Interface/StreamableData.h> namespace LOFAR { diff --git a/RTCP/Storage/include/Storage/MSWriterFile.h b/RTCP/Storage/include/Storage/MSWriterFile.h index 650e0fb362fa8843bf62adac676729a08656bfbe..22176f83852c83513a58c658c63eab2c1bde0ac7 100644 --- a/RTCP/Storage/include/Storage/MSWriterFile.h +++ b/RTCP/Storage/include/Storage/MSWriterFile.h @@ -28,37 +28,12 @@ #include <Storage/MSWriter.h> -#include <Stream/FileStream.h> -#include <Interface/SmartPtr.h> +#include <Storage/FastFileStream.h> namespace LOFAR { namespace RTCP { -class FastFileStream : public FileStream -{ - public: - FastFileStream(const string &name, int flags, int mode); // rd/wr; create file - - virtual size_t tryWrite(const void *ptr, size_t size); - virtual ~FastFileStream(); - - static const unsigned alignment = 512; - private: - // we only support writing - virtual size_t tryRead(void *, size_t size) { return size; } - - // enlarge the buffer if needed - void ensureBuffer(size_t newsize); - - // use the FileStream to force these data to disk - void forceWrite(const void *ptr, size_t size); - - size_t bufsize; - SmartPtr<char, SmartPtrFree<char> > buffer; - size_t remainder; -}; - class MSWriterFile : public MSWriter { diff --git a/RTCP/Storage/src/CMakeLists.txt b/RTCP/Storage/src/CMakeLists.txt index dd25fa80194788d767c8cf3c272d84074b2f74e1..1391854dd9d0b9bc390e4ee957086f3ba41e0cd7 100644 --- a/RTCP/Storage/src/CMakeLists.txt +++ b/RTCP/Storage/src/CMakeLists.txt @@ -4,6 +4,7 @@ include(LofarPackageVersion) lofar_add_library(storage Package__Version.cc + FastFileStream.cc InputThread.cc OutputThread.cc SubbandWriter.cc diff --git a/RTCP/Storage/src/FastFileStream.cc b/RTCP/Storage/src/FastFileStream.cc new file mode 100644 index 0000000000000000000000000000000000000000..6ce674c3c80f5510f4903834f9382fa6cd217984 --- /dev/null +++ b/RTCP/Storage/src/FastFileStream.cc @@ -0,0 +1,192 @@ + +//# FastFileStream.cc: a file writer using O_DIRECT +//# +//# Copyright (C) 2001 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id: $ + +#include <lofar_config.h> + +#include <Storage/FastFileStream.h> +#include <Interface/SmartPtr.h> +#include <Interface/Exceptions.h> +#include <Common/LofarLogger.h> + +#include <sys/types.h> +#include <fcntl.h> +#include <unistd.h> +#include <cstring> + +namespace LOFAR { +namespace RTCP { + + +FastFileStream::FastFileStream(const std::string &name, int flags, int mode) +: + FileStream(name.c_str(), flags | O_DIRECT | O_SYNC, mode), + bufsize(0), + buffer(0), + remainder(0) +{ + // alignment must be a power of two for easy calculations + ASSERT( (alignment & (alignment-1)) == 0 ); + + // alignment must be a multiple of sizeof(void*) for posix_memalign to work + ASSERT( alignment % sizeof(void*) == 0 ); +} + +FastFileStream::~FastFileStream() +{ + off_t curlen = lseek(fd, 0, SEEK_CUR); // NOT SEEK_END, because skip() might push us beyond the end + size_t origremainder = remainder; + + writeRemainder(); + + if (curlen != (off_t)-1) { + // truncate the file to the exact right length + (void)ftruncate(fd, curlen + origremainder); + } +} + +size_t FastFileStream::writeRemainder() +{ + if (remainder) { + // pad with zeroes + ensureBuffer(alignment); + memset(buffer.get() + remainder, 0, alignment - remainder); + forceWrite(buffer, alignment); + + remainder = 0; + + return alignment; + } + + return 0; +} + +void FastFileStream::ensureBuffer(size_t newsize) +{ + if (newsize <= bufsize) + return; + + void *buf; + + if (posix_memalign(&buf, alignment, newsize) != 0) + THROW( StorageException, "Not enough memory to allocate " << newsize << " bytes for fast writing"); + + if (remainder) { + ASSERT( buffer.get() ); + ASSERT( newsize >= remainder ); + + memcpy(buf, buffer.get(), remainder); + } + + buffer = static_cast<char*>(buf); // SmartPtr will take care of deleting the old buffer + bufsize = newsize; +} + +void FastFileStream::forceWrite(const void *ptr, size_t size) +{ + // emulate Stream::write using FileStream::write to make sure all bytes are written + while (size > 0) { + ASSERT( (size & (alignment-1)) == 0 ); + ASSERT( (reinterpret_cast<size_t>(ptr) & (alignment-1)) == 0 ); + + size_t bytes = FileStream::tryWrite(ptr, size); + + size -= bytes; + ptr = static_cast<const char *>(ptr) + bytes; + } +} + +size_t FastFileStream::tryWrite(const void *ptr, size_t size) +{ + const size_t orig_size = size; + + if (!remainder && (reinterpret_cast<size_t>(ptr) & (alignment-1)) == 0) { + // pointer is aligned and we can write from it immediately + + ensureBuffer(alignment); // although remainder is enough, we want to avoid reallocating every time remainder grows slightly + + // save the remainder + remainder = size & (alignment-1); + memcpy(buffer.get(), static_cast<const char*>(ptr) + size - remainder, remainder); + + // write bulk + forceWrite(ptr, size - remainder); + } else { + // not everything is aligned or there is a remainder -- use the buffer + + // move data to our buffer, and recompute new sizes + ensureBuffer(alignment + size); // although remainder + size is enough, we want to avoid reallocating every time remainder grows slightly + memcpy(buffer.get() + remainder, ptr, size); + + size += remainder; + remainder = size & (alignment-1); + + // write bulk + forceWrite(buffer.get(), size - remainder); + + // move remainder to the front + memmove(buffer.get(), buffer.get() + size - remainder, remainder); + } + + // lie about how many bytes we've written, since we might be caching + // a remainder which we can't write to disk. + return orig_size; +} + + +void FastFileStream::skip(size_t bytes) +{ + // make sure that the file pointer remains + // at a full block boundary, so catch any + // remainders. + + if (bytes == 0) + return; + + // get rid of the old remainder first + if (bytes + remainder >= alignment) { + bytes -= (writeRemainder() - remainder); + + if (bytes >= alignment ) { + // skip whole number of blocks + size_t newremainder = bytes & (alignment - 1); + size_t fullblocks = bytes - newremainder; + + FileStream::skip(fullblocks); + + bytes = newremainder; + } + } + + if (bytes > 0) { + ASSERT( bytes < alignment ); + + char zeros[bytes]; + + tryWrite(&zeros, sizeof zeros); + } +} + + +} // namespace RTCP +} // namespace LOFAR + diff --git a/RTCP/Storage/src/MSWriterFile.cc b/RTCP/Storage/src/MSWriterFile.cc index 0d7208a80ca04a9ef0c0c2e30d5bd56d8ac1d170..0cf2169e33163b33d312f6a03a6b0132c773b5f0 100644 --- a/RTCP/Storage/src/MSWriterFile.cc +++ b/RTCP/Storage/src/MSWriterFile.cc @@ -1,4 +1,4 @@ -//# MSWriterNull: a null MSWriter +//# MSWriterFile: a raw file writer //# //# Copyright (C) 2001 //# ASTRON (Netherlands Foundation for Research in Astronomy) @@ -22,114 +22,16 @@ #include <lofar_config.h> +#include <Common/LofarLogger.h> #include <Storage/MSWriterFile.h> -#include <Interface/SmartPtr.h> #include <sys/types.h> #include <fcntl.h> - namespace LOFAR { namespace RTCP { -FastFileStream::FastFileStream(const string &name, int flags, int mode) -: - FileStream(name.c_str(), flags | O_DIRECT | O_SYNC, mode), - bufsize(0), - buffer(0), - remainder(0) -{ - // alignment must be a power of two for easy calculations - ASSERT( (alignment & (alignment-1)) == 0 ); - - // alignment must be a multiple of sizeof(void*) for posix_memalign to work - ASSERT( alignment % sizeof(void*) == 0 ); -} - -FastFileStream::~FastFileStream() -{ - if (remainder) { - // pad with zeroes - ensureBuffer(alignment); - memset(buffer.get() + remainder, 0, alignment - remainder); - forceWrite(buffer, alignment); - } -} - -void FastFileStream::ensureBuffer(size_t newsize) -{ - if (newsize <= bufsize) - return; - - void *buf; - - if (posix_memalign(&buf, alignment, newsize) != 0) - THROW( StorageException, "Not enough memory to allocate " << newsize << " bytes for fast writing"); - - if (remainder) { - ASSERT( buffer.get() ); - ASSERT( newsize >= remainder ); - - memcpy(buf, buffer.get(), remainder); - } - - buffer = static_cast<char*>(buf); // SmartPtr will take care of deleting the old buffer - bufsize = newsize; -} - -void FastFileStream::forceWrite(const void *ptr, size_t size) -{ - // emulate Stream::write using FileStream::write to make sure all bytes are written - while (size > 0) { - ASSERT( (size & (alignment-1)) == 0 ); - ASSERT( (reinterpret_cast<size_t>(ptr) & (alignment-1)) == 0 ); - - size_t bytes = FileStream::tryWrite(ptr, size); - - size -= bytes; - ptr = static_cast<const char *>(ptr) + bytes; - } -} - -size_t FastFileStream::tryWrite(const void *ptr, size_t size) -{ - const size_t orig_size = size; - - if (!remainder && (reinterpret_cast<size_t>(ptr) & (alignment-1)) == 0) { - // pointer is aligned and we can write from it immediately - - ensureBuffer(alignment); // although remainder is enough, we want to avoid reallocating every time remainder grows slightly - - // save the remainder - remainder = size & (alignment-1); - memcpy(buffer.get(), static_cast<const char*>(ptr) + size - remainder, remainder); - - // write bulk - forceWrite(ptr, size - remainder); - } else { - // not everything is aligned or there is a remainder -- use the buffer - - // move data to our buffer, and recompute new sizes - ensureBuffer(alignment + size); // although remainder + size is enough, we want to avoid reallocating every time remainder grows slightly - memcpy(buffer.get() + remainder, ptr, size); - - size += remainder; - remainder = size & (alignment-1); - - // write bulk - forceWrite(buffer.get(), size - remainder); - - // move remainder to the front - memmove(buffer.get(), buffer.get() + size - remainder, remainder); - } - - // lie about how many bytes we've written, since we might be caching - // a remainder which we can't write to disk. - return orig_size; -} - - MSWriterFile::MSWriterFile (const string &msName, bool oldFileFormat) : itsFile(msName, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH), diff --git a/RTCP/Storage/test/CMakeLists.txt b/RTCP/Storage/test/CMakeLists.txt index f8cbb15bd3c9c14844c55b118c6755639e7d20d1..00a59edf1caae1b404afb370b7baf89598de06fc 100644 --- a/RTCP/Storage/test/CMakeLists.txt +++ b/RTCP/Storage/test/CMakeLists.txt @@ -4,4 +4,5 @@ include(LofarCTest) lofar_add_test(tMeasurementSetFormat tMeasurementSetFormat.cc) lofar_add_test(tLDA tLDA.cc) +lofar_add_test(tFastFileStream tFastFileStream.cc) #lofar_add_test(tAH_TestStorage tAH_TestStorage.cc) diff --git a/RTCP/Storage/test/tFastFileStream.cc b/RTCP/Storage/test/tFastFileStream.cc new file mode 100644 index 0000000000000000000000000000000000000000..70c9e266524b524d2c9393eea198fec8368974f8 --- /dev/null +++ b/RTCP/Storage/test/tFastFileStream.cc @@ -0,0 +1,136 @@ +//# tFastFileStream: Test FastFileStream class +//# +//# Copyright (C) 2001 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id: $ + +#include <lofar_config.h> + +#include <iostream> +#include <string> +#include <Storage/FastFileStream.h> +#include <cassert> +#include <cstdio> + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +using namespace std; +using namespace LOFAR; +using namespace LOFAR::RTCP; + +class TempFile { +public: + TempFile( const string &dirname = "/tmp/") { + char templ[1024]; + snprintf(templ, sizeof templ, "%sFastFileStreamXXXXXX", dirname.c_str()); + + filename = mktemp(templ); + } + ~TempFile() { + if (filename != "") + (void)unlink(filename.c_str()); + } + + string filename; +}; + +ssize_t filesize(const string &filename) +{ + int fd = open(filename.c_str(), O_RDONLY); + off_t len; + + if (fd < 0) + return -1; + + len = lseek(fd, 0, SEEK_END); + + if (len == (off_t)-1) + return -1; + + if (close(fd) < 0) + return -1; + + return len; +} + +void test_smallwrite( size_t bytes ) +{ + printf("test_smallwrite(%lu)\n", bytes); + + TempFile tmpfile; + int flags = O_RDWR | O_CREAT | O_TRUNC; + int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + + char buf[bytes]; + + { + FastFileStream s(tmpfile.filename, flags, mode); + s.write(&buf, sizeof buf); + } + + assert(filesize(tmpfile.filename) == bytes); +} + +void test_skip( size_t bytes1, size_t skip, size_t bytes2 ) +{ + printf("test_skip(%lu, %lu, %lu)\n", bytes1, skip, bytes2); + + TempFile tmpfile; + int flags = O_RDWR | O_CREAT | O_TRUNC; + int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + + char buf1[bytes1]; + char buf2[bytes2]; + + { + FastFileStream s(tmpfile.filename, flags, mode); + s.write(&buf1, sizeof buf1); + s.skip(skip); + s.write(&buf2, sizeof buf2); + } + + assert(filesize(tmpfile.filename) == bytes1 + skip + bytes2); +} + +int main() { + const size_t blocksize = FastFileStream::alignment; + + // test write() + test_smallwrite( 1 ); + test_smallwrite( blocksize ); + test_smallwrite( blocksize - 1 ); + test_smallwrite( blocksize + 1 ); + test_smallwrite( 2 * blocksize ); + test_smallwrite( 2 * blocksize - 1 ); + test_smallwrite( 2 * blocksize + 1 ); + + // test write() + skip() + write() + size_t values[] = {0, 1, blocksize - 1, blocksize, blocksize + 1}; + size_t numvalues = sizeof values / sizeof values[0]; + + for (unsigned bytes1 = 0; bytes1 < numvalues; bytes1++) + for (unsigned skip = 0; skip < numvalues; skip++) + for (unsigned bytes2 = 0; bytes2 < numvalues; bytes2++) + test_skip(values[bytes1], values[skip], values[bytes2]); + + return 0; +} +