diff --git a/Appl/CEP/CS1/CS1_InputSection/src/InputAppl.sh b/Appl/CEP/CS1/CS1_InputSection/src/InputAppl.sh new file mode 100755 index 0000000000000000000000000000000000000000..d5fee786c5ae24d777627d91a5cd43395137ad4e --- /dev/null +++ b/Appl/CEP/CS1/CS1_InputSection/src/InputAppl.sh @@ -0,0 +1,119 @@ +#!/bin/bash +# +# /InputAppl: a start/stop/status script for swlevel +# +# Copyright (C) 2007 +# ASTRON (Netherlands Foundation for Research in Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Syntax: InputAppl start|stop|status +# +# $Id$ +# + +# +# SyntaxError msg +# +SyntaxError() +{ + Msg=$1 + + [ -z "${Msg}" ] || echo "ERROR: ${Msg}" + echo "" + echo "Syntax: $(basename $0) start | stop | status" + echo "" + exit 1 +} + +# +# Start the program when it exists +# +start_prog() +{ + # put here your code to start your program + echo 'start_prog()' +} + +# +# Stop the program when it is running +# +stop_prog() +{ + # put here your code to stop your program + ps -ef | grep -v grep | grep -v ACDaemon[^\ ] | grep ACDaemon 2>&1 >/dev/null + if [ $? -ne 0 ]; then + if [ -f ../etc/ACD.admin ]; then + rm ../etc/ACD.admin + fi + fi + cexec killall -9 CS1_InputSection +} + +# +# show status of program +# +# arg1 = levelnr +# +status_prog() +{ + levelnr=$1 + + # put here code to figure out the status of your program and + # fill the variables prog and pid with the right information + + # e.g. + prog=InputAppl + pid=DOWN + cexec 'ps -ef | grep -v grep '| grep CS1_InputSection| grep -v bash2>$1 1>/dev/null + if [ $? -eq 0 ]; then + cexec 'ps -ef | grep -v grep | grep CS1_InputSection'| grep -v bash | awk -v levelnr=${levelnr} '{ + if (substr($1,1,3) == "---") { + machine = substr($2,1,6) + } + else { + if (substr($1,1,3) != "***") { + fullname="InputAppl@"machine + printf "%s : %-25.25s %s\n", levelnr,fullname,$2 + } + } + }' + else + # this line should be left in, it shows the status in the right format + echo ${levelnr} ${prog} ${pid} | awk '{ printf "%s : %-25s %s\n", $1, $2, $3 }' + fi +} + +# +# MAIN +# + +# when no argument is given show syntax error. +if [ -z "$1" ]; then + SyntaxError +fi + +# first power down to this level +case $1 in + start) start_prog + ;; + stop) stop_prog + ;; + status) status_prog $2 + ;; + *) SyntaxError + ;; +esac diff --git a/Appl/CEP/CS1/CS1_InputSection/src/prepare_CS1_InputSection.py b/Appl/CEP/CS1/CS1_InputSection/src/prepare_CS1_InputSection.py new file mode 100755 index 0000000000000000000000000000000000000000..836487af53e77bec1b4748a740f50b840e9b3e20 --- /dev/null +++ b/Appl/CEP/CS1/CS1_InputSection/src/prepare_CS1_InputSection.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python + +import math +import time +import datetime +import os +import sys +import copy +from optparse import OptionParser + +class CS1_Parset(object): + + def __init__(self): + self.stationList = list() + self.parameters = dict() + + def readFromFile(self, fileName): + lastline = '' + for line in open(fileName, 'r').readlines(): + lastline = lastline + line.split('#')[0] + lastline = lastline.rstrip() + if len(lastline) > 0 and lastline[-1] == '\\': + lastline = lastline[:-1] + elif '=' in lastline: + key, value = lastline.split('=') + self.parameters[key.strip()] = value.strip() + lastline = '' + + def writeToFile(self, fileName): + outf = open(fileName, 'w') + for key, value in sorted(self.parameters.iteritems()): + outf.write(key + ' = ' + str(value) + '\n') + outf.close() + + def __contains__(self, key): + return key in self.parameters + + def __setitem__(self, key, value): + self.parameters[key] = value + + def __getitem__(self, key): + return self.parameters[key] + + def getInt32Vector(self, key): + ln = self.parameters[key] + ln_tmp = ln.split('[') + line = ln_tmp[1].split(']') + return [int(lp) for lp in line[0].split(',')] + + def getInt32(self, key): + return int(self.parameters[key]) + + def getStringVector(self, key): + line = self.parameters[key] + line.strip('[').strip(']') + return line.split(',') + + def getString(self, key): + return self.parameters[key] + + def getFloat(self, key): + return float(self.parameters[key]) + + def getBool(self, key): + return self.parameters[key] == 'true' + +class ClusterSlave(object): + def __init__(self, intName, extIP): + self.intName = intName + self.extIP = extIP + def getIntName(self): + return self.intName + def getExtIP(self): + return self.extIP + +class ClusterFEN(object): + def __init__(self, name, address, slaves = list()): + self.slaves = slaves + #Host.__init__(self, name, address) + def getSlaves(self, number = None): + return self.slaves[0:number] + def setSlaves(self, slaves): + self.slaves = slaves + def setSlavesByPattern(self, intNamePattern, extIPPattern, numberRange): + self.slaves = list() + for number in numberRange: + self.slaves.append(ClusterSlave(intNamePattern % number, extIPPattern % number)) + +def parseStationList(): + """ + pattern = '^CS010_dipole0|CS010_dipole4|CS010_dipole8|CS010_dipole12| \ + CS008_dipole0|CS008_dipole4|CS008_dipole8|CS008_dipole12| \ + CS001_dipole0|CS001_dipole4|CS001_dipole8|CS001_dipole12| \ + CS016_dipole0|CS016_dipole4|CS016_dipole8|CS016_dipole12$' + print 'pattern = ' + str(re.search(pattern, 'CS010_dipole8')) + """ + +def getInputNodes(stationList, parset): + inputNodelist = list() + + for s in stationList: + s = s.strip(" ") + s = s.strip("[ ]") + s = s.strip("'") + name = parset.getString('PIC.Core.' + s + '.port') + name=name.split(":") + name=name[0].strip("lii") + inputNodelist.append(int(name)) + + return inputNodelist + +if __name__ == '__main__': + + parser = OptionParser() + + parser.add_option('--parsetfile' , dest='parsetfile' , default='../share/Transpose.parset', type='string', help='username [%default]') + + # parse the options + (options, args) = parser.parse_args() + + # create the parset + parset = CS1_Parset() + stationList = list() + + if os.path.exists(options.parsetfile): + + #read keys from parset file. + parset.readFromFile(options.parsetfile) + + #read keys from parset file: OLAP.parset + if os.path.exists("OLAP.parset"): + parset.readFromFile('OLAP.parset') + else: + print 'file OLAP.parset does not exist!' + sys.exit(0) + + ''' + if parset.getString('OLAP.OLAP_Conn.station_Input_Transport') == 'NULL': + # Read from memory! + parset['Observation.startTime'] = datetime.datetime.fromtimestamp(1) + else: + start=int(time.time() + 80) + parset['Observation.startTime'] = datetime.datetime.fromtimestamp(start) + + duration = 300 + + parset['Observation.stopTime'] = datetime.datetime.fromtimestamp(start + duration) + + nSubbandSamples = parset.getFloat('OLAP.BGLProc.integrationSteps') * parset.getFloat('Observation.channelsPerSubband') + stepTime = nSubbandSamples / (parset.getFloat('Observation.sampleClock') * 1000000.0 / 1024) + startTime = parset['Observation.startTime'] + stopTime = parset['Observation.stopTime'] + sz = int(math.ceil((time.mktime(stopTime.timetuple()) - time.mktime(startTime.timetuple())) / stepTime)) + noRuns = ((sz+15)&~15) + 16 + parset['Observation.stopTime'] = datetime.datetime.fromtimestamp(time.mktime(startTime.timetuple()) + noRuns) + ''' + + if parset.getString('OLAP.OLAP_Conn.input_DelayComp_Transport') == 'Null': + parset['OLAP.OLAP_Conn.input_DelayComp_Transport'] = 'NULL' + + if parset.getString('OLAP.OLAP_Conn.input_BGLProc_Transport') == 'Null': + parset['OLAP.OLAP_Conn.input_BGLProc_Transport'] = 'NULL' + + if parset.getString('OLAP.OLAP_Conn.station_Input_Transport') == 'Null': + parset['OLAP.OLAP_Conn.station_Input_Transport'] = 'NULL' + + if parset.getString('OLAP.OLAP_Conn.BGLProc_Storage_Transport') == 'Null': + parset['OLAP.OLAP_Conn.BGLProc_Storage_Transport'] = 'NULL' + + if not parset.getBool('OLAP.BGLProc.useZoid'): # override CS1.parset + print 'ZOID!!!!' + parset['OLAP.IONProc.useScatter'] = 'false' + parset['OLAP.IONProc.useGather'] = 'false' + parset['OLAP.BGLProc.nodesPerPset'] = 8 + parset['OLAP.IONProc.maxConcurrentComm'] = 2 + + if parset.getBool('OLAP.IONProc.useGather'): + print 'useGather!!!!' + #parset['OLAP.IONProc.integrationSteps'] = integrationTime + parset['OLAP.StorageProc.integrationSteps'] = 1 + else: + parset['OLAP.IONProc.integrationSteps'] = 1 + #parset['OLAP.StorageProc.integrationSteps'] = integrationTime + + if parset.getInt32('Observation.sampleClock') == 160: + parset['OLAP.BGLProc.integrationSteps'] = 608 + elif parset.getInt32('Observation.sampleClock') == 200: + parset['OLAP.BGLProc.integrationSteps'] = 768 + + #get the stations + stationList = parset.getStringVector('OLAP.storageStationNames') + parset['OLAP.nrRSPboards'] = len(stationList) + + #create input cluster objects + liifen = ClusterFEN(name = 'liifen', address = '129.125.99.51') + liifen.setSlavesByPattern('lii%03d', '10.162.0.%d', [1,2,3,4,5,6,7,8,9,10,11,12]) + + #set keys 'Input.InputNodes' and 'Input.OutputNodes' + nSubbands = len(parset.getInt32Vector('Observation.subbandList')) + nSubbandsPerCell = parset.getInt32('OLAP.subbandsPerPset') * parset.getInt32('OLAP.BGLProc.psetsPerCell') + nCells = float(nSubbands) / float(nSubbandsPerCell) + if not nSubbands % nSubbandsPerCell == 0: + raise Exception('Not a integer number of compute cells (nSubbands = %d and nSubbandsPerCell = %d)' % (nSubbands, nSubbandsPerCell)) + nCells = int(nCells) + host = copy.deepcopy(liifen) + slaves = host.getSlaves() + + inputNodes = getInputNodes(stationList, parset) + outputNodes = range(1, nCells + 1) + allNodes = inputNodes + [node for node in outputNodes if not node in inputNodes] + + inputIndices = range(len(inputNodes)) + outputIndices = [allNodes.index(node) for node in outputNodes] + + newslaves = [slaves[ind - 1] for ind in allNodes] + host.setSlaves(newslaves) + noProcesses = len(newslaves) + + parset['Input.InputNodes'] = inputIndices + parset['Input.OutputNodes'] = outputIndices + + bglprocIPs = [newslaves[j].getExtIP() for j in outputIndices] + parset['OLAP.OLAP_Conn.input_BGLProc_ServerHosts'] = '[' + ','.join(bglprocIPs) + ']' + + parset.writeToFile('../share/Transpose.parset') + + #createMachinefile + lmf = '/tmp/CS1_tmpfile' + slaves = host.getSlaves(noProcesses) + outf = open(lmf, 'w') + for slave in slaves: + outf.write(slave.getIntName() + '\n') + outf.close() + os.system('mv /tmp/CS1_tmpfile /opt/lofar/bin/Transpose.machinefile 2>&1 >> /dev/null') + print 'noProcesses = ' + str(noProcesses) + sys.exit(noProcesses) + else: + print 'file ' + options.parsetfile + ' does not exist!' + sys.exit(0) + diff --git a/Appl/CEP/CS1/CS1_InputSection/src/startMPI.sh b/Appl/CEP/CS1/CS1_InputSection/src/startMPI.sh new file mode 100755 index 0000000000000000000000000000000000000000..6e8011dd172f4269aee09aa5b269b6eb040d4adf --- /dev/null +++ b/Appl/CEP/CS1/CS1_InputSection/src/startMPI.sh @@ -0,0 +1,20 @@ +# startMPI.sh jobName machinefile executable paramfile noNodes +# +# $1 jobName identifier for this job +# $2 machinefile procID.machinefile +# $3 executable processname +# $4 parameterfile procID.ps +# $5 numberOfNodes +# +# calls mpirun and remembers the pid +# + +# now all ACC processes expect to be started with ACC as first parameter + +# start process +# TODO: on some hosts, mpirun has a different name (or a specific path) +# on some hosts, we should use -hostfile instead of -machinefile + +./prepare_$3.py + +cd /opt/lofar/bin/; mpirun_rsh -np $5 -hostfile $2 ./$3 ACC $4 $1>>/opt/lofar/log/$3.log 2>&1 & diff --git a/Appl/CEP/CS1/CS1_InputSection/src/stopMPI.sh b/Appl/CEP/CS1/CS1_InputSection/src/stopMPI.sh new file mode 100755 index 0000000000000000000000000000000000000000..107a6598e7e34d34dd1e88f82781c0cc7028d322 --- /dev/null +++ b/Appl/CEP/CS1/CS1_InputSection/src/stopMPI.sh @@ -0,0 +1,13 @@ +# stopMPI.sh execName +# +# +# Stops the given process by killing the process whose pid is in the +# proces.pid file. + +# TODO: for some mpi versions it is not enough to kill mpirun +# we could "killall executable", but that would also kill +# processes started by another ApplicationController + +cexec killall -9 $1 + +rm -f $1*.ps