Skip to content
Snippets Groups Projects
Commit 19b20ceb authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #8443: Fixed replacing clustername by node names for global fs. Do not...

Task #8443: Fixed replacing clustername by node names for global fs. Do not use SLURM on CEP4 for now.
parent 1dfcf1ed
No related branches found
No related tags found
No related merge requests found
...@@ -45,9 +45,11 @@ function parse_cluster_description { ...@@ -45,9 +45,11 @@ function parse_cluster_description {
CLUSTER_NAME=cep4 CLUSTER_NAME=cep4
HEADNODE=head01.cep4.control.lofar HEADNODE=head01.cep4.control.lofar
COMPUTENODES="`seq -f "cpu%02.0f" 1 50`"
NRCOMPUTENODES=50 NRCOMPUTENODES=50
SLURM=true #SLURM=true
SLURM=false # Don't use SLURM for now, let's get it working without it first
GLOBALFS=true GLOBALFS=true
DOCKER=true DOCKER=true
;; ;;
...@@ -55,6 +57,7 @@ function parse_cluster_description { ...@@ -55,6 +57,7 @@ function parse_cluster_description {
CLUSTER_NAME=cep2 CLUSTER_NAME=cep2
HEADNODE=lhn001.cep2.lofar HEADNODE=lhn001.cep2.lofar
COMPUTENODES="`seq -f "locus%02.0f" 1 94`"
NRCOMPUTENODES=94 NRCOMPUTENODES=94
SLURM=false SLURM=false
...@@ -63,4 +66,3 @@ function parse_cluster_description { ...@@ -63,4 +66,3 @@ function parse_cluster_description {
;; ;;
esac esac
} }
#!/usr/bin/python
def replace_host(location, cluster_name, hosts):
    """
    Return `location` with its host name replaced by one taken from `hosts`,
    but only if the host part equals `cluster_name`:

      cluster_name:/dir -> hosts[0]:/dir
      other:/dir        -> other:/dir

    `location` must have the form "host:directory". Only the first ":" is
    treated as the separator, so a directory that itself contains ":" is
    passed through unchanged.

    When a replacement happens, `hosts` is rotated in place (the first entry
    moves to the end), so repeated calls hand out hosts round-robin.

    Raises ValueError if `location` contains no ":", and IndexError if a
    replacement is needed while `hosts` is empty.
    """
    # maxsplit=1 yields at most two fields; a larger value would produce a
    # third field for directories containing ":" and break the unpacking.
    host, directory = location.split(":", 1)

    if host == cluster_name:
        # Take the next host and put it at the back of the pool
        host = hosts.pop(0)
        hosts.append(host)

    return "%s:%s" % (host, directory)
def process_parset(parset, cluster_name, hosts):
    """
    Rewrite the output locations of every data product in `parset`.

    For each of the Correlated, CoherentStokes and IncoherentStokes output
    location keys that is defined, each location is passed through
    replace_host() with `cluster_name` and `hosts`, and the key is replaced
    by the resulting list. Keys that are not defined are skipped.
    `parset` is modified in place; nothing is returned.
    """
    key_template = "Observation.DataProducts.Output_%s.locations"

    for product in ("Correlated", "CoherentStokes", "IncoherentStokes"):
        key = key_template % (product,)

        if parset.isDefined(key):
            # Walk the current locations in order, so the host pool is
            # consumed in the same sequence as the locations are listed
            rewritten = []
            for location in parset._getStringVector1(key, True):
                rewritten.append(replace_host(location, cluster_name, hosts))

            # Store the new list back in parset vector notation
            parset.replace(key, "[" + ", ".join(rewritten) + "]")
if __name__ == "__main__":
    # Filter: reads a parset from stdin, replaces cluster-name hosts in the
    # output locations by names from the host pool, and writes the resulting
    # parset to stdout.
    import sys
    from optparse import OptionParser
    from lofar.parameterset import PyParameterSet

    # Command-line arguments
    parser = OptionParser("%prog [options] < parset")
    parser.add_option("-C", "--cluster", dest="cluster", type="string", default="cep4",
                      help="Cluster name to replace")
    parser.add_option("-H", "--hosts", dest="hosts", type="string", default="",
                      help="Pool of host names to use (space separated)")

    (options, args) = parser.parse_args()

    # Validate the split list rather than the raw string, so that a
    # whitespace-only --hosts is rejected here instead of failing later
    # on an empty pool.
    hosts = options.hosts.split()

    if not options.cluster or not hosts:
        # stdout carries the resulting parset, so diagnostics go to stderr
        sys.stderr.write("Require both --cluster and --hosts.\n")
        parser.print_help(sys.stderr)
        sys.exit(1)

    # Read from stdin ...
    parset = PyParameterSet("/dev/stdin", False)

    # ... process ...
    process_parset(parset, options.cluster, hosts)

    # Write to stdout ...
    sys.stdout.write(str(parset) + "\n")
...@@ -30,6 +30,7 @@ function usage { ...@@ -30,6 +30,7 @@ function usage {
"LOFAR_CHECKTOOL"\ "LOFAR_CHECKTOOL"\
"\n -F: do NOT send data points to a PVSS gateway"\ "\n -F: do NOT send data points to a PVSS gateway"\
"\n -P: create PID file"\ "\n -P: create PID file"\
"\n -d: dummy run: don't execute anything"\
"\n -l: run solely on localhost using 'nprocs' MPI processes (isolated test)"\ "\n -l: run solely on localhost using 'nprocs' MPI processes (isolated test)"\
"\n -p: enable profiling" \ "\n -p: enable profiling" \
"\n -o: add option KEY=VALUE to the parset" \ "\n -o: add option KEY=VALUE to the parset" \
...@@ -116,6 +117,9 @@ AUGMENT_PARSET=1 ...@@ -116,6 +117,9 @@ AUGMENT_PARSET=1
# Extra parset keys to add # Extra parset keys to add
EXTRA_PARSET_KEYS="" EXTRA_PARSET_KEYS=""
# Whether to execute anything
DUMMY_RUN=false
# File to write PID to # File to write PID to
PIDFILE="" PIDFILE=""
...@@ -136,7 +140,7 @@ RTCP_PARAMS="" ...@@ -136,7 +140,7 @@ RTCP_PARAMS=""
# ****************************** # ******************************
# Parse command-line options # Parse command-line options
# ****************************** # ******************************
while getopts ":ABCFP:l:o:px:" opt; do while getopts ":ABCFP:dl:o:px:" opt; do
case $opt in case $opt in
A) AUGMENT_PARSET=0 A) AUGMENT_PARSET=0
;; ;;
...@@ -148,6 +152,8 @@ while getopts ":ABCFP:l:o:px:" opt; do ...@@ -148,6 +152,8 @@ while getopts ":ABCFP:l:o:px:" opt; do
;; ;;
P) PIDFILE="$OPTARG" P) PIDFILE="$OPTARG"
;; ;;
d) DUMMY_RUN=true
;;
l) FORCE_LOCALHOST=1 l) FORCE_LOCALHOST=1
MPIRUN_PARAMS="$MPIRUN_PARAMS -np $OPTARG" MPIRUN_PARAMS="$MPIRUN_PARAMS -np $OPTARG"
;; ;;
...@@ -350,45 +356,8 @@ PID_LIST_FILE="$LOFARROOT/var/run/outputProc-$OBSERVATIONID.pids" ...@@ -350,45 +356,8 @@ PID_LIST_FILE="$LOFARROOT/var/run/outputProc-$OBSERVATIONID.pids"
if $GLOBALFS; then if $GLOBALFS; then
# Update locations in parset # Update locations in parset
mv -fT "$PARSET" "$PARSET.allocate-globalFS" mv -fT "$PARSET" "$PARSET.generate_globalfs"
generate_globalfs_locations.py --cluster "$CLUSTER_NAME" --hosts "$COMPUTENODES" < "$PARSET.generate_globalfs" > "$PARSET"
<$PARSET.allocate-outputProc >$PARSET perl -e '
@hosts = qw('"$NODE_LIST"');
while (<>) {
if (/^
(Observation.DataProducts.Output_[A-Za-z]+.locations) # key
\s*=\s*
\[(.*?)\] # value
/x) {
# output location key -> replace hostnames
$key, $locations = $1, $2;
# locations are of the format "locus001:/dir, locus002:/dir, ..."
foreach $loc (split /,/, $locations) {
# split off directory
($host, $dir) = split /:/, $loc, 2;
# replace hostname iff it matches our cluster
if ($host =~ /^\s*'"$CLUSTER_NAME"'\s$*/i) {
# determine new host (rotate @hosts)
$host = shift @hosts;
push @hosts, $host;
}
# add new location to the list
push @newlocations, join(":", $host, $dir);
}
# print key with new value
printf "%s=[%s]\n", $key, join(",", @newlocations);
} else {
# print any other parset key verbatim
print;
}
}
'
fi fi
...@@ -443,43 +412,55 @@ if $DOCKER; then ...@@ -443,43 +412,55 @@ if $DOCKER; then
OUTPUTPROC_CMDLINE="docker run -it -e LUSER=`id -u $SSH_USER_NAME` --net=host -v $DATADIR:$DATADIR lofar-outputproc:$TAG bash -c \"$OUTPUTPROC_CMDLINE\"" OUTPUTPROC_CMDLINE="docker run -it -e LUSER=`id -u $SSH_USER_NAME` --net=host -v $DATADIR:$DATADIR lofar-outputproc:$TAG bash -c \"$OUTPUTPROC_CMDLINE\""
fi fi
if $SLURM; then echo "[outputProc] command line = $OUTPUTPROC_CMDLINE"
# The nodes we need (and can use) are part of this job
COMMAND="srun -N $SLURM_JOB_NUM_NODES $OUTPUTPROC_CMDLINE"
echo "Starting $COMMAND"
$COMMAND &
PID=$!
echo -n "$PID " >> $PID_LIST_FILE # Save the pid for cleanup if ! $DUMMY_RUN; then
else if $SLURM; then
for HOST in $NODE_LIST # The nodes we need (and can use) are part of this job
do COMMAND="srun -N $SLURM_JOB_NUM_NODES $OUTPUTPROC_CMDLINE"
COMMAND="ssh -tt -l $SSH_USER_NAME $KEY_STRING $SSH_USER_NAME@$HOST $OUTPUTPROC_CMDLINE"
echo "Starting $COMMAND" echo "Starting $COMMAND"
command_retry "$COMMAND" & # Start retrying function in the background $COMMAND &
PID=$! # get the pid PID=$!
echo -n "$PID " >> $PID_LIST_FILE # Save the pid for cleanup echo -n "$PID " >> $PID_LIST_FILE # Save the pid for cleanup
done else
for HOST in $NODE_LIST
do
COMMAND="ssh -tt -l $SSH_USER_NAME $KEY_STRING $SSH_USER_NAME@$HOST $OUTPUTPROC_CMDLINE"
echo "Starting $COMMAND"
command_retry "$COMMAND" & # Start retrying function in the background
PID=$! # get the pid
echo -n "$PID " >> $PID_LIST_FILE # Save the pid for cleanup
done
fi
fi fi
# ************************************ # ************************************
# Start rtcp # Start rtcp
# *********************************** # ***********************************
echo "[cobalt] LOFARROOT = $LOFARROOT"
echo "[cobalt] parset = $PARSET"
# Run in the background to allow signals to propagate # Run in the background to allow signals to propagate
# #
# -x LOFARROOT Propagate $LOFARROOT for rtcp to find GPU kernels, config files, etc. # -x LOFARROOT Propagate $LOFARROOT for rtcp to find GPU kernels, config files, etc.
# -x QUEUE_PREFIX Propagate $QUEUE_PREFIX for test-specific interaction over the message bus # -x QUEUE_PREFIX Propagate $QUEUE_PREFIX for test-specific interaction over the message bus
# -H The host list to run on, derived earlier. # -H The host list to run on, derived earlier.
mpirun.sh -x LOFARROOT="$LOFARROOT" \ if $DUMMY_RUN; then
-x QUEUE_PREFIX="$QUEUE_PREFIX" \ # Just return success
-H "$HOSTS" \ true &
$MPIRUN_PARAMS \ else
$CHECK_TOOL \ mpirun.sh -x LOFARROOT="$LOFARROOT" \
`which rtcp` $RTCP_PARAMS "$PARSET" & -x QUEUE_PREFIX="$QUEUE_PREFIX" \
-H "$HOSTS" \
$MPIRUN_PARAMS \
$CHECK_TOOL \
`which rtcp` $RTCP_PARAMS "$PARSET" &
fi
PID=$! PID=$!
# Propagate SIGTERM # Propagate SIGTERM
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment