Skip to content
Snippets Groups Projects
Commit f56aaf31 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #3113: FastFileStream::skip() now works correctly

parent d330b97f
No related branches found
No related tags found
No related merge requests found
......@@ -2611,6 +2611,7 @@ RTCP/Run/test/OLAP.parset -text
RTCP/Run/test/RTCP-validate.parset -text
RTCP/Run/test/test.py -text
RTCP/Run/test/validate.py -text
RTCP/Storage/include/Storage/FastFileStream.h -text
RTCP/Storage/include/Storage/Format.h -text
RTCP/Storage/include/Storage/HDF5Attributes.h -text
RTCP/Storage/include/Storage/IOPriority.h -text
......@@ -2619,6 +2620,7 @@ RTCP/Storage/include/Storage/MSWriterLDA.h -text
RTCP/Storage/include/Storage/MSWriterNull.h -text
RTCP/Storage/include/Storage/MeasurementSetFormat.h -text
RTCP/Storage/include/Storage/OutputThread.h -text
RTCP/Storage/src/FastFileStream.cc -text
RTCP/Storage/src/Format.cc -text
RTCP/Storage/src/MSWriterFile.cc -text
RTCP/Storage/src/MSWriterLDA.cc -text
......@@ -2627,6 +2629,7 @@ RTCP/Storage/src/MeasurementSetFormat.cc -text
RTCP/Storage/src/OutputThread.cc -text
RTCP/Storage/src/gnuplotMS.sh -text
RTCP/Storage/src/plotMS.cc -text
RTCP/Storage/test/tFastFileStream.cc -text
RTCP/Storage/test/tLDA.cc -text
RTCP/Storage/test/tMeasurementSetFormat.parset-j2000 -text
RTCP/Storage/test/tMeasurementSetFormat.parset-sun -text
......
......@@ -37,7 +37,7 @@ class FileStream : public FileDescriptorBasedStream
virtual ~FileStream();
void skip( size_t bytes ); // seek ahead
virtual void skip( size_t bytes ); // seek ahead
};
} // namespace LOFAR
......
// FastFileStream.h: a FileStream using O_DIRECT
//
// Copyright (C) 2001
// ASTRON (Netherlands Foundation for Research in Astronomy)
// P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// $Id: MSWriterImpl.h 11891 2008-10-14 13:43:51Z gels $
//
//////////////////////////////////////////////////////////////////////
#ifndef LOFAR_STORAGE_FASTFILESTREAM_H
#define LOFAR_STORAGE_FASTFILESTREAM_H
#include <Stream/FileStream.h>
#include <Interface/SmartPtr.h>
#include <string>
namespace LOFAR {
namespace RTCP {
class FastFileStream : public FileStream
{
public:
FastFileStream(const std::string &name, int flags, int mode); // rd/wr; create file
virtual size_t tryWrite(const void *ptr, size_t size);
virtual ~FastFileStream();
virtual void skip( size_t bytes );
// formally, the required alignment for O_DIRECT is determined by the file system
static const unsigned alignment = 512;
private:
// writes the remainder, padded with zeros if needed. Returns the number of bytes written.
size_t writeRemainder();
// we only support writing
virtual size_t tryRead(void *, size_t size) { return size; }
// enlarge the buffer if needed
void ensureBuffer(size_t newsize);
// use the FileStream to force these data to disk
void forceWrite(const void *ptr, size_t size);
size_t bufsize;
SmartPtr<char, SmartPtrFree<char> > buffer;
size_t remainder;
};
}
}
#endif
......@@ -25,9 +25,6 @@
#ifndef LOFAR_STORAGE_MSWRITER_H
#define LOFAR_STORAGE_MSWRITER_H
#include <Common/LofarTypes.h>
#include <Common/lofar_vector.h>
#include <Interface/StreamableData.h>
namespace LOFAR {
......
......@@ -28,37 +28,12 @@
#include <Storage/MSWriter.h>
#include <Stream/FileStream.h>
#include <Interface/SmartPtr.h>
#include <Storage/FastFileStream.h>
namespace LOFAR {
namespace RTCP {
class FastFileStream : public FileStream
{
public:
FastFileStream(const string &name, int flags, int mode); // rd/wr; create file
virtual size_t tryWrite(const void *ptr, size_t size);
virtual ~FastFileStream();
static const unsigned alignment = 512;
private:
// we only support writing
virtual size_t tryRead(void *, size_t size) { return size; }
// enlarge the buffer if needed
void ensureBuffer(size_t newsize);
// use the FileStream to force these data to disk
void forceWrite(const void *ptr, size_t size);
size_t bufsize;
SmartPtr<char, SmartPtrFree<char> > buffer;
size_t remainder;
};
class MSWriterFile : public MSWriter
{
......
......@@ -4,6 +4,7 @@ include(LofarPackageVersion)
lofar_add_library(storage
Package__Version.cc
FastFileStream.cc
InputThread.cc
OutputThread.cc
SubbandWriter.cc
......
//# FastFileStream.cc: a file writer using O_DIRECT
//#
//# Copyright (C) 2001
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id: $
#include <lofar_config.h>
#include <Storage/FastFileStream.h>
#include <Interface/SmartPtr.h>
#include <Interface/Exceptions.h>
#include <Common/LofarLogger.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
namespace LOFAR {
namespace RTCP {
FastFileStream::FastFileStream(const std::string &name, int flags, int mode)
:
FileStream(name.c_str(), flags | O_DIRECT | O_SYNC, mode),
bufsize(0),
buffer(0),
remainder(0)
{
// alignment must be a power of two for easy calculations
ASSERT( (alignment & (alignment-1)) == 0 );
// alignment must be a multiple of sizeof(void*) for posix_memalign to work
ASSERT( alignment % sizeof(void*) == 0 );
}
FastFileStream::~FastFileStream()
{
off_t curlen = lseek(fd, 0, SEEK_CUR); // NOT SEEK_END, because skip() might push us beyond the end
size_t origremainder = remainder;
writeRemainder();
if (curlen != (off_t)-1) {
// truncate the file to the exact right length
(void)ftruncate(fd, curlen + origremainder);
}
}
size_t FastFileStream::writeRemainder()
{
if (remainder) {
// pad with zeroes
ensureBuffer(alignment);
memset(buffer.get() + remainder, 0, alignment - remainder);
forceWrite(buffer, alignment);
remainder = 0;
return alignment;
}
return 0;
}
void FastFileStream::ensureBuffer(size_t newsize)
{
if (newsize <= bufsize)
return;
void *buf;
if (posix_memalign(&buf, alignment, newsize) != 0)
THROW( StorageException, "Not enough memory to allocate " << newsize << " bytes for fast writing");
if (remainder) {
ASSERT( buffer.get() );
ASSERT( newsize >= remainder );
memcpy(buf, buffer.get(), remainder);
}
buffer = static_cast<char*>(buf); // SmartPtr will take care of deleting the old buffer
bufsize = newsize;
}
void FastFileStream::forceWrite(const void *ptr, size_t size)
{
// emulate Stream::write using FileStream::write to make sure all bytes are written
while (size > 0) {
ASSERT( (size & (alignment-1)) == 0 );
ASSERT( (reinterpret_cast<size_t>(ptr) & (alignment-1)) == 0 );
size_t bytes = FileStream::tryWrite(ptr, size);
size -= bytes;
ptr = static_cast<const char *>(ptr) + bytes;
}
}
size_t FastFileStream::tryWrite(const void *ptr, size_t size)
{
const size_t orig_size = size;
if (!remainder && (reinterpret_cast<size_t>(ptr) & (alignment-1)) == 0) {
// pointer is aligned and we can write from it immediately
ensureBuffer(alignment); // although remainder is enough, we want to avoid reallocating every time remainder grows slightly
// save the remainder
remainder = size & (alignment-1);
memcpy(buffer.get(), static_cast<const char*>(ptr) + size - remainder, remainder);
// write bulk
forceWrite(ptr, size - remainder);
} else {
// not everything is aligned or there is a remainder -- use the buffer
// move data to our buffer, and recompute new sizes
ensureBuffer(alignment + size); // although remainder + size is enough, we want to avoid reallocating every time remainder grows slightly
memcpy(buffer.get() + remainder, ptr, size);
size += remainder;
remainder = size & (alignment-1);
// write bulk
forceWrite(buffer.get(), size - remainder);
// move remainder to the front
memmove(buffer.get(), buffer.get() + size - remainder, remainder);
}
// lie about how many bytes we've written, since we might be caching
// a remainder which we can't write to disk.
return orig_size;
}
void FastFileStream::skip(size_t bytes)
{
// make sure that the file pointer remains
// at a full block boundary, so catch any
// remainders.
if (bytes == 0)
return;
// get rid of the old remainder first
if (bytes + remainder >= alignment) {
bytes -= (writeRemainder() - remainder);
if (bytes >= alignment ) {
// skip whole number of blocks
size_t newremainder = bytes & (alignment - 1);
size_t fullblocks = bytes - newremainder;
FileStream::skip(fullblocks);
bytes = newremainder;
}
}
if (bytes > 0) {
ASSERT( bytes < alignment );
char zeros[bytes];
tryWrite(&zeros, sizeof zeros);
}
}
} // namespace RTCP
} // namespace LOFAR
//# MSWriterNull: a null MSWriter
//# MSWriterFile: a raw file writer
//#
//# Copyright (C) 2001
//# ASTRON (Netherlands Foundation for Research in Astronomy)
......@@ -22,114 +22,16 @@
#include <lofar_config.h>
#include <Common/LofarLogger.h>
#include <Storage/MSWriterFile.h>
#include <Interface/SmartPtr.h>
#include <sys/types.h>
#include <fcntl.h>
namespace LOFAR {
namespace RTCP {
FastFileStream::FastFileStream(const string &name, int flags, int mode)
:
FileStream(name.c_str(), flags | O_DIRECT | O_SYNC, mode),
bufsize(0),
buffer(0),
remainder(0)
{
// alignment must be a power of two for easy calculations
ASSERT( (alignment & (alignment-1)) == 0 );
// alignment must be a multiple of sizeof(void*) for posix_memalign to work
ASSERT( alignment % sizeof(void*) == 0 );
}
FastFileStream::~FastFileStream()
{
if (remainder) {
// pad with zeroes
ensureBuffer(alignment);
memset(buffer.get() + remainder, 0, alignment - remainder);
forceWrite(buffer, alignment);
}
}
void FastFileStream::ensureBuffer(size_t newsize)
{
if (newsize <= bufsize)
return;
void *buf;
if (posix_memalign(&buf, alignment, newsize) != 0)
THROW( StorageException, "Not enough memory to allocate " << newsize << " bytes for fast writing");
if (remainder) {
ASSERT( buffer.get() );
ASSERT( newsize >= remainder );
memcpy(buf, buffer.get(), remainder);
}
buffer = static_cast<char*>(buf); // SmartPtr will take care of deleting the old buffer
bufsize = newsize;
}
void FastFileStream::forceWrite(const void *ptr, size_t size)
{
// emulate Stream::write using FileStream::write to make sure all bytes are written
while (size > 0) {
ASSERT( (size & (alignment-1)) == 0 );
ASSERT( (reinterpret_cast<size_t>(ptr) & (alignment-1)) == 0 );
size_t bytes = FileStream::tryWrite(ptr, size);
size -= bytes;
ptr = static_cast<const char *>(ptr) + bytes;
}
}
size_t FastFileStream::tryWrite(const void *ptr, size_t size)
{
const size_t orig_size = size;
if (!remainder && (reinterpret_cast<size_t>(ptr) & (alignment-1)) == 0) {
// pointer is aligned and we can write from it immediately
ensureBuffer(alignment); // although remainder is enough, we want to avoid reallocating every time remainder grows slightly
// save the remainder
remainder = size & (alignment-1);
memcpy(buffer.get(), static_cast<const char*>(ptr) + size - remainder, remainder);
// write bulk
forceWrite(ptr, size - remainder);
} else {
// not everything is aligned or there is a remainder -- use the buffer
// move data to our buffer, and recompute new sizes
ensureBuffer(alignment + size); // although remainder + size is enough, we want to avoid reallocating every time remainder grows slightly
memcpy(buffer.get() + remainder, ptr, size);
size += remainder;
remainder = size & (alignment-1);
// write bulk
forceWrite(buffer.get(), size - remainder);
// move remainder to the front
memmove(buffer.get(), buffer.get() + size - remainder, remainder);
}
// lie about how many bytes we've written, since we might be caching
// a remainder which we can't write to disk.
return orig_size;
}
MSWriterFile::MSWriterFile (const string &msName, bool oldFileFormat)
:
itsFile(msName, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH),
......
......@@ -4,4 +4,5 @@ include(LofarCTest)
lofar_add_test(tMeasurementSetFormat tMeasurementSetFormat.cc)
lofar_add_test(tLDA tLDA.cc)
lofar_add_test(tFastFileStream tFastFileStream.cc)
#lofar_add_test(tAH_TestStorage tAH_TestStorage.cc)
//# tFastFileStream: Test FastFileStream class
//#
//# Copyright (C) 2001
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id: $
#include <lofar_config.h>
#include <iostream>
#include <string>
#include <Storage/FastFileStream.h>
#include <cassert>
#include <cstdio>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
using namespace std;
using namespace LOFAR;
using namespace LOFAR::RTCP;
class TempFile {
public:
TempFile( const string &dirname = "/tmp/") {
char templ[1024];
snprintf(templ, sizeof templ, "%sFastFileStreamXXXXXX", dirname.c_str());
filename = mktemp(templ);
}
~TempFile() {
if (filename != "")
(void)unlink(filename.c_str());
}
string filename;
};
ssize_t filesize(const string &filename)
{
int fd = open(filename.c_str(), O_RDONLY);
off_t len;
if (fd < 0)
return -1;
len = lseek(fd, 0, SEEK_END);
if (len == (off_t)-1)
return -1;
if (close(fd) < 0)
return -1;
return len;
}
void test_smallwrite( size_t bytes )
{
printf("test_smallwrite(%lu)\n", bytes);
TempFile tmpfile;
int flags = O_RDWR | O_CREAT | O_TRUNC;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
char buf[bytes];
{
FastFileStream s(tmpfile.filename, flags, mode);
s.write(&buf, sizeof buf);
}
assert(filesize(tmpfile.filename) == bytes);
}
void test_skip( size_t bytes1, size_t skip, size_t bytes2 )
{
printf("test_skip(%lu, %lu, %lu)\n", bytes1, skip, bytes2);
TempFile tmpfile;
int flags = O_RDWR | O_CREAT | O_TRUNC;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
char buf1[bytes1];
char buf2[bytes2];
{
FastFileStream s(tmpfile.filename, flags, mode);
s.write(&buf1, sizeof buf1);
s.skip(skip);
s.write(&buf2, sizeof buf2);
}
assert(filesize(tmpfile.filename) == bytes1 + skip + bytes2);
}
int main() {
const size_t blocksize = FastFileStream::alignment;
// test write()
test_smallwrite( 1 );
test_smallwrite( blocksize );
test_smallwrite( blocksize - 1 );
test_smallwrite( blocksize + 1 );
test_smallwrite( 2 * blocksize );
test_smallwrite( 2 * blocksize - 1 );
test_smallwrite( 2 * blocksize + 1 );
// test write() + skip() + write()
size_t values[] = {0, 1, blocksize - 1, blocksize, blocksize + 1};
size_t numvalues = sizeof values / sizeof values[0];
for (unsigned bytes1 = 0; bytes1 < numvalues; bytes1++)
for (unsigned skip = 0; skip < numvalues; skip++)
for (unsigned bytes2 = 0; bytes2 < numvalues; bytes2++)
test_skip(values[bytes1], values[skip], values[bytes2]);
return 0;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment