diff --git a/RTCP/Cobalt/GPUProc/src/scripts/cobalt_functions.sh b/RTCP/Cobalt/GPUProc/src/scripts/cobalt_functions.sh
index b9477c38e67ec5378c6668516d876f0c3905db7f..35918e69e4fa6c0ab6288f067988ec941a2ab01b 100755
--- a/RTCP/Cobalt/GPUProc/src/scripts/cobalt_functions.sh
+++ b/RTCP/Cobalt/GPUProc/src/scripts/cobalt_functions.sh
@@ -45,9 +45,11 @@ function parse_cluster_description {
       CLUSTER_NAME=cep4
 
       HEADNODE=head01.cep4.control.lofar
+      COMPUTENODES="`seq -f "cpu%02.0f" 1 50`"
       NRCOMPUTENODES=50
 
-      SLURM=true
+      #SLURM=true
+      SLURM=false # Don't use SLURM for now, let's get it working without it first
       GLOBALFS=true
       DOCKER=true
       ;;
@@ -55,6 +57,7 @@ function parse_cluster_description {
       CLUSTER_NAME=cep2
 
       HEADNODE=lhn001.cep2.lofar
+      COMPUTENODES="`seq -f "locus%02.0f" 1 94`"
       NRCOMPUTENODES=94
 
       SLURM=false
@@ -63,4 +66,3 @@ function parse_cluster_description {
       ;;
   esac
 }
-
diff --git a/RTCP/Cobalt/GPUProc/src/scripts/generate_globalfs_locations.py b/RTCP/Cobalt/GPUProc/src/scripts/generate_globalfs_locations.py
new file mode 100755
index 0000000000000000000000000000000000000000..f74c642e84f5fd02b90b49409b79644a77f89ff0
--- /dev/null
+++ b/RTCP/Cobalt/GPUProc/src/scripts/generate_globalfs_locations.py
@@ -0,0 +1,68 @@
+#!/usr/bin/python
+
+def replace_host(location, cluster_name, hosts):
+    """
+    Returns location, with its hostname replaced by one
+    of `hosts', but only if the host matches `cluster_name':
+        cluster_name:... -> hosts[0]:...
+        other:...        -> other:...
+
+    The hosts array is rotated to obtain a round-robin allocation
+    through repeated use.
+    """
+
+    host, directory = location.split(":", 1)
+
+    if host == cluster_name:
+        host = hosts.pop(0)
+        hosts.append(host)
+
+    return "%s:%s" % (host, directory)
+
+def process_parset(parset, cluster_name, hosts):
+    data_products = [ "Correlated", "CoherentStokes", "IncoherentStokes" ]
+
+    for dp in data_products:
+        key = "Observation.DataProducts.Output_%s.locations" % (dp,)
+        if not parset.isDefined(key):
+            continue
+
+        # obtain current locations
+        locations = parset._getStringVector1(key, True)
+
+        # replace global fs references
+        locations = [replace_host(x, cluster_name, hosts) for x in locations]
+
+        # update locations field
+        parset.replace(key, "[%s]" % (", ".join(locations),))
+
+if __name__ == "__main__":
+    import sys
+    from optparse import OptionParser
+    from lofar.parameterset import PyParameterSet
+
+    # Command-line arguments
+    parser = OptionParser("%prog [options] < parset")
+    parser.add_option("-C", "--cluster", dest="cluster", type="string", default="cep4",
+                      help="Cluster name to replace")
+    parser.add_option("-H", "--hosts", dest="hosts", type="string", default="",
+                      help="Pool of host names to use (space separated)")
+
+    (options, args) = parser.parse_args()
+
+    if not options.cluster or not options.hosts:
+        print "Require both --cluster and --hosts."
+        parser.print_help()
+        sys.exit(1)
+
+    hosts = options.hosts.split()
+
+    # Read from stdin ...
+    parset = PyParameterSet("/dev/stdin", False)
+
+    # ... process ...
+    process_parset(parset, options.cluster, hosts)
+
+    # Write to stdout ...
+    print str(parset)
+
diff --git a/RTCP/Cobalt/GPUProc/src/scripts/runObservation.sh b/RTCP/Cobalt/GPUProc/src/scripts/runObservation.sh
index 70498ce89e0b6f6c5b7e15a0cbf64553d0023160..3506794f55bf9b39d4f13a4c254af70c654c1436 100755
--- a/RTCP/Cobalt/GPUProc/src/scripts/runObservation.sh
+++ b/RTCP/Cobalt/GPUProc/src/scripts/runObservation.sh
@@ -30,6 +30,7 @@ function usage {
          "LOFAR_CHECKTOOL"\
          "\n  -F: do NOT send data points to a PVSS gateway"\
          "\n  -P: create PID file"\
+         "\n  -d: dummy run: don't execute anything"\
          "\n  -l: run solely on localhost using 'nprocs' MPI processes (isolated test)"\
          "\n  -p: enable profiling" \
          "\n  -o: add option KEY=VALUE to the parset" \
@@ -116,6 +117,9 @@ AUGMENT_PARSET=1
 
 # Extra parset keys to add
 EXTRA_PARSET_KEYS=""
 
+# Whether to execute anything
+DUMMY_RUN=false
+
 # File to write PID to
 PIDFILE=""
@@ -136,7 +140,7 @@ RTCP_PARAMS=""
 
 # ******************************
 # Parse command-line options
 # ******************************
-while getopts ":ABCFP:l:o:px:" opt; do
+while getopts ":ABCFP:dl:o:px:" opt; do
   case $opt in
     A) AUGMENT_PARSET=0
@@ -148,6 +152,8 @@ while getopts ":ABCFP:dl:o:px:" opt; do
        ;;
     P) PIDFILE="$OPTARG"
        ;;
+    d) DUMMY_RUN=true
+       ;;
     l) FORCE_LOCALHOST=1
        MPIRUN_PARAMS="$MPIRUN_PARAMS -np $OPTARG"
        ;;
@@ -350,45 +356,8 @@ PID_LIST_FILE="$LOFARROOT/var/run/outputProc-$OBSERVATIONID.pids"
 
 if $GLOBALFS; then
   # Update locations in parset
-  mv -fT "$PARSET" "$PARSET.allocate-globalFS"
-
-  <$PARSET.allocate-outputProc >$PARSET perl -e '
-    @hosts = qw('"$NODE_LIST"');
-
-    while (<>) {
-      if (/^
-           (Observation.DataProducts.Output_[A-Za-z]+.locations)   # key
-           \s*=\s*
-           \[(.*?)\]                                               # value
-          /x) {
-
-        # output location key -> replace hostnames
-        $key, $locations = $1, $2;
-
-        # locations are of the format "locus001:/dir, locus002:/dir, ..."
-        foreach $loc (split /,/, $locations) {
-          # split off directory
-          ($host, $dir) = split /:/, $loc, 2;
-
-          # replace hostname iff it matches our cluster
-          if ($host =~ /^\s*'"$CLUSTER_NAME"'\s$*/i) {
-            # determine new host (rotate @hosts)
-            $host = shift @hosts;
-            push @hosts, $host;
-          }
-
-          # add new location to the list
-          push @newlocations, join(":", $host, $dir);
-        }
-
-        # print key with new value
-        printf "%s=[%s]\n", $key, join(",", @newlocations);
-      } else {
-        # print any other parset key verbatim
-        print;
-      }
-    }
-  '
+  mv -fT "$PARSET" "$PARSET.generate_globalfs"
+  generate_globalfs_locations.py --cluster "$CLUSTER_NAME" --hosts "$COMPUTENODES" < "$PARSET.generate_globalfs" > "$PARSET"
 fi
 
 
@@ -443,43 +412,55 @@ if $DOCKER; then
   OUTPUTPROC_CMDLINE="docker run -it -e LUSER=`id -u $SSH_USER_NAME` --net=host -v $DATADIR:$DATADIR lofar-outputproc:$TAG bash -c \"$OUTPUTPROC_CMDLINE\""
 fi
 
-if $SLURM; then
-  # The nodes we need (and can use) are part of this job
-  COMMAND="srun -N $SLURM_JOB_NUM_NODES $OUTPUTPROC_CMDLINE"
-  echo "Starting $COMMAND"
-
-  $COMMAND &
-  PID=$!
+echo "[outputProc] command line = $OUTPUTPROC_CMDLINE"
 
-  echo -n "$PID " >> $PID_LIST_FILE # Save the pid for cleanup
-else
-  for HOST in $NODE_LIST
-  do
-    COMMAND="ssh -tt -l $SSH_USER_NAME $KEY_STRING $SSH_USER_NAME@$HOST $OUTPUTPROC_CMDLINE"
+if ! $DUMMY_RUN; then
+  if $SLURM; then
+    # The nodes we need (and can use) are part of this job
+    COMMAND="srun -N $SLURM_JOB_NUM_NODES $OUTPUTPROC_CMDLINE"
     echo "Starting $COMMAND"
-
-    command_retry "$COMMAND" & # Start retrying function in the background
-    PID=$!                     # get the pid
-
+
+    $COMMAND &
+    PID=$!
+
     echo -n "$PID " >> $PID_LIST_FILE # Save the pid for cleanup
-  done
+  else
+    for HOST in $NODE_LIST
+    do
+      COMMAND="ssh -tt -l $SSH_USER_NAME $KEY_STRING $SSH_USER_NAME@$HOST $OUTPUTPROC_CMDLINE"
+      echo "Starting $COMMAND"
+
+      command_retry "$COMMAND" & # Start retrying function in the background
+      PID=$!                     # get the pid
+
+      echo -n "$PID " >> $PID_LIST_FILE # Save the pid for cleanup
+    done
+  fi
 fi
 
 # ************************************
 # Start rtcp
 # ***********************************
 
+echo "[cobalt] LOFARROOT = $LOFARROOT"
+echo "[cobalt] parset    = $PARSET"
+
 # Run in the background to allow signals to propagate
 #
 # -x LOFARROOT     Propagate $LOFARROOT for rtcp to find GPU kernels, config files, etc.
 # -x QUEUE_PREFIX  Propagate $QUEUE_PREFIX for test-specific interaction over the message bus
 # -H               The host list to run on, derived earlier.
-mpirun.sh -x LOFARROOT="$LOFARROOT" \
-          -x QUEUE_PREFIX="$QUEUE_PREFIX" \
-          -H "$HOSTS" \
-          $MPIRUN_PARAMS \
-          $CHECK_TOOL \
-          `which rtcp` $RTCP_PARAMS "$PARSET" &
+if $DUMMY_RUN; then
+  # Just return success
+  true &
+else
+  mpirun.sh -x LOFARROOT="$LOFARROOT" \
+            -x QUEUE_PREFIX="$QUEUE_PREFIX" \
+            -H "$HOSTS" \
+            $MPIRUN_PARAMS \
+            $CHECK_TOOL \
+            `which rtcp` $RTCP_PARAMS "$PARSET" &
+fi
 
 PID=$!
 
 # Propagate SIGTERM