diff --git a/.gitattributes b/.gitattributes index 86160631397ad60674bd29bb8fbcac2ea0373854..27a529357b14e3cb3deb1458024a35d6ebaca861 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1227,6 +1227,8 @@ LCU/StationTest/xc_200_verify.sh eol=lf MAC/APL/APLCommon/include/APL/APLCommon/AntennaField.h -text MAC/APL/APLCommon/src/AntennaField.cc -text MAC/APL/APLCommon/src/StartDaemon_Protocol.prot -text svneol=native#application/octet-stream +MAC/APL/APLCommon/src/swlevel -text +MAC/APL/APLCommon/src/swlevel.conf -text MAC/APL/APLCommon/test/ControllerProtMenu.cc -text MAC/APL/APLCommon/test/ControllerProtMenu.h -text MAC/APL/APLCommon/test/tAntennaField.cc -text @@ -2469,11 +2471,12 @@ RTCP/LofarStMan/src/makeFLAGwritable -text RTCP/LofarStMan/test/CMakeLists.txt -text RTCP/RTCPTools/src/cexec-udp-copy -text RTCP/Run/CMakeLists.txt -text +RTCP/Run/src/BGPPartition.sh -text RTCP/Run/src/BlueGeneCheck.sh -text RTCP/Run/src/BlueGeneControl.conf -text -RTCP/Run/src/BlueGeneControl.sh -text RTCP/Run/src/CMakeLists.txt -text -RTCP/Run/src/LOFAR/BGcontrol.py -text +RTCP/Run/src/CNProcessing.sh -text +RTCP/Run/src/IONProcessing.sh -text RTCP/Run/src/LOFAR/CMakeLists.txt -text RTCP/Run/src/LOFAR/CommandClient.py -text RTCP/Run/src/LOFAR/Core.py -text @@ -2485,18 +2488,19 @@ RTCP/Run/src/LOFAR/Parset.py -text RTCP/Run/src/LOFAR/ParsetTester.py -text RTCP/Run/src/LOFAR/Partitions.py -text RTCP/Run/src/LOFAR/RingCoordinates.py -text -RTCP/Run/src/LOFAR/Sections.py -text RTCP/Run/src/LOFAR/Stations.py -text RTCP/Run/src/LOFAR/__init__.py -text RTCP/Run/src/MAC+IP.dat -text RTCP/Run/src/RSPConnections.dat -text RTCP/Run/src/commandOLAP.py -text +RTCP/Run/src/controller.sh -text RTCP/Run/src/deploy/Makefile -text RTCP/Run/src/generate_OLAP.parset.pl -text +RTCP/Run/src/gracefullyStopBGProcessing.sh -text +RTCP/Run/src/locations.sh.in -text RTCP/Run/src/multitail-olap.conf -text RTCP/Run/src/packetanalysis.c -text RTCP/Run/src/runParset.py -text -RTCP/Run/src/startOLAP.py -text RTCP/Run/src/util/Aborter.py -text RTCP/Run/src/util/CMakeLists.txt -text RTCP/Run/src/util/Commands.py -text @@ -2505,7 +2509,6 @@ RTCP/Run/src/util/Parset.py -text RTCP/Run/src/util/__init__.py -text RTCP/Run/src/util/dateutil.py -text RTCP/Run/src/util/shlex.py -text -RTCP/Run/src/util/tee.py -text RTCP/Run/src/watchlogs.sh -text RTCP/Run/test/OLAP.parset -text RTCP/Run/test/RTCP-validate.parset -text diff --git a/MAC/APL/APLCommon/src/swlevel b/MAC/APL/APLCommon/src/swlevel old mode 100755 new mode 100644 index cf76580e719064ecebc46d8913f013f89d502414..9df38e5b8091127e7709dea79cbc2ac97e425723 --- a/MAC/APL/APLCommon/src/swlevel +++ b/MAC/APL/APLCommon/src/swlevel @@ -25,9 +25,16 @@ # $Id$ # VERSION="v2.1 20100517" # loading image 1 before starting RSPdriver -BINDIR=/opt/lofar/bin -LOGDIR=/opt/lofar/log -ETCDIR=/opt/lofar/etc + +if [ "$LOFARROOT" == "" ]; then + # default value until all MAC controlled systems provide $LOFARROOT + LOFARROOT=/opt/lofar + echo "WARNING: LOFARROOT not set, using $LOFARROOT" +fi + +BINDIR=$LOFARROOT/bin +LOGDIR=$LOFARROOT/log +ETCDIR=$LOFARROOT/etc LEVELTABLE=${ETCDIR}/swlevel.conf # Make sure all files are user/group writeable (needed for Int. @@ -329,10 +336,13 @@ goto_level() tac $LEVELTABLE | cut -d"#" -f1 | awk '{ if (NF>0) print $0 }' | \ grep "^${l}:" | grep ":d:" | while read line do + ( asroot=`echo $line | cut -d":" -f4` withmpi=`echo $line | cut -d":" -f5` program=`echo $line | cut -d":" -f6` stop_prog $program x$asroot x$withmpi + ) <&- # cant have programs reading from stdin + # as that would mess up 'while read line' done done @@ -342,10 +352,13 @@ goto_level() cat $LEVELTABLE | cut -d"#" -f1 | awk '{ if (NF>0) print $0 }' | \ grep "^${l}:" | grep ":u:" | while read line do + ( asroot=`echo $line | cut -d":" -f4` withmpi=`echo $line | cut -d":" -f5` program=`echo $line | cut -d":" -f6` start_prog $program x$asroot x$withmpi + ) <&- # cant have programs reading from stdin + # as that would mess up 'while read line' done done } @@ -445,7 +458,7 @@ handle_args() if [ "$user" != "lofarsys" -a $level -gt 3 ]; then echo "Will only start up to level 3 as this appears to be local use" - $level=3 + level=3 fi # default image is 1 @@ -484,11 +497,9 @@ handle_args $* # All other options that act on the station status are for lofarsys only # Don't allow root to run swlevel because all logfile get root access. -if [ "$user" != "lofarsys" ]; then - if [ "$group" != "local" ]; then - echo "swlevel must be run by user lofarsys or group local members!" - exit - fi +if [ "$LOFARROOT" == "/opt/lofar" -a "$user" != "lofarsys" -a "$group" != "local" ]; then + echo "swlevel must be run by user lofarsys or group local members!" + exit fi # first power down to this level diff --git a/MAC/APL/APLCommon/src/swlevel.conf b/MAC/APL/APLCommon/src/swlevel.conf index 550ece83722dc9424a5eab66061d51d96902795b..b56a7e7710f1f80720796157effe6737d02a4fe7 100644 --- a/MAC/APL/APLCommon/src/swlevel.conf +++ b/MAC/APL/APLCommon/src/swlevel.conf @@ -15,11 +15,13 @@ 2:u:d:r::_EPAStub 2:u:d:r::RSPDriver 2:u:d:r::TBBDriver -2:u:d:::BlueGeneControl +2:u:d:::BGPPartition # 3:u:d:::AMCServer 3:u:d:::CalServer 3:u:d:::BeamServer +3:u:d:::IONProcessing # IONProc makes the logdir, so start it before CNProc +3:u:d:::CNProcessing # 4:u:d:::SoftwareMonitor 4:u:d:::HardwareMonitor diff --git a/MAC/APL/Appl_Controller/startBGL.sh b/MAC/APL/Appl_Controller/startBGL.sh index c64119c79edbdcc901b3c24b3dcfdb9e0a8c66a5..0578437b2339395d1f143391ab344e2677d494aa 100755 --- a/MAC/APL/Appl_Controller/startBGL.sh +++ b/MAC/APL/Appl_Controller/startBGL.sh @@ -26,4 +26,4 @@ echo "OLAP.IONProc.PLC_controlled = T" ) >> $PARSET # Inject the parset into the correlator -$BINPATH/runParset.py -P $PARTITION parset=$PARSET >>/opt/lofar/log/run.runParset.py.log 2>&1 & +/opt/lofar/bin/runParset.py -P $PARTITION parset=$PARSET >>/opt/lofar/log/run.runParset.py.log 2>&1 & diff --git a/RTCP/Run/src/BGPPartition.sh b/RTCP/Run/src/BGPPartition.sh new file mode 100755 index 0000000000000000000000000000000000000000..d4696197d66b7a19410a50fd1e88dd4df6ddc961 --- /dev/null +++ b/RTCP/Run/src/BGPPartition.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +. locations.sh + +function start() { + mpirun -partition $PARTITION -timeout 300 -nofree -exe /bgsys/tools/hello >/dev/null +} + +function stop() { + mpirun -partition $PARTITION -free wait +} + +function getpid() { + STATUS=`bgpartstatus $PARTITION </dev/null` + + case $STATUS in + busy) PID=UP + ;; + *) PID=DOWN + ;; + esac +} + +function setpid() { + true +} + +function delpid() { + true +} + +. controller.sh diff --git a/RTCP/Run/src/BlueGeneControl.conf b/RTCP/Run/src/BlueGeneControl.conf index f2ea24109d64e5348525588bc7f672e138a79a2b..7c421cbb716f268c2718ce1b203a5ccc179eb606 100644 --- a/RTCP/Run/src/BlueGeneControl.conf +++ b/RTCP/Run/src/BlueGeneControl.conf @@ -1,13 +1,15 @@ -# BG/P Partition to use for the correlator -PARTITION=R00 - -# Root directory for the binaries -BINPATH=/opt/lofar/bin - -# Location of PID file -PIDFILE=/tmp/BlueGeneControl-$PARTITION.pid - -# Location of log file -LOGDIR=/opt/lofar/log -LOGFILE=$LOGDIR/BlueGeneControl.log +if [ "$USER" == "lofarsys" ] +then + # Production systems + + # BG/P Partition to use for the correlator + PARTITION=R00 +else + # development should define their own $PARTITION + if [ -z "$PARTITION" ] + then + echo Please define \$PARTITION. >&2 + exit + fi +fi diff --git a/RTCP/Run/src/BlueGeneControl.sh b/RTCP/Run/src/BlueGeneControl.sh deleted file mode 100755 index d851da575523e0633623547e6949ec242b833557..0000000000000000000000000000000000000000 --- a/RTCP/Run/src/BlueGeneControl.sh +++ /dev/null @@ -1,88 +0,0 @@ -#!/bin/bash -COMMAND=$1 - -CONFIG=/opt/lofar/etc/BlueGeneControl.conf - -. $CONFIG - -function getpid() { - if [ -e $PIDFILE ] - then - PID=`cat $PIDFILE` - if [ ! -e /proc/$PID ] - then - PID=DOWN - fi - else - PID=DOWN - fi -} - -function start() { - $BINPATH/LOFAR/Partitions.py -kfa $PARTITION - - $BINPATH/startOLAP.py -P $PARTITION & - PID=$! - echo $PID > $PIDFILE -} - -function wait_for_graceful_exit() { - # wait for correlator to stop - for i in `seq 1 30` - do - if [ -e /proc/$PID ] - then - break - fi - done -} - -function stop() { - $BINPATH/commandOLAP.py -P $PARTITION cancel all - $BINPATH/commandOLAP.py -P $PARTITION quit - - wait_for_graceful_exit - - if [ -e /proc/$PID ] - then - # nudge startOLAP - kill -2 $PID - - wait_for_graceful_exit - - # kill startOLAP.py script and all its children (mpiruns) - pkill -P $PID - $BINPATH/LOFAR/Partitions.py -k $PARTITION - fi - - rm -f $PIDFILE -} - -getpid - -case $COMMAND in - start) if [ "$PID" = "DOWN" ] - then - ( - start - ) >> $LOGFILE 2>&1 - fi - ;; - - stop) if [ "$PID" != "DOWN" ] - then - ( - stop - ) >> $LOGFILE 2>&1 - fi - ;; - - status) - SWLEVEL=$2 - echo "$SWLEVEL : BlueGeneControl $PID" - ;; - - *) echo "usage: $0 {start|stop|status}" - ;; -esac - diff --git a/RTCP/Run/src/CMakeLists.txt b/RTCP/Run/src/CMakeLists.txt index 7c85177ba8d984b22bd48a6ffa1c923c3c6cc118..610f9fcd49503ee97d5d86587c197d12b0a616f4 100644 --- a/RTCP/Run/src/CMakeLists.txt +++ b/RTCP/Run/src/CMakeLists.txt @@ -2,10 +2,19 @@ lofar_add_bin_program(packetanalysis packetanalysis.c) +configure_file( + ${CMAKE_CURRENT_SOURCE_DIR}/locations.sh.in + ${CMAKE_CURRENT_BINARY_DIR}/locations.sh + @ONLY) + install(PROGRAMS - BlueGeneControl.sh + BGPPartition.sh + CNProcessing.sh + IONProcessing.sh + controller.sh + gracefullyStopBGProcessing.sh + ${CMAKE_CURRENT_BINARY_DIR}/locations.sh watchlogs.sh - startOLAP.py commandOLAP.py runParset.py DESTINATION bin) diff --git a/RTCP/Run/src/CNProcessing.sh b/RTCP/Run/src/CNProcessing.sh new file mode 100755 index 0000000000000000000000000000000000000000..1f1602a84531b2dc54ac572200b310a6d469975e --- /dev/null +++ b/RTCP/Run/src/CNProcessing.sh @@ -0,0 +1,65 @@ +#!/bin/bash + +source locations.sh + +function start() { + TMPDIR="`mktemp -d`" + PIDFILE="$TMPDIR/pid" + + # use a fifo to avoid race conditions + mkfifo "$PIDFILE" + + (mpirun -mode VN -partition "$PARTITION" -env DCMF_COLLECTIVES=0 -env BG_MAPPING=XYZT -env LD_LIBRARY_PATH=/bgsys/drivers/ppcfloor/comm/lib:/bgsys/drivers/ppcfloor/runtime/SPI:/globalhome/romein/lib.bgp -cwd "$LOGSYMLINK" -exe "$CNPROC" 2>&1 & + echo $! > "$PIDFILE") | LOFAR/Logger.py $LOGPARAMS "$LOGSYMLINK/CNProc.log" & + + PID=`cat "$PIDFILE"` + rm -f "$PIDFILE" + rmdir "$TMPDIR" + + if [ -z "$PID" ] + then + PID=DOWN + fi +} + +function stop() { + # graceful exit + alarm 10 gracefullyStopBGProcessing.sh + + # ungraceful exit + [ -e /proc/$PID ] && ( + # mpikill only works when mpirun has started running the application + mpikill "$PID" || + + # ask DNA to kill the job + (bgjobs -u $USER -s | awk "/$PARTITION/ { print \$1; }" | xargs -L 1 bgkilljob) || + + # kill -9 is the last resort + kill -9 "$PID" + ) && sleep 5 + + # wait for job to die + while true + do + JOBSTATUS=`bgjobs -u $USER -s | awk "/$PARTITION/ { print \\$6; }"` + JOBID=`bgjobs -u $USER -s | awk "/$PARTITION/ { print \\$1; }"` + + if [ -z "$JOBID" ] + then + # job is gone + break + fi + + case "$JOBSTATUS" in + dying) + sleep 1 + continue ;; + + *) + echo "Failed to kill BG/P job $JOBID. Status is $JOBSTATUS" + break ;; + esac + done +} + +. controller.sh diff --git a/RTCP/Run/src/IONProcessing.sh b/RTCP/Run/src/IONProcessing.sh new file mode 100755 index 0000000000000000000000000000000000000000..f53e822b0902d3e97a6e51af7c94370c3cc59568 --- /dev/null +++ b/RTCP/Run/src/IONProcessing.sh @@ -0,0 +1,38 @@ +#!/bin/bash + +source locations.sh + +function start() { + # create a new log dir + rm -f "$LOGSYMLINK" || true + mkdir -p "$LOGDIR" + ln -s "$LOGDIR" "$LOGSYMLINK" + + TMPDIR=`mktemp -d` + PIDFILE="$TMPDIR/pid" + + # use a fifo to avoid race conditions + mkfifo "$PIDFILE" + + (/bgsys/LOFAR/openmpi-ion/bin/mpirun -host "$PSETS" --pernode -wd "$LOGDIR" "$IONPROC" "$ISPRODUCTION" 2>&1 & + echo $! > "$PIDFILE") | LOFAR/Logger.py $LOGPARAMS "$LOGSYMLINK/IONProc.log" & + + PID=`cat $PIDFILE` + rm -f "$PIDFILE" + rmdir "$TMPDIR" + + if [ -z "$PID" ] + then + PID=DOWN + fi +} + +function stop() { + # graceful exit + alarm 10 gracefullyStopBGProcessing.sh + + # ungraceful exit + [ -e /proc/$PID ] && kill -15 "$PID" && (sleep 2; [ -e /proc/$PID ] && kill -9 "$PID") +} + +. controller.sh diff --git a/RTCP/Run/src/LOFAR/BGcontrol.py b/RTCP/Run/src/LOFAR/BGcontrol.py deleted file mode 100755 index 034a8cb94b4487d28240f44a4be00d643ce3a966..0000000000000000000000000000000000000000 --- a/RTCP/Run/src/LOFAR/BGcontrol.py +++ /dev/null @@ -1,284 +0,0 @@ -#!/usr/bin/env python - -print "WARNING: BGcontrol.py has been depricated. Use Stations.py or Partitions.py instead." - -import os -from Partitions import PartitionPsets -from Stations import Stations -import sys - -# allow ../util to be found, a bit of a hack -sys.path += [os.path.dirname(__file__)+"/.."] - -from util.Commands import SyncCommand,backquote - -__all__ = ["owner","runningJob","packetAnalysis","stationInPartition","killJobs","freePartition","allocatePartition"] - -""" - Reports about the status of the BlueGene. An interface to the bg* scripts, and to check - what data is being received on the BlueGene from the stations. -""" - -# Locations of the bg* scripts -BGBUSY = "/usr/local/bin/bgbusy" -BGJOBS = "/usr/local/bin/bgjobs" - -def owner( partition ): - """ Returns the name of the owner of the partition, or None if the partition is not allocated. """ - - cmd = "%s" % (BGBUSY,) - - for l in os.popen( cmd, "r" ).readlines(): - try: - part,nodes,cores,state,owner,net = l.split() - except ValueError: - continue - - if part == partition: - # partition found - return owner - - # partition is not allocated - return None - -def runningJob( partition ): - """ Returns a (jobId,name) tuple of the job running on the partition, or None if no job is running. """ - - cmd = "%s" % (BGJOBS,) - - for l in os.popen( cmd, "r" ).readlines(): - try: - job,part,mode,executable,user,state,queue,limit,wall = l.split() - except ValueError: - continue - - if part == partition: - # job found - return (job,executable) - - # partition is not allocated or has no job running - return None - -def packetAnalysis( name, ip, port ): - # locate packetanalysis binary, since its location differs per usage, mainly because - # nobody runs these scripts from an installed environment - locations = map( os.path.abspath, [ - "%s/../packetanalysis" % os.path.dirname(__file__), # when running straight from a source tree - "%s/../../packetanalysis" % os.path.dirname(__file__), # when running in an installed environment - "%s/../../build/gnu/src/packetanalysis" % os.path.dirname(__file__), # when running straight from a source tree - - "/globalhome/mol/projects/LOFAR/RTCP/Run/src/packetanalysis", # fallback: Jan David's version - ] ) - - location = None - for l in locations: - if os.path.exists( l ): - location = l - break - - if location is None: - return "ERROR: Could not find `packetanalysis' binary" - - mainAnalysis = backquote( "ssh -tq %s %s %s" % (ip,location,port) ) - - # do a tcpdump analysis to obtain source mac address - """ tcpdump: The following information will be received from stations: - -08:30:23.175116 10:fa:00:01:01:01 > 00:14:5e:7d:19:71, ethertype IPv4 (0x0800), length 6974: 10.159.1.2.4347 > 10.170.0.1.4347: UDP, length 6928 - """ - tcpdump = backquote("ssh -q %s /opt/lofar/bin/tcpdump -i eth0 -c 10 -e -n udp 2>/dev/null" % (ip,)).split("\n") - macaddress = "UNKNOWN" - for p in tcpdump: - if not p: continue - - try: - f = p.split() - srcmac = f[1] - dstip = f[-4] - - dstip,dstport = dstip[:-1].rsplit(".",1) - if dstport != port: - continue - except ValueError: - continue - - macaddress = srcmac - - if macaddress in [ - "00:12:f2:c3:3a:00", # Effelsberg - ]: - macline = " OK Source MAC address: %s (known router)" % (macaddress,) - elif macaddress == "UNKNOWN" or not macaddress.startswith("00:22:86:"): - macline = "NOK Source MAC address: %s (no LOFAR station)" % (macaddress,) - else: - rscs,nr,field = name[0:2],name[2:5],name[5:] - nr1,nr2,nr3 = nr - - macnrs = macaddress.split(":") - srcnr = "%s%s" % (macnrs[3],macnrs[4]) - - if int(nr) == int(srcnr): - macline = " OK Source MAC address: %s" % (macaddress,) - else: - macline = "NOK Source MAC address: %s (station %d?)" % (macaddress,int(srcnr)) - - return "%s\n%s" % (macline,mainAnalysis) - -def allInputs( station ): - """ Generates a list of name,ip,port tuples for all inputs for a certain station. """ - - for name,input,ionode in sum( ([(s.name,i,s.ionode)] for s in station for i in s.inputs), [] ): - # skip non-network inputs - if input == "null:": - continue - - if input.startswith( "file:" ): - continue - - # strip tcp: - if input.startswith( "tcp:" ): - input = input[4:] - - # only process ip:port combinations - if ":" in input: - ip,port = input.split(":") - if ip in ["0.0.0.0","0"]: - ip = ionode - yield (name,ip,port) - -def stationInPartition( station, partition ): - """ Returns a list of stations that are not received within the given partition. - - Returns (True,[]) if the station is received correctly. - Returns (False,missingInputs) if some inputs are missing, where missingInputs is a list of (name,ip:port) pairs. - """ - - notfound = [] - - for name,ip,port in allInputs( station ): - if ip not in PartitionPsets[partition]: - notfound.append( (name,"%s:%s" % (ip,port)) ) - - return (not notfound, notfound) - -def killJobs( partition ): - """ Kill anything running on the partition. """ - return SyncCommand( "%s | /usr/bin/grep %s | /usr/bin/awk '{ print $1; }' | /usr/bin/xargs -r bgkilljob" % (BGJOBS,partition,) ).isSuccess() - - -def freePartition( partition ): - """ Free the given partition. """ - return SyncCommand( "mpirun -partition %s -free wait" % (partition,) ).isSuccess() - -def allocatePartition( partition ): - """ Allocate the given partition by running Hello World. """ - return SyncCommand( "mpirun -partition %s -nofree -exe /bgsys/tools/hello" % (partition,), ["/dev/null"] ).isSuccess() - -if __name__ == "__main__": - from optparse import OptionParser,OptionGroup - import sys - - def okstr( q ): - return ["NOK","OK"][int(bool(q))] - - # parse the command line - parser = OptionParser( """usage: %prog [options] - - Typical invocations: - %prog -c -S CS302LBA Checks input from station CS302LBA - %prog -kfa -P R00 Kills jobs, frees and allocates partition R00 - """) - parser.add_option( "-q", "--quiet", - dest = "quiet", - action = "store_true", - default = False, - help = "output less" ) - parser.add_option( "-c", "--check", - dest = "check", - action = "store_true", - default = False, - help = "check whether a certain station or partition is ok" ) - - pargroup = OptionGroup(parser, "Partition" ) - pargroup.add_option( "-k", "--kill", - dest = "kill", - action = "store_true", - default = False, - help = "kill all jobs running on the partition" ) - pargroup.add_option( "-a", "--allocate", - dest = "allocate", - action = "store_true", - default = False, - help = "allocate the partition" ) - pargroup.add_option( "-f", "--free", - dest = "free", - action = "store_true", - default = False, - help = "free the partition" ) - parser.add_option_group( pargroup ) - - hwgroup = OptionGroup(parser, "Hardware" ) - hwgroup.add_option( "-S", "--stations", - dest = "stations", - action = "append", - type = "string", - help = "the station(s) to use [%default]" ) - hwgroup.add_option( "-P", "--partition", - dest = "partition", - type = "string", - help = "name of the BlueGene partition [%default]" ) - parser.add_option_group( hwgroup ) - - # parse arguments - (options, args) = parser.parse_args() - errorOccurred = False - - if options.partition: - assert options.partition in PartitionPsets - - if options.kill and not errorOccurred: - if not options.quiet: print "Killing jobs on %s..." % ( options.partition, ) - errorOccured = killJobs( options.partition ) - - if options.free and not errorOccurred: - if not options.quiet: print "Freeing %s..." % ( options.partition, ) - errorOccured = freePartition( options.partition ) - - if options.allocate and not errorOccurred: - if not options.quiet: print "Allocating %s..." % ( options.partition, ) - errorOccured = allocatePartition( options.partition ) - - # check partition if requested so - if options.check: - expected_owner = os.environ["USER"] - real_owner = owner( options.partition ) - - print "Partition Owner : %-40s %s" % (real_owner,okstr(real_owner == expected_owner)) - - expected_job = None - real_job = runningJob( options.partition ) - - print "Running Job : %-40s %s" % (real_job,okstr(real_job == expected_job)) - - sys.exit(int(errorOccurred)) - - # check stations if requested so - if options.stations: - if options.check: - for stationName in options.stations: - if stationName not in Stations: - # unknown station - errorOccurred = True - print "NOK Station name unknown: %s" % (stationName,) - continue - - for name,ip,port in allInputs( Stations[stationName] ): - print "---- Packet analysis for %s %s:%s" % (name,ip,port) - print packetAnalysis( name, ip, port ) - - sys.exit(int(errorOccurred)) - - parser.print_help() - sys.exit(0) - - diff --git a/RTCP/Run/src/LOFAR/CMakeLists.txt b/RTCP/Run/src/LOFAR/CMakeLists.txt index 93b9bcdc40585ec3856480c47c009bcd6846349d..b3a88225933ee2449cbc5dfe815f893aa77db270 100644 --- a/RTCP/Run/src/LOFAR/CMakeLists.txt +++ b/RTCP/Run/src/LOFAR/CMakeLists.txt @@ -2,7 +2,6 @@ install(PROGRAMS __init__.py - BGcontrol.py CommandClient.py Core.py Locations.py @@ -13,6 +12,5 @@ install(PROGRAMS ParsetTester.py Partitions.py RingCoordinates.py - Sections.py Stations.py DESTINATION bin/LOFAR) diff --git a/RTCP/Run/src/LOFAR/Core.py b/RTCP/Run/src/LOFAR/Core.py index 531b8e02acb091fdb1b83d6238bd89da7b44e0cb..9cd1657606590169e98d4a9844ce87cfa90d0557 100644 --- a/RTCP/Run/src/LOFAR/Core.py +++ b/RTCP/Run/src/LOFAR/Core.py @@ -2,83 +2,10 @@ import Logger from logging import debug,info,warning,error,critical -import Sections -from util import Commands from Locations import Locations -from CommandClient import sendCommand from ObservationID import ObservationID from Parset import Parset from Stations import Stations,overrideRack -from util.dateutil import format -import sys -import signal -from threading import Lock -import thread -import socket - -DRYRUN = False - -aborted = False -lock = Lock() -lock.acquire() # lock can be released by anyone to signal the end of the run - -def installSigHandlers(): - """ Translate signals to KeyboardInterrupts to catch them in a try block. """ - - def sigHandler( sig, frame ): - global aborted,lock - - critical( "Caught signal %s -- aborting" % (sig,) ) - aborted = True - - try: - lock.release() - except thread.error: - pass - - signal.signal( signal.SIGTERM, sigHandler ) - signal.signal( signal.SIGQUIT, sigHandler ) - signal.signal( signal.SIGINT, sigHandler ) - -def runCorrelator( partition, start_cnproc = True, start_ionproc = True ): - """ Run an observation using the provided parsets. """ - - # ----- Select the sections to start - sections = Sections.SectionSet() - - if start_ionproc: - sections += [Sections.IONProcSection( partition )] - if start_cnproc: - sections += [Sections.CNProcSection( partition )] - - # sanity check on sections - if not DRYRUN: - sections.check() - - installSigHandlers() - - # ----- Run all sections - try: - # start all sections - sections.run() - sections.wait( lock ) - - if aborted: - raise Exception("aborted") - - except Exception,e: - error( "%s", e ) - - try: - # soft abort -- wait for all observations to stop - sendCommand( partition, "cancel all" ) - sendCommand( partition, "quit" ) - except: - # hard abort -- kill all sections - sections.abort() - - # let the sections clean up - sections.postProcess() def buildParset( parset = None, args = "", olapparset = "OLAP.parset", partition = None ): """ diff --git a/RTCP/Run/src/LOFAR/Locations.py b/RTCP/Run/src/LOFAR/Locations.py index c76d44139db617570ece4ec624bae9169304fcfa..20c55b62a19cd7877b4a46c846322bc35b52476f 100644 --- a/RTCP/Run/src/LOFAR/Locations.py +++ b/RTCP/Run/src/LOFAR/Locations.py @@ -51,13 +51,9 @@ class Locations: def setDefaults(self): # default build variants self.buildvars.update( { - "CNProc": "bgpcn_opt", - "IONProc": "bgpion_opt", "Storage": "gnu_opt", } ) self.executables.update( { - "CNProc": "CN_Processing", - "IONProc": "ION_Processing", "Storage": "Storage_main", } ) @@ -102,8 +98,6 @@ class Locations: "basedir": "${HOME}/production/lofar", # the locations of the main executables - "cnproc": "${BASEDIR}/bgp_cn/bin/%s" % (self.executables["CNProc"],), - "ionproc": "${BASEDIR}/bgp_ion/bin/%s" % (self.executables["IONProc"],), "storage": "/opt/storage/current/bin/%s" % (self.executables["Storage"],), # where to start the executables. rundir needs to be reachable @@ -129,11 +123,6 @@ class Locations: "ionsuppfile": "", "storagesuppfile": "", } ) - - self.nodes.update( { - # default log server address - "logserver": "tcp:ccu001:24500", - } ) else: self.files.update( { # the base directory most paths will be related to @@ -143,8 +132,6 @@ class Locations: "configdir": "${BASEDIR}/RTCP/Run/src", "storage_configdir": "${BASEDIR}/installed/%s/etc" % (self.buildvars["Storage"],), - "cnproc": "${BASEDIR}/installed/%s/bin/%s" % (self.buildvars["CNProc"],self.executables["CNProc"]), - "ionproc": "${BASEDIR}/installed/%s/bin/%s" % (self.buildvars["IONProc"],self.executables["IONProc"]), "storage": "${BASEDIR}/installed/%s/bin/%s" % (self.buildvars["Storage"],self.executables["Storage"]), # location of valgrind suppressions file @@ -152,11 +139,6 @@ class Locations: "storagesuppfile": "${BASEDIR}/RTCP/Storage/src/Storage.supp", } ) - self.nodes.update( { - # no external log server - "logserver": "", - } ) - #if not os.path.isdir( self.files["configdir"] ): # # fall back to default config dir # self.files["configdir"] = os.path.dirname(__file__)+"/.." diff --git a/RTCP/Run/src/LOFAR/Logger.py b/RTCP/Run/src/LOFAR/Logger.py index a4398db9494ee07f47cc1a5f7f925ae2ee717a17..901f87acf4e8202778257b02006c3ec51838f8f2 100644 --- a/RTCP/Run/src/LOFAR/Logger.py +++ b/RTCP/Run/src/LOFAR/Logger.py @@ -1,14 +1,116 @@ +#!/usr/bin/python + import sys import os -from math import modf -from time import time,strftime,localtime +from time import strftime,localtime,sleep import logging from logging.handlers import TimedRotatingFileHandler from traceback import format_exception from itertools import count +import socket +import Queue +from threading import Thread DEBUG=False +class reconnecting_socket: + """ A socket that keeps reconnecting if the connection is lost. Data is sent + asynchronously, with a buffer which drops messages if full. """ + + def __init__( self, host, port, retry_timeout=10, socket_timeout=5, bufsize=256 ): + self.host = host + self.port = port + self.socket_timeout = socket_timeout + self.retry_timeout = retry_timeout + self.socket = None + self.done = False + + self.writebuf = Queue.Queue( bufsize ) + + self.iothread = Thread( target=self.iothread_main, name="I/O thread for %s:%s" % (host,port) ) + self.iothread.start() + + def iothread_main( self ): + def close(): + self.socket.close() + self.socket = None + + def reconnect(): + self.socket = socket.socket() + self.socket.settimeout( self.socket_timeout ) + + while not self.done: + try: + self.socket.connect( (self.host,self.port) ) + except socket.error: + pass + except socket.timeout: + pass + else: + # connected! + break + + # sleep, but do stop when told + for i in xrange( self.retry_timeout ): + if self.done: + return + sleep( 1 ) + + def write( data ): + if self.socket is None: + reconnect() + + if self.done: + return + + written = 0 + + try: + while written < len(data): + written += self.socket.send( data[written:] ) + except socket.error: + # do not attempt to send remaining data + close() + return + except socket.timeout: + # do not attempt to send remaining data + close() + return + + # start with a connection + if self.socket is None: + reconnect() + + while True: + try: + data = self.writebuf.get( timeout=1 ) + except Queue.Empty: + # TODO: we can't keep a close check on our socket, delaying + # closing and reconnecting and keeping the line open + continue + + if data is None: + # close request + break + + write( data ) + + def write( self, data ): + if self.done: + return + + try: + self.writebuf.put_nowait( data ) + except Queue.Full: + # queue full -- drop data + pass + + def close( self ): + self.done = True # abort any reconnection attempts + self.writebuf.put( None ) # prod the deque, wait if necessary + + self.iothread.join() + def my_excepthook( etype, value, tb ): """ Replacement for default exception handler, which uses the logger instead of stderr. """ @@ -72,7 +174,7 @@ class TimedSizeRotatingFileHandler(TimedRotatingFileHandler): if os.path.exists(t): os.remove(t) - os.rename(f,t) + os.rename(f,t) t = self.rolloverAt - self.interval timeTuple = localtime(t) @@ -107,3 +209,76 @@ def rotatingLogger( appname, filename ): logger.addHandler( handler ) return logger + +if __name__ == "__main__": + import sys + + if len(sys.argv) < 2: + print "Usage: %s outputfilename [maxfilesize]" % (sys.argv[0],) + sys.exit(1) + +if __name__ == "__main__": + from optparse import OptionParser,OptionGroup + + parser = OptionParser( usage = """usage: %prog [options] outputfilename + """ ) + + parser.add_option( "-s", "--server", + dest = "server", + type = "string", + help = "output to logserver (host:port)" ) + parser.add_option( "-v", "--verbose", + dest = "verbose", + action = "store_true", + default = False, + help = "output to stdout [%default]" ) + parser.add_option( "-m", "--maxmb", + dest = "maxmb", + type = "int", + default = 512, + help = "maximum file size in megabytes [%default]" ) + + # parse arguments + (options, args) = parser.parse_args() + + if not args: + parser.print_help() + sys.exit(1) + + initLogger() + + logfilename = args[0] + logger = rotatingLogger( "foo", logfilename ) + logger.handlers[0].maxBytes = options.maxmb * 1024 * 1024 + + verbose = options.verbose + if options.server: + host,port = options.server.split(":") + port = int(port) + + if port == 0: + print "Invalid port number: %s" % (sys.argv[2],) + sys.exit(1) + + server = reconnecting_socket(host, port) + else: + server = None + + # 'for line in sys.stdin' buffers input, which + # is not what we want at all, so we use + # sys.stdin.readline instead. + for line in iter(sys.stdin.readline, ""): + if server: + server.write(line) + + line = line[:-1] # strip trailing \n + + logger.info( "%s", line ) + + if verbose: + print line + + + if server: + server.close() + diff --git a/RTCP/Run/src/LOFAR/Partitions.py b/RTCP/Run/src/LOFAR/Partitions.py index a8cb488e59be01995c6697a1022c189a80ad5a33..7fbdea5b80aaa4cc38a4dfa1a5a36d15d06512e4 100755 --- a/RTCP/Run/src/LOFAR/Partitions.py +++ b/RTCP/Run/src/LOFAR/Partitions.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -__all__ = [ "PartitionPsets","owner","runningJob","killJobs","freePartition","allocatePartition"] +__all__ = [ "PartitionPsets" ] import os import sys @@ -8,8 +8,6 @@ import sys # allow ../util to be found, a bit of a hack sys.path += [os.path.abspath(os.path.dirname(__file__)+"/..")] -from util.Commands import SyncCommand - # PartitionPsets: A dict which maps partitions to I/O node IP addresses. # the pset hierarchy is is analogue to: # R00-M0-N00-J00 = R00-M0-N00-J00-16 consists of a single pset @@ -52,91 +50,6 @@ for R in xrange(3): # a rack PartitionPsets[rack] = PartitionPsets["%s-M0" % rack] + PartitionPsets["%s-M1" % rack] -# Locations of the bg* scripts -BGBUSY = "/usr/local/bin/bgbusy" -BGJOBS = "/usr/local/bin/bgjobs" - -def owner( partition ): - """ Returns the name of the owner of the partition, or None if the partition is not allocated. """ - - cmd = "%s" % (BGBUSY,) - - for l in os.popen( cmd, "r" ).readlines(): - try: - part,nodes,cores,state,owner,net = l.split() - except ValueError: - continue - - if part == partition: - # partition found - return owner - - # partition is not allocated - return None - -def runningJob( partition ): - """ Returns a (jobId,name) tuple of the job running on the partition, or None if no job is running. """ - - cmd = "%s" % (BGJOBS,) - - for l in os.popen( cmd, "r" ).readlines(): - try: - job,part,mode,executable,user,state,queue,limit,wall = l.split() - except ValueError: - continue - - if part == partition: - # job found - return (job,executable) - - # partition is not allocated or has no job running - return None - -def killJobs( partition ): - """ Kill anything running on the partition. """ - return SyncCommand( "%s | /usr/bin/grep %s | /usr/bin/awk '{ print $1; }' | /usr/bin/xargs -r bgkilljob" % (BGJOBS,partition,) ).isSuccess() - -def freePartition( partition ): - """ Free the given partition. """ - return SyncCommand( "mpirun -partition %s -free wait" % (partition,) ).isSuccess() - -def resetPartition( partition ): - """ Reset /dev/flatmem on all I/O nodes and kill all processes that we started. """ - success = True - - for node in PartitionPsets[partition]: - success = success and SyncCommand( "ssh -tq %s pkill IONProc ; pkill orted ; echo 1 > /proc/flatmem_reset" % (node,) ).isSuccess() - - return success - -def allocatePartition( partition ): - """ Allocate the given partition by running Hello World. """ - return SyncCommand( "mpirun -partition %s -nofree -exe /bgsys/tools/hello" % (partition,), ["/dev/null"] ).isSuccess() - -def stealPartition( partition ): - """ Take over a partition from another user. UNDER CONSTRUCTION. """ - old_owner = owner( partition ) - new_owner = os.environ["USER"] - - if old_owner is None: - # we already own it - return allocatePartition( partition ) - - jobinfo = runningJob( partition ) - - if jobinfo is not None: - # someone is still running a job - return False - - # reallocate partition - commands = [ - "set_username %s" % (old_owner,), - "free %s" % (partition,), - "set_username %s" % (new_owner,), - "allocate %s" % (partition), - ] - return SyncCommand("ssh bgsn echo '%s' | mmcs_db_console" % "\\n".join(commands) ).isSuccess() - if __name__ == "__main__": from optparse import OptionParser,OptionGroup import sys @@ -148,36 +61,6 @@ if __name__ == "__main__": action = "store_true", default = False, help = "list the psets in the partition" ) - parser.add_option( "-c", "--check", - dest = "check", - action = "store_true", - default = False, - help = "check the partition status" ) - parser.add_option( "-k", "--kill", - dest = "kill", - action = "store_true", - default = False, - help = "kill all jobs running on the partition" ) - parser.add_option( "-a", "--allocate", - dest = "allocate", - action = "store_true", - default = False, - help = "allocate the partition" ) - parser.add_option( "-f", "--free", - dest = "free", - action = "store_true", - default = False, - help = "free the partition" ) - parser.add_option( "-r", "--reset", - dest = "reset", - action = "store_true", - default = False, - help = "reset the partition without freeing it" ) - parser.add_option( "-s", "--steal", - dest = "steal", - action = "store_true", - default = False, - help = "take over a partition from another user (needs access to bgsn)" ) # parse arguments (options, args) = parser.parse_args() @@ -195,37 +78,5 @@ if __name__ == "__main__": for ip in PartitionPsets[partition]: print ip - if options.kill and not errorOccurred: - print "Killing jobs on %s..." % ( partition, ) - errorOccured = killJobs( partition ) - - if options.free and not errorOccurred: - print "Freeing %s..." % ( partition, ) - errorOccured = freePartition( partition ) - - if options.allocate and not errorOccurred: - print "Allocating %s..." % ( partition, ) - errorOccured = allocatePartition( partition ) - - if options.reset and not errorOccurred: - print "Resetting %s..." % ( partition, ) - errorOccured = resetPartition( partition ) - - if options.steal and not errorOccurred: - print "Taking over partition %s..." % ( partition, ) - errorOccured = stealPartition( partition ) - - # check partition if requested so - if options.check: - expected_owner = os.environ["USER"] - real_owner = owner( partition ) - - print "Partition Owner : %-40s" % (real_owner,) - - expected_job = None - real_job = runningJob( partition ) - - print "Running Job : %-40s" % (real_job,) - sys.exit(int(errorOccurred)) diff --git a/RTCP/Run/src/LOFAR/Sections.py b/RTCP/Run/src/LOFAR/Sections.py deleted file mode 100644 index d52971dd2f07d21ae2b114edcdc2ca3be77b9cdd..0000000000000000000000000000000000000000 --- a/RTCP/Run/src/LOFAR/Sections.py +++ /dev/null @@ -1,225 +0,0 @@ -#!/usr/bin/env python - -from util.Commands import SyncCommand,AsyncCommand,mpikill,backquote,PIPE -from util.Aborter import runUntilSuccess,runFunc -from Locations import Locations,Hosts,isProduction -import Logger -import os -import Partitions -import ObservationID -import time -from logging import debug,info,warning -from threading import Thread,Lock -import thread - -DEBUG=False -DRYRUN=False -VALGRIND_ION=False -VALGRIND_STORAGE=False - -SSH="ssh -o StrictHostKeyChecking=no -q " - -class Section: - """ A 'section' is a set of commands which together perform a certain function. """ - - def __init__(self, partition): - self.commands = [] - - self.logoutputs = [] - if Locations.nodes["logserver"]: - self.logoutputs.append( "%s" % (Locations.nodes["logserver"],) ) - - self.partition = partition - self.psets = Partitions.PartitionPsets[self.partition] - - self.preProcess() - - def __str__(self): - return self.__class__.__name__ - - def preProcess(self): - pass - - def run(self): - pass - - def postProcess(self): - pass - - def killSequence(self,name,killfunc,timeout): - killfuncs = [ lambda: killfunc(2), lambda: killfunc(9) ] - - success = runUntilSuccess( killfuncs, timeout ) - - if not success: - warning( "%s: Could not kill %s" % (self,name) ) - - return success - - def killCommand(self,cmd,timeout): - def kill( signal ): - cmd.abort( signal ) - cmd.wait() - - return self.killSequence(str(cmd),kill,timeout) - - def abort(self,timeout): - for c in self.commands: - self.killCommand(c,timeout) - - def wait(self): - for s in self.commands: - s.wait() - - def check(self): - pass - -class SectionSet(list): - def run(self): - for s in self: - info( "Starting %s." % (s,) ) - s.run() - - def postProcess(self): - for s in self: - info( "Post processing %s." % (s,) ) - s.postProcess() - - - def abort(self, timeout=5): - for s in self: - info( "Killing %s." % (s,) ) - s.abort(timeout) - - def wait(self, lock = Lock()): - """ Wait until the sections finish, or until the lock is released. Releases lock when section finishes. """ - - sections = self - - # wait in a separate thread to allow python to capture KeyboardInterrupts in the main thread - class WaitThread(Thread): - def run(self): - try: - for s in sections: - info( "Waiting for %s." % (s,) ) - s.wait() - finally: - try: - lock.release() - except thread.error: - # lock already released - pass - - WaitThread().start() - - # wait for lock to be released by waiting thread or from an external source - # !! DO NOT use lock.acquire() since that will delay signals (SIGQUIT/SIGTERM/etc) from arriving until - # the lock is acquired. - while lock.locked(): - time.sleep(1) - - def check(self): - for s in self: - info( "Checking %s for validity." % (s,) ) - s.check() - -class CNProcSection(Section): - def run(self): - logger = Logger.rotatingLogger( "CNProc", "%s/run.CNProc.log" % (Locations.files["logdir"]) ) - logfiles = self.logoutputs - - # CNProc is started on the Blue Gene, which has BG/P mpirun 1.65 - # NOTE: This mpirun needs either stdin or stdout to be line buffered, - # otherwise it will not provide output from CN_Processing (why?). - mpiparams = [ - # where - "-mode VN", - "-partition %s" % (self.partition,), - - # environment - "-env DCMF_COLLECTIVES=0", - "-env BG_MAPPING=XYZT", - "-env LD_LIBRARY_PATH=/bgsys/drivers/ppcfloor/comm/lib:/bgsys/drivers/ppcfloor/runtime/SPI:/globalhome/romein/lib.bgp", - - # working directory - "-cwd %s" % (Locations.files["rundir"],), - - # executable - "-exe %s" % (Locations.files["cnproc"],), - - # arguments - ] - - self.commands.append( AsyncCommand( "mpirun %s" % (" ".join(mpiparams),), logfiles, killcmd=mpikill, logger=logger ) ) - -class IONProcSection(Section): - def run(self): - logger = Logger.rotatingLogger( "IONProc", "%s/run.IONProc.log" % (Locations.files["logdir"]) ) - logfiles = self.logoutputs - - if VALGRIND_ION: - valgrind = "/globalhome/mol/root-ppc/bin/valgrind --suppressions=%s --leak-check=full --show-reachable=no" % (Locations.files["ionsuppfile"],) - else: - valgrind = "" - - mpiparams = [ - # where - "-host %s" % (",".join(self.psets),), - "--pernode", - - # "--tag-output", - - # environment - # "-x FOO=BAR", - - # working directory - "-wd %s" % (Locations.files["rundir"],), - - # valgrind - valgrind, - - # executable - "%s" % (Locations.files["ionproc"],), - - # arguments - "%d" % (int(isProduction()),), - ] - - - if DEBUG: - mpiparams = [ - "-v", - ] + mpiparams - - self.commands.append( AsyncCommand( "/bgsys/LOFAR/openmpi-ion/bin/mpirun %s" % (" ".join(mpiparams),), logfiles, logger=logger ) ) - - def check(self): - # I/O nodes need to be reachable -- check in parallel - - # ping - - pingcmds = [ - (node,AsyncCommand( "ping %s -c 1 -w 2 -q" % (node,), ["/dev/null"] )) - for node in self.psets - ] - - for (node,c) in pingcmds: - assert c.isSuccess(), "Cannot reach I/O node %s [ping]" % (node,) - - # ssh & flatmem access - - # cat /dev/flatmem returns "Invalid argument" if the memory is available, - # and "Cannot allocate memory" if not. - successStr = "cat: /dev/flatmem: Invalid argument" - sshcmds = [ - (node,AsyncCommand( SSH+"%s cat /dev/flatmem 2>&1 | grep -F '%s'" % (node,successStr),PIPE) ) - for node in self.psets - ] - - def waitForSuccess(): - for (node,c) in sshcmds: - c.wait() - - assert successStr in c.output(), "Cannot allocate flat memory on I/O node %s" % (node,) - - assert runFunc( waitForSuccess, 20 ), "Failed to reach one or more I/O nodes [ssh]" diff --git a/RTCP/Run/src/LOFAR/Stations.py b/RTCP/Run/src/LOFAR/Stations.py index 6ef830e821e71d9db9f474fc9f4cf71e40ff766d..42010a8bef641bc0533be42213a5c7e676921fbb 100755 --- a/RTCP/Run/src/LOFAR/Stations.py +++ b/RTCP/Run/src/LOFAR/Stations.py @@ -8,7 +8,7 @@ from Locations import Locations # allow ../util to be found, a bit of a hack sys.path += [(os.path.dirname(__file__) or ".")+"/.."] -from util.Commands import SyncCommand,backquote +from util.Commands import backquote __all__ = ["packetAnalysis","stationInPartition","Stations","overrideRack"] diff --git a/RTCP/Run/src/controller.sh b/RTCP/Run/src/controller.sh new file mode 100755 index 0000000000000000000000000000000000000000..bf31830e23e5051ff0dc25cf7282379a4e7bb7c1 --- /dev/null +++ b/RTCP/Run/src/controller.sh @@ -0,0 +1,92 @@ +#!/bin/bash +COMMAND=$1 + +# helper function to limit the execution time of a command. usage: +# alarm timeout cmd arg1 arg2 +function alarm() { + perl -e 'alarm shift; exec @ARGV' "$@"; +} + +type getpid >&/dev/null || function getpid() { + PID=DOWN + PIDFILE="/tmp/`procname`-$USER.pid" + + if [ -f "$PIDFILE" ] + then + PID=`cat -- "$PIDFILE"` + fi + + if [ ! -e /proc/$PID ] + then + PID=DOWN + fi +} + +function isstarted() { + [ "DOWN" != "$PID" ] +} + +type setpid >&/dev/null || function setpid() { + PID=$1 + PIDFILE="/tmp/`procname`-$USER.pid" + + if [ "x$PID" == "x" ] + then + exit + fi + + echo "$PID" > "$PIDFILE" +} + +type delpid >&/dev/null || function delpid() { + PIDFILE="/tmp/`procname`-$USER.pid" + rm -f -- "$PIDFILE" +} + +function procname() { + # the basename of this script, without its extension + basename -- "$0" | sed 's/[.][^.]*$//g' +} + +type start >&/dev/null || function start() { + tail -F / >&/dev/null & +} + +type stop >&/dev/null || function stop() { + kill -15 "$PID" +} + +getpid + +case $COMMAND in + start) if ! isstarted + then + PID= + + start || exit $! + + FUNCPID=$! + if [ -z "$PID" ] + then + PID=$FUNCPID + fi + setpid $PID + fi + ;; + + stop) if isstarted + then + stop || exit $! + delpid + fi + ;; + + status) + SWLEVEL=$2 + printf "%d : %-25s %s\n" "$SWLEVEL" "`procname`" "$PID" + ;; + + *) echo "usage: $0 {start|stop|status}" + ;; +esac + diff --git a/RTCP/Run/src/gracefullyStopBGProcessing.sh b/RTCP/Run/src/gracefullyStopBGProcessing.sh new file mode 100644 index 0000000000000000000000000000000000000000..0efc15b9bfc6d075904dfcb5bc26df32ded00761 --- /dev/null +++ b/RTCP/Run/src/gracefullyStopBGProcessing.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +source locations.sh + +echo "cancel all" > /dev/tcp/$FIRSTPSET/4000 && +echo "quit" > /dev/tcp/$FIRSTPSET/4000 && +sleep 5 # allow processes to quit diff --git a/RTCP/Run/src/locations.sh.in b/RTCP/Run/src/locations.sh.in new file mode 100644 index 0000000000000000000000000000000000000000..06104765296f6743ca1ed14671b4b479a93931c8 --- /dev/null +++ b/RTCP/Run/src/locations.sh.in @@ -0,0 +1,45 @@ +function isproduction() { + [ "lofarsys" == "$USER" ] +} + +TIMESTAMP=`date +%Y-%m-%d_%H%M%S` + +if isproduction +then + ISPRODUCTION=1 + + CNPROC=$HOME/production/lofar/bgp_cn/bin/CN_Processing + IONPROC=$HOME/production/lofar/bgp_ion/bin/ION_Processing + ETCDIR=/opt/lofar/etc + + LOGDIR=$HOME/log/L$TIMESTAMP + LOGSYMLINK=$HOME/log/latest + LOGPARAMS="-s ccu001:24500" +else + ISPRODUCTION=0 + + CNPROC=$HOME/projects/LOFAR/installed/bgpcn_opt/bin/CN_Processing + IONPROC=$HOME/projects/LOFAR/installed/bgpion_opt/bin/ION_Processing + ETCDIR=@CMAKE_INSTALL_PREFIX@/etc + + LOGDIR=$HOME/projects/LOFAR/L$TIMESTAMP + LOGSYMLINK=$HOME/projects/LOFAR/log + LOGPARAMS="-v" +fi + +if [ -d $ETCDIR ] +then + . $ETCDIR/BlueGeneControl.conf +fi + +# list both the partition directly (small partitions) and recursively (large partitions) to get all -32 subpartitions +# bghierarchy needs a valid stdin for some reason and will read from it, so provide a fake one +SUBPARTITIONS="`(bghierarchy -s $PARTITION;bghierarchy -s \`bghierarchy -s $PARTITION\`) </dev/null`" + +# a comma-separated list of all psets in $PARTITION +# xxx-32 means both xxx-J00 and xxx-J01 +PSETS=`for i in $SUBPARTITIONS; do echo $i; done|grep -- "-32$"|sort -u|sed 's/-32$/-J00/;p;s/-J00$/-J01/'|xargs -L 1 host -4|cut -d\ -f 4|tr '\n' ','` + +# the address of the first pset in the $PARTITION +FIRSTPSET=`for i in $SUBPARTITIONS; do echo $i; done|grep -- "-32$"|sort -u|sed 's/-32$/-J00/;p;s/-J00$/-J01/'|xargs -L 1 host -4|cut -d\ -f 4|head -n 1` + diff --git a/RTCP/Run/src/multitail-olap.conf b/RTCP/Run/src/multitail-olap.conf index 43b8a5814d09798e935ff9d9018f00c8d2b28737..6681df3d1945efdacca3021626515750a2f5e7be 100644 --- a/RTCP/Run/src/multitail-olap.conf +++ b/RTCP/Run/src/multitail-olap.conf @@ -53,6 +53,6 @@ editrule:ke:delays: \[.*]+, editrule:ke:\[[0-9.]+> # ----- general default configuration values -scheme:olap:run.(CNProc|IONProc).log.* +scheme:olap:(CNProc|IONProc).log.* check_mail:0 diff --git a/RTCP/Run/src/startOLAP.py b/RTCP/Run/src/startOLAP.py deleted file mode 100755 index f5ea40312cd978ca0250dca3d241299c9bb67ee5..0000000000000000000000000000000000000000 --- a/RTCP/Run/src/startOLAP.py +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/python - -from LOFAR import Logger -from logging import debug,info,warning,error,critical -from LOFAR.Core import runCorrelator -from util import Commands -from LOFAR.Locations import Locations -from util.Hosts import rmkdir,rexists,runlink,rsymlink -import sys - -DRYRUN = False - -if __name__ == "__main__": - from optparse import OptionParser,OptionGroup - import os - import time - - # parse the command line - parser = OptionParser( usage = """usage: %prog -P partition [options]""" ) - parser.add_option( "-d", "--dry-run", - dest = "dryrun", - action = "store_true", - default = False, - help = "do not actually execute anything [%default]" ) - parser.add_option( "--valgrind-ion", - dest = "valgrind_ion", - action = "store_true", - default = False, - help = "run IONProc under valgrind [%default]" ) - - opgroup = OptionGroup(parser, "Output" ) - opgroup.add_option( "-v", "--verbose", - dest = "verbose", - action = "store_true", - default = False, - help = "be verbose [%default]" ) - opgroup.add_option( "-q", "--quiet", - dest = "quiet", - action = "store_true", - default = False, - help = "be quiet [%default]" ) - opgroup.add_option( "-l", "--log-server", - dest = "logserver", - type = "string", - default = Locations.nodes["logserver"], - help = "TCP address (IP:port) to send logging to [%default]" ) - parser.add_option_group( opgroup ) - - hwgroup = OptionGroup(parser, "Hardware" ) - hwgroup.add_option( "-O", "--olap-parset", - dest = "olapparset", - type = "string", - default = "%s/OLAP.parset" % (os.path.dirname(__file__),), - help = "the parset containing station definitions [%default]" ) - hwgroup.add_option( "-P", "--partition", - dest = "partition", - type = "string", - help = "name of the BlueGene partition [%default]" ) - parser.add_option_group( hwgroup ) - - secgroup = OptionGroup(parser, "Sections" ) - secgroup.add_option( "--nocnproc", - dest = "nocnproc", - action = "store_true", - default = False, - help = "disable the CNProc section [%default]" ) - secgroup.add_option( "--noionproc", - dest = "noionproc", - action = "store_true", - default = False, - help = "disable the IONProc section [%default]" ) - parser.add_option_group( secgroup ) - - dirgroup = OptionGroup(parser, "Directory and file locations") - dirgroup.add_option( "--basedir", - dest = "basedir", - default = Locations.files["basedir"], - help = "base directory [%default]" ) - dirgroup.add_option( "--logdir", - dest = "logdir", - default = Locations.files["logdir"], - help = "log directory (syntax: [host:]path) [%default]" ) - dirgroup.add_option( "--rundir", - dest = "rundir", - default = Locations.files["rundir"], - help = "run directory [%default]" ) - dirgroup.add_option( "--cnproc", - dest = "cnproc", - default = Locations.files["cnproc"], - help = "CNProc executable [%default]" ) - dirgroup.add_option( "--ionproc", - dest = "ionproc", - default = Locations.files["ionproc"], - help = "IONProc executable [%default]" ) - dirgroup.add_option( "--valgrind-ion-suppressions", - dest = "ionsuppfile", - default = Locations.files["ionsuppfile"], - help = "Valgrind suppressions file for IONProc [%default]" ) - parser.add_option_group( dirgroup ) - - # parse arguments - (options, args) = parser.parse_args() - - if not options.partition: - parser.print_help() - sys.exit(1) - - # ========== Global options - - if options.verbose: - Commands.debug = debug - Logger.DEBUG = True - Sections.DEBUG = True - - if options.valgrind_ion: - Sections.VALGRIND_ION = True - - if not options.quiet: - DEBUG = True - - if options.dryrun: - DRYRUN = True - Commands.DRYRUN = True - - Logger.initLogger() - - # set log server - Locations.nodes["logserver"] = options.logserver - - for opt in dirgroup.option_list: - Locations.setFilename( opt.dest, getattr( options, opt.dest ) ) - - Locations.resolveAllPaths() - - # create log and directory if it does not exist - for d in ["logdir","rundir"]: - if not rexists(Locations.files[d]): - warning( "Creating %s directory %s" % ( d, Locations.files[d], ) ) - - if not DRYRUN: - rmkdir( Locations.files[d] ) - - # create symlink to log directory in run directory - log_symlink = { - "source": "%s" % (Locations.files["logsymlink"],), - "dest": "%s" % (Locations.files["logdir"],), - } - - if not DRYRUN: - try: - if rexists( log_symlink["source"] ): - runlink( log_symlink["source"] ) - - rsymlink( log_symlink["source"], log_symlink["dest"] ) - except OSError,e: - warning( "Could not create symlink %s pointing to %s" % (log_symlink["source"],log_symlink["dest"]) ) - - # ========== Run - info( "========== Run ==========" ) - - runCorrelator( options.partition, not options.nocnproc, not options.noionproc ) - - info( "========== Done ==========" ) diff --git a/RTCP/Run/src/util/CMakeLists.txt b/RTCP/Run/src/util/CMakeLists.txt index e007e16e5a328ff01880b5267d44d0037a09fc12..c53024b8d49d5f9a6201f88b95fdfeea6d43264f 100644 --- a/RTCP/Run/src/util/CMakeLists.txt +++ b/RTCP/Run/src/util/CMakeLists.txt @@ -8,5 +8,4 @@ install(PROGRAMS Hosts.py Parset.py shlex.py - tee.py DESTINATION bin/util) diff --git a/RTCP/Run/src/util/Commands.py b/RTCP/Run/src/util/Commands.py index ea23884f30e83126af80c7891a0935ec9b0af323..6f9cf8d27db93b498dd013b66713b57528a59044 100644 --- a/RTCP/Run/src/util/Commands.py +++ b/RTCP/Run/src/util/Commands.py @@ -1,14 +1,6 @@ -import os -import fcntl -import socket -from subprocess import Popen,STDOUT,call,PIPE -from Hosts import ropen -from tee import Tee +from subprocess import Popen,STDOUT,PIPE from Aborter import runFunc - -DRYRUN = False - # define our own PIPE as an alias to subprocess.PIPE PIPE=PIPE @@ -35,130 +27,3 @@ def debug( str ): """ Override with custom logging function. """ pass - -def mpikill( pid, signal ): - SyncCommand( "mpikill -s %s %s" % (signal,pid) ) - -class AsyncCommand(object): - """ - Executes an external command in the background - """ - - def __init__(self, cmd, outfiles=[], infile=None, killcmd=os.kill, logger=None ): - """ Run command `cmd', with I/O optionally redirected. - - cmd: command to execute. - outfiles: filenames for output, or [] to use stdout, or PIPE to record it within python. - infile: filename for input, or None to use stdin. - killcmd: function used to abort, called as killcmd( pid, signal ). """ - - if DRYRUN: - self.cmd = "echo %s" % (cmd,) - else: - self.cmd = cmd - - if outfiles not in [[],PIPE]: - outstr = "> %s" % (", ".join(outfiles),) - else: - outstr = "" - debug("RUN %s: %s %s" % (self.__class__.__name__,cmd,outstr) ) - - if DRYRUN: - self.outputs = [] - stdout = None # no logging - elif outfiles == PIPE: - self.outputs = [] - stdout = PIPE # no logging - else: - # open all outputs, remember them to prevent the files - # from being closed at the end of __init__ - self.outputs = [ropen( o, "w", 1 ) for o in outfiles] - - # create a pipe to multiplex everything through - r,w = os.pipe() - - # feed all file descriptors to Tee - Tee( r, self.outputs, logger ) - - # keep the pipe input - stdout = w - - if infile: - # Line buffer stdin to satisfy MPIRUN on the BG/P. It will not provide output without it. - stdin = ropen( infile, "w", 1 ) - else: - stdin = None - - self.done = False - self.reaped = False - self.success = False - self.killcmd = killcmd - self.popen = Popen( self.cmd.split(), stdin=stdin, stdout=stdout, stderr=STDOUT ) - self.run() - - def __str__(self): - return "(%s) %s" % (self.__class__.__name__,self.cmd) - - def run(self): - """ Will be called when the command has just been started. """ - - pass - - def output(self): - """ Return the output of the program (when started with outfiles="PIPE"). """ - - output = self.popen.communicate()[0] or "" - - # even though process closed stdout, we still need to wait for termination - self.wait() - - return output - - def isDone(self): - """ Return whether the command has finished execution, but do not block. """ - - self.done = (self.popen.poll() is not None) - - return self.done - - def wait(self): - """ Block until the command finishes execution. """ - - if self.reaped: - # already wait()ed before - return - - try: - self.success = self.popen.wait() == 0 - except OSError: - # process died prematurely or never started? - self.success = False - - self.done = True - self.reaped = True - - for o in self.outputs: - o.close() - - def abort(self, signal = 2): - """ Abort the command, if it has not finished yet. """ - - if not self.isDone(): - debug( "Sending signal %s to PID %s" % (signal,self.popen.pid) ) - - self.killcmd( self.popen.pid, signal ) - - def isSuccess(self): - """ Returns whether the command returned succesfully. """ - - self.wait() - return self.success - -class SyncCommand(AsyncCommand): - """ - Executes an external command immediately. - """ - - def run(self): - self.wait() - diff --git a/RTCP/Run/src/util/Hosts.py b/RTCP/Run/src/util/Hosts.py index ef1db704031cffbe15c08370fc091928c7113779..1ec7c412e2a9685fae3a6863b4d2107f8cf78c29 100644 --- a/RTCP/Run/src/util/Hosts.py +++ b/RTCP/Run/src/util/Hosts.py @@ -1,14 +1,10 @@ #!/usr/bin/env python -__all__ = ["reconnecting_socket","ropen","rmkdir","rexists","runlink","rsymlink"] +__all__ = ["ropen","rmkdir","rexists","runlink","rsymlink"] -import Queue import os import sys import subprocess -import socket -from threading import Thread -from time import sleep HOSTNAME = os.environ.get("HOSTNAME") @@ -20,105 +16,6 @@ def correct_hostname( host ): return host -class reconnecting_socket: - """ A socket that keeps reconnecting if the connection is lost. Data is sent - asynchronously, with a buffer which drops messages if full. """ - - def __init__( self, host, port, retry_timeout=10, socket_timeout=5, bufsize=256 ): - self.host = host - self.port = port - self.socket_timeout = socket_timeout - self.retry_timeout = retry_timeout - self.socket = None - self.done = False - - self.writebuf = Queue.Queue( bufsize ) - - self.iothread = Thread( target=self.iothread_main, name="I/O thread for %s:%s" % (host,port) ) - self.iothread.start() - - def iothread_main( self ): - def close(): - self.socket.close() - self.socket = None - - def reconnect(): - self.socket = socket.socket() - self.socket.settimeout( self.socket_timeout ) - - while not self.done: - try: - self.socket.connect( (self.host,self.port) ) - except socket.error: - pass - except socket.timeout: - pass - else: - # connected! - break - - # sleep, but do stop when told - for i in xrange( self.retry_timeout ): - if self.done: - return - sleep( 1 ) - - def write( data ): - if self.socket is None: - reconnect() - - if self.done: - return - - written = 0 - - try: - while written < len(data): - written += self.socket.send( data[written:] ) - except socket.error: - # do not attempt to send remaining data - close() - return - except socket.timeout: - # do not attempt to send remaining data - close() - return - - # start with a connection - if self.socket is None: - reconnect() - - while True: - try: - data = self.writebuf.get( timeout=1 ) - except Queue.Empty: - # TODO: we can't keep a close check on our socket, delaying - # closing and reconnecting and keeping the line open - continue - - if data is None: - # close request - break - - write( data ) - - def write( self, data ): - if self.done: - return - - try: - self.writebuf.put_nowait( data ) - except Queue.Full: - # queue full -- drop data - pass - - def close( self ): - self.done = True # abort any reconnection attempts - self.writebuf.put( None ) # prod the deque, wait if necessary - - self.iothread.join() - - def split( filename ): """ Internally used: split a filename into host,file. Host == '' if pointing to localhost. """ @@ -150,13 +47,6 @@ def ropen( filename, mode = "r", buffering = -1 ): return open(file, mode, buffering) - if host == "tcp": - # create a TCP socket - assert mode in "wa", "ropen: only modes w and a are supported when using tcp" - - ip,port = file.split(":") - return reconnecting_socket( ip, int(port) ) - modelist = { "r": "cat %s" % (file,), "w": "cat - > %s" % (file,), diff --git a/RTCP/Run/src/util/tee.py b/RTCP/Run/src/util/tee.py deleted file mode 100644 index 1fe5640bbe59bb1ad976443edb856cc3985d8544..0000000000000000000000000000000000000000 --- a/RTCP/Run/src/util/tee.py +++ /dev/null @@ -1,74 +0,0 @@ -from threading import Thread -from select import select -import os,fcntl -import sys - -def setNonBlock( fd ): - # get the file's current flag settings - fl = fcntl.fcntl(fd, fcntl.F_GETFL) - - # update the file's flags to put the file into non-blocking mode. - fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) - - -class Tee(Thread): - def __init__(self,inputfd,outputfiles,logger=None): - Thread.__init__(self) - - self.inputfd = inputfd - self.outputfiles = filter( lambda x: x is not None, outputfiles ) - self.logger = logger - self.done = False - - setNonBlock( self.inputfd ) - - self.setDaemon( True ) - self.start() - - def close(self): - self.done = True - - def run(self): - def close(): - for f in self.outputfiles: - f.close() - - fileno = self.inputfd - prevline = "" # last incomplete line - - while True: - if self.done: - close() - break - - rlist,_,xlist = select( [fileno], [], [fileno], 1 ) - - if fileno in xlist: - # exceptional condition - break - - if fileno in rlist: - data = os.read( self.inputfd, 4096 ) - - if len(data) == 0: - # eof - close() - break - - lines = data.split("\n") - lines[0] = "%s%s" % (prevline,lines[0]) - - for line in lines[:-1]: - if self.logger: - self.logger.info( "%s", line ) - - for f in self.outputfiles: - try: - f.write( "%s\n" % (line,) ) - except OSError,e: - pass - except ValueError,e: # ValueError: I/O operation on closed file - pass - - prevline = lines[-1] - diff --git a/RTCP/Run/src/watchlogs.sh b/RTCP/Run/src/watchlogs.sh index 88e903b021dbc3019d1b18cb6887f0a03604052e..742efbf856bf7aabd384f02be433351795dc0f5d 100755 --- a/RTCP/Run/src/watchlogs.sh +++ b/RTCP/Run/src/watchlogs.sh @@ -13,8 +13,8 @@ PATH=$PATH:/globalhome/broekema/bin function set_logdir { LOGDIR=$1 - CNPROC=$LOGDIR/run.CNProc.log - IONPROC=$LOGDIR/run.IONProc.log + CNPROC=$LOGDIR/CNProc.log + IONPROC=$LOGDIR/IONProc.log } for d in . "$HOME/production/lofar/bgfen/log/latest" "$HOME/log/latest" "$HOME/projects/LOFAR/log"