diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/CorrAppl.sh b/Appl/CEP/CS1/CS1_BGLProc/src/CorrAppl.sh
new file mode 100755
index 0000000000000000000000000000000000000000..f67cf6c85cc4efb091c5418b6e2e61fc58957a9f
--- /dev/null
+++ b/Appl/CEP/CS1/CS1_BGLProc/src/CorrAppl.sh
@@ -0,0 +1,117 @@
+#!/bin/bash
+#
+# CorrAppl: a start/stop/status script for swlevel
+#
+# Copyright (C) 2007
+# ASTRON (Netherlands Foundation for Research in Astronomy)
+# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# Syntax: CorrAppl start|stop|status
+#
+# $Id$
+#
+
+#
+# SyntaxError msg
+#
+# Print the optional error message and the usage line, then exit with status 1.
+#
+SyntaxError()
+{
+    Msg=$1
+
+    [ -z "${Msg}" ] || echo "ERROR: ${Msg}"
+    echo ""
+    echo "Syntax: $(basename "$0") start | stop | status"
+    echo ""
+    exit 1
+}
+
+#
+# Start the program when it exists
+#
+# Placeholder: only echoes its own name; nothing is started here.
+#
+start_prog()
+{
+    # put here your code to start your program
+    echo 'start_prog()'
+}
+
+#
+# Stop the program when it is running
+#
+# Kills ApplController, removes ../etc/ACD.admin when no ACDaemon process is
+# listed by ps, and runs stopBGL.py on bglsn over ssh.
+#
+stop_prog()
+{
+    # put here your code to stop your program
+    killall ApplController
+    # The pattern is quoted so the shell cannot glob-expand it; stdout and
+    # stderr of the last grep are both discarded, only its exit status is used.
+    ps -ef | grep -v grep | grep -v 'ACDaemon[^ ]' | grep ACDaemon >/dev/null 2>&1
+    if [ $? -ne 0 ]; then
+        if [ -f ../etc/ACD.admin ]; then
+            rm ../etc/ACD.admin
+        fi
+    fi
+    echo 'Freeing CorrAppl (5 minutes)'
+    ssh "$USER@bglsn" /opt/lofar/bin/stopBGL.py
+}
+
+#
+# show status of program
+#
+# arg1 = levelnr
+#
+status_prog()
+{
+    levelnr=$1
+
+    # put here code to figure out the status of your program and
+    # fill the variables prog and pid with the right information
+
+    # e.g.
+    prog=CorrAppl
+
+    # this line should be left in, it shows the status in the right format
+    #echo ${levelnr} ${prog} ${pid} | awk '{ printf "%s : %-25s %s\n", $1, $2, $3 }'
+    # The third column is the first word printed by "stopBGL.py --status=true" on bglsn.
+    echo ${levelnr} ${prog} `ssh "$USER@bglsn" /opt/lofar/bin/stopBGL.py --status=true` | awk '{ printf "%s : %-25s %s\n", $1, $2, $3 }'
+}
+
+#
+# MAIN
+#
+
+# when no argument is given show syntax error.
+if [ -z "$1" ]; then
+    SyntaxError
+fi
+
+# first power down to this level
+case "$1" in
+    start)  start_prog
+            ;;
+    stop)   stop_prog
+            ;;
+    status) status_prog "$2"
+            ;;
+    *)      SyntaxError
+            ;;
+esac
diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/prepare_CS1_BGL_Processing.py b/Appl/CEP/CS1/CS1_BGLProc/src/prepare_CS1_BGL_Processing.py
new file mode 100755
index 0000000000000000000000000000000000000000..6cdbbcb6cd3e72d8ef03e9a8692907b78e5a952e
--- /dev/null
+++ b/Appl/CEP/CS1/CS1_BGLProc/src/prepare_CS1_BGL_Processing.py
@@ -0,0 +1,271 @@
+#!/usr/bin/env python
+
+import math
+import time
+import datetime
+import os
+import sys
+import copy
+
+class CS1_Parset(object):
+    """In-memory key/value view of one or more parset files."""
+
+    def __init__(self):
+        self.stationList = list()
+        self.parameters = dict()
+
+    def readFromFile(self, fileName):
+        """Merge the 'key = value' pairs of fileName into self.parameters.
+
+        Text after '#' is dropped, a trailing backslash joins a line with
+        the next one, and later definitions overwrite earlier ones.
+        """
+        inf = open(fileName, 'r')
+        try:
+            lastline = ''
+            for line in inf:
+                lastline = (lastline + line.split('#')[0]).rstrip()
+                if lastline.endswith('\\'):
+                    # continuation: keep collecting on the next line
+                    lastline = lastline[:-1]
+                    continue
+                if '=' in lastline:
+                    # split on the first '=' only, values may contain '='
+                    key, value = lastline.split('=', 1)
+                    self.parameters[key.strip()] = value.strip()
+                # a finished line never leaks into the next one
+                lastline = ''
+        finally:
+            inf.close()
+
+    def writeToFile(self, fileName):
+        """Write all parameters to fileName as sorted 'key = value' lines."""
+        items = self.parameters.items()
+        items.sort()
+        outf = open(fileName, 'w')
+        try:
+            for key, value in items:
+                outf.write(key + ' = ' + str(value) + '\n')
+        finally:
+            outf.close()
+
+    def __contains__(self, key):
+        return key in self.parameters
+
+    def __setitem__(self, key, value):
+        self.parameters[key] = value
+
+    def __getitem__(self, key):
+        return self.parameters[key]
+
+    def getInt32Vector(self, key):
+        """Return the integers listed between '[' and ']' in the value of key."""
+        ln = self.parameters[key]
+        ln_tmp = ln.split('[')
+        line = ln_tmp[1].split(']')
+        # skip blank fields so that an empty vector '[]' yields []
+        return [int(lp) for lp in line[0].split(',') if lp.strip()]
+
+    def getInt32(self, key):
+        return int(self.parameters[key])
+
+    def getStringVector(self, key):
+        """Return the comma separated fields of a '[a,b,c]' value as a list."""
+        # str.strip returns a new string; the result has to be kept
+        line = self.parameters[key].strip().strip('[').strip(']')
+        if not line.strip():
+            return []
+        return [field.strip() for field in line.split(',')]
+
+    def getString(self, key):
+        return self.parameters[key]
+
+    def getFloat(self, key):
+        return float(self.parameters[key])
+
+    def getBool(self, key):
+        return self.parameters[key] == 'true'
+
+class ClusterSlave(object):
+    """A cluster node with an internal host name and an external IP address."""
+    def __init__(self, intName, extIP):
+        self.intName = intName
+        self.extIP = extIP
+    def getIntName(self):
+        return self.intName
+    def getExtIP(self):
+        return self.extIP
+
+class ClusterFEN(object):
+    """Front-end node that owns a list of ClusterSlave objects."""
+    def __init__(self, name, address, slaves = None):
+        # a fresh list per instance; a list() default would be shared
+        if slaves is None:
+            slaves = list()
+        self.slaves = slaves
+        #Host.__init__(self, name, address)
+    def getSlaves(self, number = None):
+        """Return the first number slaves, or all of them when number is None."""
+        return self.slaves[0:number]
+    def setSlaves(self, slaves):
+        self.slaves = slaves
+    def setSlavesByPattern(self, intNamePattern, extIPPattern, numberRange):
+        """Replace the slaves by one ClusterSlave per entry of numberRange."""
+        self.slaves = list()
+        for number in numberRange:
+            self.slaves.append(ClusterSlave(intNamePattern % number, extIPPattern % number))
+
+def parseStationList():
+    """
+    pattern = '^CS010_dipole0|CS010_dipole4|CS010_dipole8|CS010_dipole12| \
+    CS008_dipole0|CS008_dipole4|CS008_dipole8|CS008_dipole12| \
+    CS001_dipole0|CS001_dipole4|CS001_dipole8|CS001_dipole12| \
+    CS016_dipole0|CS016_dipole4|CS016_dipole8|CS016_dipole12$'
+    print 'pattern = ' + str(re.search(pattern, 'CS010_dipole8'))
+    """
+
+def getInputNodes(stationList, parset):
+    """Return the input node number of every station in stationList.
+
+    The number is taken from the host part of 'PIC.Core.<station>.port',
+    e.g. a value starting with 'lii003:' gives 3.
+    """
+    inputNodelist = list()
+
+    for s in stationList:
+        s = s.strip(" ")
+        s = s.strip("[ ]")
+        s = s.strip("'")
+        name = parset.getString('PIC.Core.' + s + '.port')
+        name=name.split(":")
+        # strip() removes the characters 'l' and 'i' from both ends
+        name=name[0].strip("lii")
+        inputNodelist.append(int(name))
+
+    return inputNodelist
+
+if __name__ == '__main__':
+
+    # create the parset
+    parset = CS1_Parset()
+    stationList = list()
+
+    if os.path.exists("../share/Correlator.parset"):
+
+        parset.readFromFile('../share/Correlator.parset')
+
+        #read keys from parset file: OLAP.parset
+        if os.path.exists("OLAP.parset"):
+            parset.readFromFile('OLAP.parset')
+        else:
+            print 'file OLAP.parset does not exist!'
+            # non-zero status so that callers can detect the failure
+            sys.exit(1)
+
+        '''
+        #set start/stop time
+        if parset.getString('OLAP.OLAP_Conn.station_Input_Transport') == 'NULL':
+            # Read from memory!
+            parset['Observation.startTime'] = datetime.datetime.fromtimestamp(1)
+        else:
+            #start=int(time.asctime(time.gmtime()))+ 90
+            start=int(time.time() + 90)
+            #parset['Observation.startTime'] = datetime.datetime.fromtimestamp(start)
+            parset['Observation.startTime'] = datetime.datetime.utcfromtimestamp(start)
+
+        duration = 300
+
+        parset['Observation.stopTime'] = datetime.datetime.utcfromtimestamp(start + duration)
+        '''
+
+        if parset.getString('OLAP.OLAP_Conn.input_BGLProc_Transport') == 'Null':
+            parset['OLAP.OLAP_Conn.input_BGLProc_Transport'] = 'NULL'
+
+        if parset.getString('OLAP.OLAP_Conn.station_Input_Transport') == 'Null':
+            parset['OLAP.OLAP_Conn.station_Input_Transport'] = 'NULL'
+
+        if parset.getString('OLAP.OLAP_Conn.BGLProc_Storage_Transport') == 'Null':
+            parset['OLAP.OLAP_Conn.BGLProc_Storage_Transport'] = 'NULL'
+
+        # NOTE(review): this branch runs when useZoid is false although it
+        # prints 'ZOID!!!!' - confirm that the condition is intended.
+        if not parset.getBool('OLAP.BGLProc.useZoid'): # override CS1.parset
+            print 'ZOID!!!!'
+            parset['OLAP.IONProc.useScatter'] = 'false'
+            parset['OLAP.IONProc.useGather'] = 'false'
+            parset['OLAP.BGLProc.nodesPerPset'] = 8
+            parset['OLAP.IONProc.maxConcurrentComm'] = 2
+
+        BGLPartition = ('R000_128_0', 'R000_128_0Z')[parset.getBool('OLAP.BGLProc.useZoid')]
+        parset['CorrAppl.Correlator.partition'] = BGLPartition
+
+        if parset.getBool('OLAP.IONProc.useGather'):
+            print 'useGather!!!!'
+            #parset['OLAP.IONProc.integrationSteps'] = integrationTime
+            parset['OLAP.StorageProc.integrationSteps'] = 1
+        else:
+            parset['OLAP.IONProc.integrationSteps'] = 1
+            #parset['OLAP.StorageProc.integrationSteps'] = integrationTime
+
+        if parset.getInt32('Observation.sampleClock') == 160:
+            parset['OLAP.BGLProc.integrationSteps'] = 608
+        elif parset.getInt32('Observation.sampleClock') == 200:
+            parset['OLAP.BGLProc.integrationSteps'] = 768
+
+        #get the stations
+        stationList = parset.getStringVector('OLAP.storageStationNames')
+        parset['OLAP.nrRSPboards'] = len(stationList)
+
+        #create input cluster objects
+        liifen = ClusterFEN(name = 'liifen', address = '129.125.99.51')
+        liifen.setSlavesByPattern('lii%03d', '10.162.0.%d', [1,2,3,4,5,6,7,8,9,10,11,12])
+
+        #set keys 'Input.InputNodes' and 'Input.OutputNodes'
+        nSubbands = len(parset.getInt32Vector('Observation.subbandList'))
+        nSubbandsPerCell = parset.getInt32('OLAP.subbandsPerPset') * parset.getInt32('OLAP.BGLProc.psetsPerCell')
+        nCells = float(nSubbands) / float(nSubbandsPerCell)
+        if not nSubbands % nSubbandsPerCell == 0:
+            raise Exception('Not a integer number of compute cells (nSubbands = %d and nSubbandsPerCell = %d)' % (nSubbands, nSubbandsPerCell))
+        nCells = int(nCells)
+        host = copy.deepcopy(liifen)
+        slaves = host.getSlaves()
+
+        inputNodes = getInputNodes(stationList, parset)
+        outputNodes = range(1, nCells + 1)
+        allNodes = inputNodes + [node for node in outputNodes if not node in inputNodes]
+
+        inputIndices = range(len(inputNodes))
+        outputIndices = [allNodes.index(node) for node in outputNodes]
+
+        newslaves = [slaves[ind - 1] for ind in allNodes]
+        host.setSlaves(newslaves)
+        noProcesses = len(newslaves)
+
+        parset['Input.InputNodes'] = inputIndices
+        parset['Input.OutputNodes'] = outputIndices
+
+        bglprocIPs = [newslaves[j].getExtIP() for j in outputIndices]
+        parset['OLAP.OLAP_Conn.input_BGLProc_ServerHosts'] = '[' + ','.join(bglprocIPs) + ']'
+
+        #create output cluster objects
+        listfen = ClusterFEN(name = 'listfen', address = '129.125.99.50')
+        listfen.setSlavesByPattern('list%03d', '10.181.0.%d', [1,2])
+
+        #set key 'Connections.BGLProc_Storage.ServerHosts'
+        nSubbandsPerCellStorage = parset.getInt32('OLAP.subbandsPerPset')
+        nPsetsPerStorage = parset.getInt32('OLAP.psetsPerStorage')
+        if not nSubbands % (nSubbandsPerCellStorage * nPsetsPerStorage) == 0:
+            raise Exception('Not a integer number of subbands per storage node!')
+
+        noProcessesStorage = nSubbands / (nSubbandsPerCellStorage * nPsetsPerStorage)
+        host = copy.deepcopy(listfen)
+
+        storageIPs = [s.getExtIP() for s in host.getSlaves(noProcessesStorage)]
+        parset['OLAP.OLAP_Conn.BGLProc_Storage_ServerHosts'] = '[' + ','.join(storageIPs) + ']'
+
+        parset.writeToFile('./CS1_BGL_Processing.parset')
+
+    else:
+        print 'file ../share/Correlator.parset does not exist!'
+        sys.exit(1)
+
diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/startBGL.sh b/Appl/CEP/CS1/CS1_BGLProc/src/startBGL.sh
new file mode 100755
index 0000000000000000000000000000000000000000..0f398e635bfc53df227b1d737f3f7acfe2498d5d
--- /dev/null
+++ b/Appl/CEP/CS1/CS1_BGLProc/src/startBGL.sh
@@ -0,0 +1,27 @@
+# startBGL.sh jobName partition executable workingDir paramfile noNodes
+#
+# jobName
+# partition
+# executable      executable file (should be in a place that is readable from BG/L)
+# workingDir      directory for output files (should be readable by BG/L)
+# parameterfile   jobName.ps
+# noNodes         number of nodes of the partition to use
+#
+# start the job and stores the jobID in jobName.jobID
+#
+# all ACC processes expect to be started with "ACC" as first parameter
+
+# start process
+
+if [ -f ../share/Correlator.parset ]
+then
+    echo "../share/Correlator.parset file exist"
+else
+    echo "Sorry, ../share/Correlator.parset file does not exist"
+fi
+
+# do not launch the job when the parset could not be prepared
+./prepare_"$3".py || exit 1
+
+cd "$4" || exit 1
+mpirun -partition "$2" -mode VN -label -cwd "$4" "$4/$3" "$4/CS1_BGL_Processing.parset" 5422 >> "../log/$3.log" 2>&1 &
diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/stopBGL.py b/Appl/CEP/CS1/CS1_BGLProc/src/stopBGL.py
new file mode 100755
index 0000000000000000000000000000000000000000..01e3ba80736f9f997f285f481f2e5ae15ccfc3df
--- /dev/null
+++ b/Appl/CEP/CS1/CS1_BGLProc/src/stopBGL.py
@@ -0,0 +1,223 @@
+#!/usr/bin/env python
+
+import socket
+import os
+import sys
+import time
+from optparse import OptionParser
+
+host = 'localhost'
+port = 32031
+replyformat = 0
+
+class switch(object):
+    """Helper for C-style switch blocks: 'for case in switch(x): if case(..):'."""
+    def __init__(self, value):
+        self.value = value
+        self.fall = False
+
+    def __iter__(self):
+        """Return the match method once, then stop"""
+        yield self.match
+        # a plain return ends the generator; an explicit 'raise StopIteration'
+        # inside a generator becomes a RuntimeError on newer interpreters
+        return
+
+    def match(self, *args):
+        """Indicate whether or not to enter a case suite"""
+        if self.fall or not args:
+            return True
+        elif self.value in args: # changed for v1.5, see below
+            self.fall = True
+            return True
+        else:
+            return False
+
+################################################################
+# mmcs_cmd(szSocket, szCmd)
+#
+# mmcs_cmd -- send an mmcs command and gather the response.
+# inputs:
+#     param0 -- connected socket to send the command to.
+#     param1 -- command string
+# outputs:
+#     reply lines in a list.
+#
+
+def mmcs_cmd (socket, szCmd):
+    lines = list()
+    socket.sendall(szCmd) # execute the command; send() may write only a part.
+    if replyformat == 0:
+        results = socket.recv(1024) # read the result...
+        lines.append(results.rstrip("\n")) # get rid of lf at end.
+    else:
+        # read line by line until the line that carries the '\0' terminator
+        reply = socket.makefile()
+        for line in reply:
+            if line.find('\0') >= 0:
+                break
+            lines.append(line.rstrip("\n")) # get rid of lf at end.
+    return lines
+
+def reply_ok(results):
+    """Return True when the first reply line is 'OK' (False for an empty reply)."""
+    return len(results) > 0 and results[0] == 'OK'
+
+def set_replyformat(socket, rformat):
+    """Request reply format rformat and store it in the global replyformat."""
+    global replyformat
+    results = mmcs_cmd(socket, 'replyformat ' + str(rformat) + '\n')
+    if not reply_ok(results):
+        print 'set replyformat:' + str(rformat) + ' ...failed'
+    replyformat = rformat
+
+def list_jobs(socket):
+    """Return the reply lines of the 'list_jobs' command."""
+    set_replyformat(socket, 1)
+    return mmcs_cmd(socket, 'list_jobs\n')
+
+def list_blocks(socket):
+    """Return the reply lines of the 'list_blocks' command."""
+    set_replyformat(socket, 1)
+    return mmcs_cmd(socket, 'list_blocks\n')
+
+def jobId(socket):
+    """Return the first field of the job line naming options.blockid, else None."""
+    results = list_jobs(socket)
+    for line in results:
+        if line.find(options.blockid) >= 0:
+            return line.split()[0]
+    return None
+
+def free_block(socket):
+    """Send 'free <blockid>' and report a reply other than 'OK'."""
+    set_replyformat(socket, 0)
+    results = mmcs_cmd(socket, 'free ' + options.blockid + '\n');
+    if not reply_ok(results):
+        print 'free \'%s\' ' % options.blockid + ' ...failed'
+
+def killjob(socket):
+    """Send 'killjob <blockid> <jobid>' for the job found on the block."""
+    set_replyformat(socket, 0)
+    # NOTE(review): jobId() switches the reply format back to 1 through
+    # list_jobs(), so the killjob reply is read in format 1 - confirm intent.
+    jobid = jobId(socket)
+    if jobid is None:
+        # no job line names the block (any more); there is nothing to kill
+        print 'killjob ' + options.blockid + ' ...no job found'
+        return
+    results = mmcs_cmd(socket, 'killjob ' + options.blockid + ' ' + jobid + '\n');
+    if not reply_ok(results):
+        print 'killjob ' + options.blockid + jobid + ' ...failed'
+
+def partition_exist(socket):
+    """Return True when a list_blocks line names both the block and the user."""
+    results = list_blocks(socket)
+    for line in results:
+        if line.find(options.blockid) >= 0 and line.find(options.user) >= 0:
+            return True
+    return False
+
+def block_status(socket):
+    """Return the second field of the list_blocks line of the block, else None."""
+    set_replyformat(socket, 1)
+    results = list_blocks(socket)
+    for line in results:
+        if line.find(options.blockid) >= 0:
+            return line.split()[1]
+
+def job_status(socket):
+    """Return the second field of the list_jobs line of the block, else None."""
+    set_replyformat(socket, 1)
+    results = list_jobs(socket)
+    for line in results:
+        if line.find(options.blockid) >= 0:
+            return line.split()[1]
+
+def show_block_status(socket):
+    """Print every list_jobs line that names options.blockid."""
+    set_replyformat(socket, 1)
+    results = list_jobs(socket)
+    for line in results:
+        if line.find(options.blockid) >= 0:
+            print line
+
+
+#
+# Start of mainline
+#
+if __name__ == '__main__':
+
+    parser = OptionParser()
+
+    parser.add_option('--user' , dest='user' , default=os.environ.get('USER', 'default'), type='string', help='username [%default]')
+    parser.add_option('--blockid' , dest='blockid' , default='R000_128_0Z' , type='string', help='name of the blockid [%default]')
+    parser.add_option('--status' , dest='status' , default='false' , type='string', help='Show status of the blockid ')
+
+    # parse the options
+    (options, args) = parser.parse_args()
+    if not options.blockid:
+        # an empty id would match every line of the list_* replies
+        parser.error('--blockid must not be empty')
+
+    remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+    remote.connect((host, port))
+
+    results = mmcs_cmd(remote, 'set_username ' + options.user + '\n');
+    if not reply_ok(results):
+        print 'set_username ' + options.user + ' ...failed'
+
+    if options.status == 'true':
+        if partition_exist(remote):
+            print 'UP'
+        else:
+            print 'DOWN'
+        # release the connection on this exit path as well
+        remote.close()
+        sys.exit(0)
+
+    partitionExist = partition_exist(remote)
+    bl_stat = ''
+    job_stat = ''
+    while (partitionExist):
+        status_block = block_status(remote)
+        if status_block != bl_stat:
+            print 'Block %s' % options.blockid + ' status: ' + str(status_block)
+            bl_stat = status_block
+        for case in switch(status_block):
+            if case('A'):
+                free_block(remote)
+                break
+            if case('B'):
+                free_block(remote)
+                break
+            if case('D'):
+                break
+            if case('T'):
+                break
+            if case('I'):
+                status_job = job_status(remote)
+                if status_job != job_stat:
+                    print 'Job status: ' + str(status_job)
+                    job_stat = status_job
+                for case in switch(status_job):
+                    if case('S'):
+                        killjob(remote)
+                        break
+                    if case('R'):
+                        killjob(remote)
+                        break
+                    if case('D'):
+                        break
+                break
+            if case(): # default, could also just omit condition or 'if True'
+                print "something else!"
+                # No need to break here, it'll stop anyway
+
+        time.sleep(2)
+        partitionExist = partition_exist(remote)
+
+    remote.close()
+
+    sys.exit(0)
diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/stopBGL.sh b/Appl/CEP/CS1/CS1_BGLProc/src/stopBGL.sh
new file mode 100755
index 0000000000000000000000000000000000000000..926c370ed88085ac9c55611e2a81451d542aafc2
--- /dev/null
+++ b/Appl/CEP/CS1/CS1_BGLProc/src/stopBGL.sh
@@ -0,0 +1,12 @@
+# stopBGL.sh partition jobName
+#
+# partition      BG/L partition the job is running on
+# jobName        The name of the job (not used by this script)
+#
+
+if [ -z "$1" ]; then
+    echo "Syntax: stopBGL.sh partition [jobName]" >&2
+    exit 1
+fi
+
+ssh "$USER@bglsn" /opt/lofar/bin/stopBGL.py --blockid="$1"
diff --git a/Appl/CEP/CS1/CS1_BGLProc/src/swlevel.conf b/Appl/CEP/CS1/CS1_BGLProc/src/swlevel.conf
new file mode 100644
index 0000000000000000000000000000000000000000..9ffd28f7b36c8cf3eb6c473c2c8fe6f09e68aaba
--- /dev/null
+++ b/Appl/CEP/CS1/CS1_BGLProc/src/swlevel.conf
@@ -0,0 +1,11 @@
+#
+# swlevel.conf
+#
+# Table to manage the programs that should be started and stopped
+# level : up : down : root : mpi : program
+#
+
+1:u:d:::ACDaemon
+6::d:::ApplController
+6::d:::CorrAppl
+