Skip to content
Snippets Groups Projects
Commit 22fa632c authored by Ger van Diepen's avatar Ger van Diepen
Browse files

bug 1314:

Added distributed capablities to ParmFacade
parent f7d9a7bb
No related branches found
No related tags found
No related merge requests found
......@@ -139,10 +139,12 @@ CEP/Imager/MWImager/test/tmwimager.in_ms2.vds -text
CEP/Imager/MWImager/test/tmwimager.in_vd -text
CEP/MS/test/tcombinevds.in_vds1 -text
CEP/MS/test/tcombinevds.in_vds2 -text
CEP/ParmDB/src/parmdbremote-scr -text
CEP/ParmDB/src/setupparmdb -text
CEP/ParmDB/src/setupparmdb-part -text
CEP/ParmDB/src/setupsourcedb -text
CEP/ParmDB/src/setupsourcedb-part -text
CEP/ParmDB/src/startparmdbdistr -text
CEP/ParmDB/test/tmakesourcedb.in_2 -text
CEP/ParmDB/test/tsetupparmdb.in_cd -text
CEP/ParmDB/test/tsetupparmdb.in_ms0.vds -text
......
......@@ -31,6 +31,11 @@
#include <MWCommon/SocketConnectionSet.h>
#include <Common/lofar_vector.h>
//# Forward Declaration.
namespace casa {
class Record;
}
namespace LOFAR { namespace BBS {
......@@ -114,7 +119,10 @@ namespace LOFAR { namespace BBS {
void freePort();
// </group>
//# Data membe rs
// Read a Record from the BlobStream.
void getRecord (BlobIStream& bis, casa::Record& rec);
//# Data members
string itsPort; //# declare this before itsConn!!
mutable LOFAR::CEP::SocketConnectionSet itsConn;
static int theirNextPort;
......
......@@ -15,15 +15,15 @@ SourceDB.cc SourceDBCasa.cc
# LEX_OUTPUT_ROOT = lex.KeyTokenize
bin_PROGRAMS = versionparmdb \
parmdb parmdbclient makesourcedb
parmdb parmdbremote makesourcedb
parmdb_SOURCES = parmdb_main.cc
parmdb_LDADD = libparmdb.la
parmdb_DEPENDENCIES = libparmdb.la $(LOFAR_DEPEND)
parmdbclient_SOURCES = parmdbclient.cc
parmdbclient_LDADD = libparmdb.la
parmdbclient_DEPENDENCIES = libparmdb.la $(LOFAR_DEPEND)
parmdbremote_SOURCES = parmdbremote.cc
parmdbremote_LDADD = libparmdb.la
parmdbremote_DEPENDENCIES = libparmdb.la $(LOFAR_DEPEND)
makesourcedb_SOURCES = makesourcedb.cc
makesourcedb_LDADD = libparmdb.la
......@@ -33,7 +33,9 @@ pythondir = $(bindir)
dist_python_SCRIPTS =
scriptdir = $(bindir)
dist_script_SCRIPTS = setupparmdb setupparmdb-part
dist_script_SCRIPTS = setupparmdb setupparmdb-part \
setupsourcedb setupsourcedb-part \
startparmdbdistr parmdbremote-scr
versionparmdb_SOURCES = versionparmdb.cc
versionparmdb_LDADD = libparmdb.la
......
......@@ -23,6 +23,8 @@
#include <lofar_config.h>
#include <ParmDB/ParmFacade.h>
#include <ParmDB/ParmFacadeLocal.h>
#include <ParmDB/ParmFacadeDistr.h>
#include <tables/Tables/Table.h>
using namespace std;
using namespace casa;
......@@ -37,8 +39,15 @@ namespace LOFAR {
namespace BBS {
ParmFacade::ParmFacade (const string& tableName)
: itsRep(new ParmFacadeLocal(tableName))
{}
{
// If it is a table, open it directly.
// Otherwise it is a distributed ParmDB.
if (Table::isReadable(tableName)) {
itsRep = ParmFacadeRep::ShPtr(new ParmFacadeLocal(tableName));
} else {
itsRep = ParmFacadeRep::ShPtr(new ParmFacadeDistr(tableName));
}
}
ParmFacade::~ParmFacade()
{}
......
......@@ -28,6 +28,7 @@
#include <Blob/BlobAipsIO.h>
#include <Common/LofarLogger.h>
#include <casa/IO/AipsIO.h>
#include <Common/lofar_iostream.h>
using namespace LOFAR::CEP;
using namespace casa;
......@@ -50,23 +51,29 @@ namespace LOFAR {
: itsPort (getPort()),
itsConn (itsPort)
{
VdsDesc vds(tableName);
int nparts = vds.getParts().size();
string cdescName = vds.getDesc().getClusterDescName();
// Start all clients.
string command("startdistproc -mode " + itsPort +
" -nowait -nomasterhost -cdn " +
cdescName + " parmdbclient");
// Get info from VDS. It is automatically closed thereafter.
int nparts;
string cdescName;
{
VdsDesc vds(tableName);
nparts = vds.getParts().size();
cdescName = vds.getDesc().getClusterDescName();
}
// Start all remote processes.
string command("startparmdbdistr " + itsPort + ' ' +
cdescName + ' ' + tableName);
ASSERT (system(command.c_str()) == 0);
// Accept a connection from the clients and check if they are
// initialized correctly.
itsConn.addConnections (nparts);
ASSERT (system(command.c_str()) == 0);
BlobString buf;
for (int i=0; i<itsConn.size(); ++i) {
itsConn.read (i, buf);
MWBlobIn bbi(buf);
bbi.finish();
ASSERT (bbi.getOperation() == 1); // ensure successful init
string fname;
bbi.blobStream() >> fname;
bbi.finish();
}
}
......@@ -110,14 +117,23 @@ namespace LOFAR {
bbo.blobStream() << parmNamePattern;
bbo.finish();
itsConn.writeAll (buf);
vector<double> range, result;
for (int i=0; i<itsConn.size(); ++i) {
itsConn.read (i, buf);
MWBlobIn bbi(buf);
ASSERT (bbi.getOperation() == 1); // ensure success
if (i == 0) {
bbi.blobStream() >> result;
} else {
bbi.blobStream() >> range;
if (range[0] < result[0]) result[0] = range[0];
if (range[1] > result[1]) result[1] = range[1];
if (range[2] < result[2]) result[2] = range[2];
if (range[3] > result[3]) result[3] = range[3];
}
bbi.finish();
ASSERT (bbi.getOperation() == 1); // ensure successful init
}
vector<double> res(4);
return res;
return result;
}
// Get all parameter names in the table.
......@@ -128,14 +144,19 @@ namespace LOFAR {
bbo.blobStream() << parmNamePattern;
bbo.finish();
itsConn.writeAll (buf);
vector<string> names, result;
for (int i=0; i<itsConn.size(); ++i) {
itsConn.read (i, buf);
MWBlobIn bbi(buf);
ASSERT (bbi.getOperation() == 1); // ensure success
if (i == 0) {
bbi.blobStream() >> result;
} else {
bbi.blobStream() >> names;
}
bbi.finish();
ASSERT (bbi.getOperation() == 1); // ensure successful init
}
vector<string> res;
return res;
return result;
}
Record ParmFacadeDistr::getValues (const string& parmNamePattern,
......@@ -145,17 +166,24 @@ namespace LOFAR {
{
BlobString buf;
MWBlobOut bbo(buf, 3, 0);
bbo.blobStream() << parmNamePattern;
bbo.blobStream() << parmNamePattern
<< freqv1 << freqv2 << nfreq
<< timev1 << timev2 << ntime << asStartEnd;
bbo.finish();
itsConn.writeAll (buf);
Record values, result;
for (int i=0; i<itsConn.size(); ++i) {
itsConn.read (i, buf);
MWBlobIn bbi(buf);
ASSERT (bbi.getOperation() == 1); // ensure success
if (i == 0) {
getRecord (bbi.blobStream(), result);
} else {
getRecord (bbi.blobStream(), values);
}
bbi.finish();
ASSERT (bbi.getOperation() == 1); // ensure successful init
}
Record res;
return res;
return result;
}
Record ParmFacadeDistr::getValues (const string& parmNamePattern,
......@@ -167,17 +195,23 @@ namespace LOFAR {
{
BlobString buf;
MWBlobOut bbo(buf, 4, 0);
bbo.blobStream() << parmNamePattern;
bbo.blobStream() << parmNamePattern
<< freqv1 << freqv2 << timev1 << timev2 << asStartEnd;
bbo.finish();
itsConn.writeAll (buf);
Record values, result;
for (int i=0; i<itsConn.size(); ++i) {
itsConn.read (i, buf);
MWBlobIn bbi(buf);
ASSERT (bbi.getOperation() == 1); // ensure success
if (i == 0) {
getRecord (bbi.blobStream(), result);
} else {
getRecord (bbi.blobStream(), values);
}
bbi.finish();
ASSERT (bbi.getOperation() == 1); // ensure successful init
}
Record res;
return res;
return result;
}
Record ParmFacadeDistr::getValuesGrid (const string& parmNamePattern,
......@@ -186,17 +220,30 @@ namespace LOFAR {
{
BlobString buf;
MWBlobOut bbo(buf, 5, 0);
bbo.blobStream() << parmNamePattern;
bbo.blobStream() << parmNamePattern
<< sfreq << efreq << stime << etime;
bbo.finish();
itsConn.writeAll (buf);
Record values, result;
for (int i=0; i<itsConn.size(); ++i) {
itsConn.read (i, buf);
MWBlobIn bbi(buf);
ASSERT (bbi.getOperation() == 1); // ensure success
if (i == 0) {
getRecord (bbi.blobStream(), result);
} else {
getRecord (bbi.blobStream(), values);
}
bbi.finish();
ASSERT (bbi.getOperation() == 1); // ensure successful init
}
Record res;
return res;
return result;
}
void ParmFacadeDistr::getRecord (BlobIStream& bis, Record& rec)
{
BlobAipsIO baio(bis);
casa::AipsIO aio(&baio);
aio >> rec;
}
} // namespace ParmDB
......
#!/bin/sh
if test $# != 10; then
echo "run as: parmdbremote-src 'socket' host port np rank"
echo " pdb-part filesys ms-part-vds"
echo " lofarroot wd"
exit 1
fi
lroot=$9
wd=${10}
# Source lofarinit to find parmdbremote in production environment.
if test -e $lroot/lofarinit.sh; then
sh $lroot/lofarinit.sh
fi
# cd to work directory and add . to PATH for test purposes (make check).
if test -d $wd; then
cd $wd
PATH=.:$PATH
export PATH
fi
parmdbremote $2 $3 $6 # host,port,parmdb-part
//# parmdbclient.cc: Client handling a distributed ParmDB part
//# parmdbremote.cc: Remote handling a distributed ParmDB part
//#
//# Copyright (C) 2009
//# ASTRON (Netherlands Foundation for Research in Astronomy)
......@@ -124,7 +124,7 @@ void doIt (SocketConnection& conn, ParmFacadeLocal& pdb)
getValuesGrid (pdb, bbi.blobStream(), bbo.blobStream());
break;
default:
ASSERTSTR(false, "parmdbclient: unknown command-id "
ASSERTSTR(false, "parmdbremote: unknown command-id "
<< bbi.getOperation());
}
// Finish the blobstreams and write the result message.
......@@ -134,41 +134,32 @@ void doIt (SocketConnection& conn, ParmFacadeLocal& pdb)
}
}
int main (int argc, const char* argv[])
int main (int argc, char* argv[])
{
const char* progName = basename(argv[0]);
INIT_LOGGER(progName);
SocketConnection::ShPtr conn;
try {
ASSERTSTR (argc >= 7, "Use as: parmdbclient socket <host> "
"<port> <#processes> <rank> <mspart>");
string host (argv[2]);
string port (argv[3]);
istringstream iss(argv[4]);
int nnode, rank;
iss >> nnode;
istringstream iss1(argv[5]);
iss1 >> rank;
string fname(argv[6]);
ASSERTSTR (argc >= 4, "Use as: parmdbremote <host> <port> <mspart>");
string host (argv[1]);
string port (argv[2]);
string fname(argv[3]);
// Setup the connection.
conn = SocketConnection::ShPtr(new SocketConnection(host, port));
// Open the ParmDB after getting its name from the VDS file.
{
VdsPartDesc vds((ParameterSet(fname)));
fname = vds.getFileName();
}
// Open the ParmDB.
ParmFacadeLocal parmdb(fname);
{
// Tell master init was successful.
// Tell master the MS-part to process.
BlobString bufout;
MWBlobOut bbo(bufout, 1, 0);
bbo.blobStream() << fname;
bbo.finish();
conn->write (bufout);
}
// Handle requests.
doIt (*conn, parmdb);
} catch (std::exception& x) {
LOG_FATAL (string("Unexpected exception in parmdbclient: ") + x.what());
LOG_FATAL (string("Unexpected exception in parmdbremote: ") + x.what());
// Tell master there is an error.
BlobString bufout;
MWBlobOut bbo(bufout, 0, 0);
......
......@@ -26,8 +26,9 @@
pgmpath=`dirname $0`
pgmpath=`cd $pgmpath > /dev/null 2>&1 && pwd`
if test $# != 11; then
echo "run as: setupparmdb-part dummy dummy dummy dummy rank ms-part filesys"
if test $# != 12; then
echo "run as: setupparmdb-part dummy dummy dummy dummy rank"
" ms-part filesys ms-part-vds"
echo " lofarroot pdbfile pdbname dry"
exit 1
fi
......@@ -37,12 +38,11 @@ shift
shift
shift
seqnr=$1
msn=$2
shift
lroot=$3
pdbfile=$4
pdb=$5
dry=$6
msn=$4
lroot=$5
pdbfile=$6
pdb=$7
dry=$8
# Initialize lofar environment.
......
......@@ -26,8 +26,9 @@
pgmpath=`dirname $0`
pgmpath=`cd $pgmpath > /dev/null 2>&1 && pwd`
if test $# != 12; then
echo "run as: setupsourcedb-part dummy dummy dummy dummy rank ms-part filesys"
if test $# != 13; then
echo "run as: setupsourcedb-part dummy dummy dummy dummy rank"
" ms-part filesys ms-part-vds"
echo " lofarroot pdbfile pdbname overwrite dry"
exit 1
fi
......@@ -37,13 +38,12 @@ shift
shift
shift
seqnr=$1
msn=$2
shift
lroot=$3
srccat=$4
pdb=$5
overwrite=$6
dry=$7
msn=$4
lroot=$5
srccat=$6
pdb=$7
overwrite=$8
dry=$9
# Initialize lofar environment.
......
#!/bin/sh
# Find the path used to start the script.
pgmpath=`dirname $0`
pgmpath=`cd $pgmpath > /dev/null 2>&1 && pwd`
# Find the hostname.
host=`uname -n`
# If we cannot connect to it, replace it by localhost.
ssh $host date > /dev/null 2>&1
if test $? != 0; then
host=localhost
fi
# Start parmdbremote for each part.
startdistproc -mode "$1" -nowait -masterhost localhost -nostartmaster \
-cdn "$2" -dsn "$3" $pgmpath/parmdbremote-scr "$LOFARROOT" `pwd`
......@@ -9,7 +9,8 @@ CHECKTOOLPROGS = $(check_PROGRAMS)
# scripts used to run tests
TESTSCRIPTS = tAxis.sh tAxisMapping.sh tBox.sh tGrid.sh \
tParmValue.sh tParmDBCasa.sh \
tParmSet.sh tParmCache tParm.sh tParmFacade.sh \
tParmSet.sh tParmCache tParm.sh \
tParmFacade.sh tParmFacadeDistr.sh \
tparmdb.sh \
tSourceDBCasa.sh tmakesourcedb.sh \
tsetupparmdb.sh tsetupsourcedb.sh
......@@ -71,6 +72,7 @@ tTimeAxis_DEPENDENCIES = ../src/libparmdb.la $(LOFAR_DEPEND)
EXTRA_DIST = $(TESTSCRIPTS) \
tparmdb.run tparmdb.in tparmdb.stdout \
tParmFacade.run tParmFacade.stdout \
tParmFacadeDistr.run tParmFacadeDistr.stdout \
tmakesourcedb.run tmakesourcedb.in \
tmakesourcedb.in_2 tmakesourcedb.stdout \
tsetupparmdb.run tsetupparmdb.stdout \
......
......@@ -11,6 +11,6 @@ if [ $? != 0 ]; then
exit 1
fi
$LOFAR_CHECKTOOL tParmFacade tParmFacade_tmp.pdb
$LOFAR_CHECKTOOL ./tParmFacade tParmFacade_tmp.pdb
echo ""
$LOFAR_CHECKTOOL tParmFacade tParmFacade_tmp.pdb '*' 1
$LOFAR_CHECKTOOL ./tParmFacade tParmFacade_tmp.pdb '*' 1
#!/bin/sh
# If first argument is non-empty, only the files are created.
# It makes it possible to run the debugger on tParmFacadeDistr.
# The test uses 2 remote pdb-s, so create them.
../src/parmdb <<EOF > tParmFacadeDistr_tmp.pdbout
create tablename='tParmFacadeDistr_tmp.pdb1'
add parm1 domain=[1,5,4,10],values=2
add parm2 type='polc', domain=[1,5,4,10], values=[2,0.1], nx=2
quit
EOF
if [ $? != 0 ]; then
cat tParmFacadeDistr_tmp.pdbout
exit 1
fi
../src/parmdb <<EOF > tParmFacadeDistr_tmp.pdbout
create tablename='tParmFacadeDistr_tmp.pdb2'
add parm1 domain=[1,5,10,15],values=2
add parm2 type='polc', domain=[1,5,10,15], values=[2,0.1], nx=2
quit
EOF
if [ $? != 0 ]; then
cat tParmFacadeDistr_tmp.pdbout
exit 1
fi
fname1=`pwd`/tParmFacadeDistr_tmp.pdb1
fname2=`pwd`/tParmFacadeDistr_tmp.pdb2
cdname=`pwd`/tParmFacadeDistr_tmp.cd
cat > tParmFacadeDistr_tmp.cd <<EOF
ClusterName = cl
NNodes = 2
Node0.NodeName = localhost
Node0.NodeFileSys = [node1:/usr]
Node0.NodeMountPoints = [/usr]
Node1.NodeName = localhost
Node1.NodeFileSys = [node1:/usr]
Node1.NodeMountPoints = [/usr]
EOF
cat > tParmFacadeDistr_tmp.pdb.vd <<EOF
Name = /usr/local/xyx
FileSys =
ClusterDesc = $cdname
StartTime = 0
EndTime = 2
StepTime = 0.5
NChan = [64,128]
StartFreqs = [20,120]
EndFreqs = [100,300]
NParts = 2
Part0.Name = /usr/local/xyx0.vds
Part0.FileName = $fname1
Part0.FileSys = node1:/usr
Part0.StartTime = 0
Part0.EndTime = 2
Part0.StepTime = 0.5
Part0.NChan = [64,128]
Part0.StartFreqs = [20,120]
Part0.EndFreqs = [100,300]
Part1.Name = /usr/local/xyx0.vds
Part1.FileName = $fname2
Part1.FileSys = node1:/usr
Part1.StartTime = 0
Part1.EndTime = 2
Part1.StepTime = 0.5
Part1.NChan = [64,128]
Part1.StartFreqs = [20,120]
Part1.EndFreqs = [100,300]
EOF
# Make a symlink to scripts in src, so they can be found.
# Make sure . is in PATH.
rm -f startparmdbdistr parmdbremote-scr parmdbremote
ln -s ../../../src/startparmdbdistr
ln -s ../../../src/parmdbremote-scr
ln -s ../src/parmdbremote
PATH=.:$PATH
export PATH
if test "$1" = ""; then
$LOFAR_CHECKTOOL ./tParmFacade tParmFacadeDistr_tmp.pdb.vd
echo ""
$LOFAR_CHECKTOOL ./tParmFacade tParmFacadeDistr_tmp.pdb '*' 1
fi
#!/bin/sh
$lofar_sharedir/runtest.sh tParmFacadeDistr > tParmFacadeDistr.log 2>&1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment