def allocatePartition( partition ):
    """ Allocate `partition' by running the Hello World program on it.

        mpirun is started with -nofree and /dev/null is passed as the only
        output of the command.

        Returns the result of isSuccess() for that run. """

    cmdline = "mpirun -partition %s -nofree -exe /bgsys/tools/hello" % (partition,)
    hello = SyncCommand( cmdline, ["/dev/null"] )

    return hello.isSuccess()
""" + """ Resolve a path by replacing ${BASEDIR} by self["basedir"], etc. + + For replacements, the paths in self.files are used, and optionally + the masks allowed by parset.parseMask, such as ${OBSID}. """ allNames = [("${%s}" % (name.upper(),),value) for name,value in self.files.iteritems()] diff --git a/RTCP/Run/src/LOFAR/Sections.py b/RTCP/Run/src/LOFAR/Sections.py index 9383f73abff7e7b8dcc94eaed2cfe1598b1a0079..375e7b6f8e6b6c9033b102840d03a9e144de1516 100644 --- a/RTCP/Run/src/LOFAR/Sections.py +++ b/RTCP/Run/src/LOFAR/Sections.py @@ -13,6 +13,10 @@ class Section: self.parset = parset self.commands = [] + self.logoutputs = [] + if Locations.nodes["logserver"]: + self.logoutputs.append( "tcp:%s" % (Locations.nodes["logserver"],) ) + def __str__(self): return self.__class__.__name__ @@ -63,7 +67,7 @@ class SectionSet(list): class CNProcSection(Section): def run(self): - logfile = "%s/run.CNProc.%s.log" % (Locations.files["logdir"],self.parset.partition) + logfiles = ["%s/run.CNProc.%s.log" % (Locations.files["logdir"],self.parset.partition)] + self.logoutputs # CNProc is started on the Blue Gene, which has BG/P mpirun 1.65 # NOTE: This mpirun needs either stdin or stdout to be line buffered, @@ -88,7 +92,7 @@ class CNProcSection(Section): "-args %s" % (Locations.files["parset"],), ] - self.commands.append( AsyncCommand( "mpirun %s" % (" ".join(mpiparams),), logfile, killcmd=mpikill ) ) + self.commands.append( AsyncCommand( "mpirun %s" % (" ".join(mpiparams),), logfiles, killcmd=mpikill ) ) def check(self): # we have to own the partition @@ -121,9 +125,9 @@ class IONProcSection(Section): self.pidfiles[node], Locations.files["ionproc"],Locations.files["parset"] ) - logfile = "%s/run.IONProc.%s.%s" % (Locations.files["logdir"],self.parset.partition,nodenr) + logfiles = ["%s/run.IONProc.%s.%s" % (Locations.files["logdir"],self.parset.partition,nodenr)] + self.logoutputs - self.commands.append( AsyncCommand( "ssh %s %s" % (node,cmd,), logfile ) ) + 
self.commands.append( AsyncCommand( "ssh %s %s" % (node,cmd,), logfiles ) ) def abort( self, soft = True ): signal = [9,2][bool(soft)] @@ -140,12 +144,12 @@ class IONProcSection(Section): ionodes = self.parset.psets for node in ionodes: - c = SyncCommand( "ping %s -c 1 -w 2 -q" % (node,), outfile="/dev/null" ) + c = SyncCommand( "ping %s -c 1 -w 2 -q" % (node,), ["/dev/null"] ) assert c.isSuccess(), "Cannot reach I/O node %s" % (node,) class StorageSection(Section): def run(self): - self.logfile = "%s/run.Storage.%s.log" % (Locations.files["logdir"],self.parset.partition) + logfiles = ["%s/run.Storage.%s.log" % (Locations.files["logdir"],self.parset.partition)] + self.logoutputs # the PID of mpirun self.pidfile = "%s/Storage-%s.pid" % (Locations.files["rundir"],self.parset.partition) @@ -155,7 +159,7 @@ class StorageSection(Section): # create the target directories for n in Locations.nodes["storage"]: - self.commands.append( SyncCommand( "ssh %s mkdir %s" % (n,os.path.dirname(self.parset.parseMask()),), self.logfile ) ) + self.commands.append( SyncCommand( "ssh %s mkdir %s" % (n,os.path.dirname(self.parset.parseMask()),), logfiles ) ) # Storage is started on LIIFEN, which has mpirun (Open MPI) 1.1.1 mpiparams = [ @@ -180,7 +184,7 @@ class StorageSection(Section): "%s" % (Locations.files["parset"],), ] - self.commands.append( AsyncCommand( "ssh %s echo $$ > %s;exec mpirun %s" % (Locations.nodes["storagemaster"],self.pidfile," ".join(mpiparams),), self.logfile ) ) + self.commands.append( AsyncCommand( "ssh %s echo $$ > %s;exec mpirun %s" % (Locations.nodes["storagemaster"],self.pidfile," ".join(mpiparams),), logfiles ) ) def abort( self, soft = True ): signal = [9,2][bool(soft)] diff --git a/RTCP/Run/src/runOLAP.py b/RTCP/Run/src/runOLAP.py index a47859ab8d254ad724d3cde1b59ec8effc49b521..d3650d51edf12b626c0bb8716ae16479e9afcf37 100755 --- a/RTCP/Run/src/runOLAP.py +++ b/RTCP/Run/src/runOLAP.py @@ -9,6 +9,7 @@ from LOFAR.Stations import Stations from util import 
Commands from util.dateutil import format from LOFAR.Locations import Locations,isDevelopment +from util.Hosts import ropen,rmkdir,rexists import sys DRYRUN = False @@ -92,21 +93,30 @@ if __name__ == "__main__": # parse the command line parser = OptionParser( usage = "usage: %prog [options] parset parset ..." ) - parser.add_option( "-v", "--verbose", + parser.add_option( "-d", "--dry-run", + dest = "dryrun", + action = "store_true", + default = False, + help = "do not actually execute anything [%default]" ) + + opgroup = OptionGroup(parser, "Output" ) + opgroup.add_option( "-v", "--verbose", dest = "verbose", action = "store_true", default = False, help = "be verbose [%default]" ) - parser.add_option( "-q", "--quiet", + opgroup.add_option( "-q", "--quiet", dest = "quiet", action = "store_true", default = False, help = "be quiet [%default]" ) - parser.add_option( "-d", "--dry-run", - dest = "dryrun", - action = "store_true", - default = False, - help = "do not actually execute anything [%default]" ) + opgroup.add_option( "-l", "--log-server", + dest = "logserver", + type = "string", + default = Locations.nodes["logserver"], + help = "TCP address (IP:port) to send logging to [%default]" ) + parser.add_option_group( opgroup ) + psgroup = OptionGroup(parser, "Observation" ) psgroup.add_option( "-o", "--option", @@ -138,7 +148,6 @@ if __name__ == "__main__": action = "store_true", default = False, help = "station data will be generated by Blue Gene [%default]" ) - parser.add_option_group( psgroup ) hwgroup = OptionGroup(parser, "Hardware" ) @@ -247,6 +256,9 @@ if __name__ == "__main__": parset.setStorageNodes( Locations.nodes["storage"] ) + # set log server + Locations.nodes["logserver"] = options.logserver + # set stations if options.stations is not None: # join multiple station options with + to let them be parsed together @@ -299,11 +311,11 @@ if __name__ == "__main__": Locations.resolveAllPaths( parset ) # create log directory if it does not exist - if not 
os.path.exists(Locations.files["logdir"]): + if not rexists(Locations.files["logdir"]): warning( "Creating log directory %s" % ( Locations.files["logdir"], ) ) if not DRYRUN: - os.mkdir( Locations.files["logdir"] ) + rmkdir( Locations.files["logdir"] ) # run the observation runObservation( parset, not options.nocnproc, not options.noionproc, not options.nostorage ) diff --git a/RTCP/Run/src/util/Commands.py b/RTCP/Run/src/util/Commands.py index 422cbcb8a3e912ebeab72ecd0f1461d0b31bce23..ac887a61f83c840b98ead1de8df46b0bc0bafe30 100644 --- a/RTCP/Run/src/util/Commands.py +++ b/RTCP/Run/src/util/Commands.py @@ -1,7 +1,9 @@ import os import fcntl +import socket from subprocess import Popen,STDOUT,call from Hosts import ropen +from tee import Tee DRYRUN = False @@ -18,26 +20,40 @@ class AsyncCommand(object): Executes an external command in the background """ - def __init__(self, cmd, outfile=None, infile=None, killcmd=os.kill ): + def __init__(self, cmd, outfiles=[], infile=None, killcmd=os.kill ): """ Run command `cmd', with I/O optionally redirected. - cmd: command to execute. - outfile: filename for output, or None. - infile: filename for input, or None. - killcmd: function used to abort, called as killcmd( pid, signal ). """ + cmd: command to execute. + outfiles: filenames for output, or [] to use stdout. + infile: filename for input, or None to use stdin. + killcmd: function used to abort, called as killcmd( pid, signal ). """ if DRYRUN: self.cmd = "echo %s" % (cmd,) else: self.cmd = cmd - debug("RUN %s: %s > %s" % (self.__class__.__name__,cmd,outfile)) + debug("RUN %s: %s > %s" % (self.__class__.__name__,cmd,", ".join(outfiles))) - if outfile is None: + if outfiles == []: stdout = None else: - # Line buffer stdout to get lower latency logging info. 
def mergeOutputs( outputs ):
    """ Merges outputs (a list of strings) into one file descriptor.

        outputs: names that ropen can open for writing.

        Returns a file descriptor to write to, or None if `outputs' is
        empty (no redirection needed).

        A single output is returned as is. Several outputs are multiplexed:
        the read end of a pipe is handed to Tee together with all output
        descriptors, and the write end of that pipe is returned.

        NOTE(review): this function takes no `self'; if it stays inside a
        class it has to be wrapped in staticmethod() before it can be
        called on an instance -- confirm the intended call site. """

    if not outputs:
        return None

    # Duplicate every descriptor: the file objects returned by ropen only
    # live inside this function and would close the underlying file as
    # soon as they are released.
    fds = [os.dup( ropen( o, "w", 1 ).fileno() ) for o in outputs]

    if len(fds) == 1:
        # one output, no need to multiplex
        return fds[0]

    # create a pipe to multiplex everything through
    r,w = os.pipe()

    # feed all file descriptors to Tee
    Tee( r, fds )

    # keep the pipe input
    return w
def _splitRemote( name ):
    """ Split `name' into a (host, path) pair.

        Names without a host part, and names whose host part is empty,
        "localhost" or HOSTNAME, are local: host is None for those. """

    if ":" not in name:
        return None, name

    # split at the first colon only, so the path itself may contain colons
    host, path = name.split( ":", 1 )

    if host in ("", "localhost") or host == HOSTNAME:
        return None, path

    return host, path

def rmkdir( dirname ):
    """ Make a local or a remote directory. A remote directory name
        has the syntax host:filename.

        An existing directory is left alone. Parent directories are not
        created.

        Returns True if a local directory already existed, None otherwise.
        Raises OSError if a local directory cannot be created; the result
        of a remote mkdir is not checked. """

    host, path = _splitRemote( dirname )

    if host is None:
        # a local directory
        return os.path.exists( path ) or os.mkdir( path )

    # only create directory if it does not exist
    subprocess.call( ["ssh",host,"[ ! -e %s ] && mkdir %s" % (path,path)] )

def rexists( filename ):
    """ Checks for the availability of a local or a remote file. A remote
        file has the syntax host:filename.

        Returns True if the file exists, False otherwise.

        For a remote file, the exit status printed by the remote shell is
        parsed with int(), which raises ValueError if ssh did not print
        one (presumably when the host cannot be reached). """

    host, path = _splitRemote( filename )

    if host is None:
        # a local file
        return os.path.exists( path )

    # The remote shell prints the exit status of "[ ! -e path ]":
    # 1 means that test failed, i.e. the path does exist.
    ssh = subprocess.Popen( ["ssh",host,"[ ! -e %s ]; echo $?" % (path,)], stdout=subprocess.PIPE )

    # communicate() waits for ssh to finish, so the child is reaped
    status = ssh.communicate()[0]

    return int(status) == 1