Skip to content
Snippets Groups Projects
Commit 68eb8451 authored by Martin Gels's avatar Martin Gels
Browse files

BugID:1005

Checked in first version of the file scripts.

Modified consist of integration with SAS/MAC.
parent 7a2b6ddc
Branches
Tags
No related merge requests found
#!/bin/bash
#
# CorrAppl: a start/stop/status script for swlevel
#
# Copyright (C) 2007
# ASTRON (Netherlands Foundation for Research in Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Syntax: CorrAppl start|stop|status
#
# $Id$
#
#
# SyntaxError msg
#
SyntaxError()
{
Msg=$1
[ -z "${Msg}" ] || echo "ERROR: ${Msg}"
echo ""
echo "Syntax: $(basename $0) start | stop | status"
echo ""
exit 1
}
#
# Start the program when it exists
#
start_prog()
{
# put here your code to start your program
echo 'start_prog()'
}
#
# Stop the program when it is running
#
stop_prog()
{
# put here your code to stop your program
killall ApplController
ps -ef | grep -v grep | grep -v ACDaemon[^\ ] | grep ACDaemon 2>&1 >/dev/null
if [ $? -ne 0 ]; then
if [ -f ../etc/ACD.admin ]; then
rm ../etc/ACD.admin
fi
fi
echo 'Freeing CorrAppl (5 minutes)'
ssh $USER@bglsn /opt/lofar/bin/stopBGL.py
}
#
# show status of program
#
# arg1 = levelnr
#
status_prog()
{
levelnr=$1
# put here code to figure out the status of your program and
# fill the variables prog and pid with the right information
# e.g.
prog=CorrAppl
# this line should be left in, it shows the status in the right format
#echo ${levelnr} ${prog} ${pid} | awk '{ printf "%s : %-25s %s\n", $1, $2, $3 }'
echo ${levelnr} ${prog} `ssh $USER@bglsn /opt/lofar/bin/stopBGL.py --status=true` | awk '{ printf "%s : %-25s %s\n", $1, $2, $3 }'
}
#
# MAIN
#
# when no argument is given show syntax error.
if [ -z "$1" ]; then
SyntaxError
fi
# first power down to this level
case $1 in
start) start_prog
;;
stop) stop_prog
;;
status) status_prog $2
;;
*) SyntaxError
;;
esac
#!/usr/bin/env python
import math
import time
import datetime
import os
import sys
import copy
class CS1_Parset(object):
def __init__(self):
self.stationList = list()
self.parameters = dict()
def readFromFile(self, fileName):
lastline = ''
for line in open(fileName, 'r').readlines():
lastline = lastline + line.split('#')[0]
lastline = lastline.rstrip()
if len(lastline) > 0 and lastline[-1] == '\\':
lastline = lastline[:-1]
elif '=' in lastline:
key, value = lastline.split('=')
self.parameters[key.strip()] = value.strip()
lastline = ''
def writeToFile(self, fileName):
outf = open(fileName, 'w')
items = self.parameters.items()
items.sort()
for key, value in items:
outf.write(key + ' = ' + str(value) + '\n')
outf.close()
def __contains__(self, key):
return key in self.parameters
def __setitem__(self, key, value):
self.parameters[key] = value
def __getitem__(self, key):
return self.parameters[key]
def getInt32Vector(self, key):
ln = self.parameters[key]
ln_tmp = ln.split('[')
line = ln_tmp[1].split(']')
return [int(lp) for lp in line[0].split(',')]
def getInt32(self, key):
return int(self.parameters[key])
def getStringVector(self, key):
line = self.parameters[key]
line.strip('[').strip(']')
return line.split(',')
def getString(self, key):
return self.parameters[key]
def getFloat(self, key):
return float(self.parameters[key])
def getBool(self, key):
return self.parameters[key] == 'true'
class ClusterSlave(object):
def __init__(self, intName, extIP):
self.intName = intName
self.extIP = extIP
def getIntName(self):
return self.intName
def getExtIP(self):
return self.extIP
class ClusterFEN(object):
def __init__(self, name, address, slaves = list()):
self.slaves = slaves
#Host.__init__(self, name, address)
def getSlaves(self, number = None):
return self.slaves[0:number]
def setSlaves(self, slaves):
self.slaves = slaves
def setSlavesByPattern(self, intNamePattern, extIPPattern, numberRange):
self.slaves = list()
for number in numberRange:
self.slaves.append(ClusterSlave(intNamePattern % number, extIPPattern % number))
def parseStationList():
"""
pattern = '^CS010_dipole0|CS010_dipole4|CS010_dipole8|CS010_dipole12| \
CS008_dipole0|CS008_dipole4|CS008_dipole8|CS008_dipole12| \
CS001_dipole0|CS001_dipole4|CS001_dipole8|CS001_dipole12| \
CS016_dipole0|CS016_dipole4|CS016_dipole8|CS016_dipole12$'
print 'pattern = ' + str(re.search(pattern, 'CS010_dipole8'))
"""
def getInputNodes(stationList, parset):
inputNodelist = list()
for s in stationList:
s = s.strip(" ")
s = s.strip("[ ]")
s = s.strip("'")
name = parset.getString('PIC.Core.' + s + '.port')
name=name.split(":")
name=name[0].strip("lii")
inputNodelist.append(int(name))
return inputNodelist
if __name__ == '__main__':
# create the parset
parset = CS1_Parset()
stationList = list()
if os.path.exists("../share/Correlator.parset"):
parset.readFromFile('../share/Correlator.parset')
#read keys from parset file: OLAP.parset
if os.path.exists("OLAP.parset"):
parset.readFromFile('OLAP.parset')
else:
print 'file OLAP.parset does not exist!'
sys.exit(0)
'''
#set start/stop time
if parset.getString('OLAP.OLAP_Conn.station_Input_Transport') == 'NULL':
# Read from memory!
parset['Observation.startTime'] = datetime.datetime.fromtimestamp(1)
else:
#start=int(time.asctime(time.gmtime()))+ 90
start=int(time.time() + 90)
#parset['Observation.startTime'] = datetime.datetime.fromtimestamp(start)
parset['Observation.startTime'] = datetime.datetime.utcfromtimestamp(start)
duration = 300
parset['Observation.stopTime'] = datetime.datetime.utcfromtimestamp(start + duration)
'''
if parset.getString('OLAP.OLAP_Conn.input_BGLProc_Transport') == 'Null':
parset['OLAP.OLAP_Conn.input_BGLProc_Transport'] = 'NULL'
if parset.getString('OLAP.OLAP_Conn.station_Input_Transport') == 'Null':
parset['OLAP.OLAP_Conn.station_Input_Transport'] = 'NULL'
if parset.getString('OLAP.OLAP_Conn.BGLProc_Storage_Transport') == 'Null':
parset['OLAP.OLAP_Conn.BGLProc_Storage_Transport'] = 'NULL'
if not parset.getBool('OLAP.BGLProc.useZoid'): # override CS1.parset
print 'ZOID!!!!'
parset['OLAP.IONProc.useScatter'] = 'false'
parset['OLAP.IONProc.useGather'] = 'false'
parset['OLAP.BGLProc.nodesPerPset'] = 8
parset['OLAP.IONProc.maxConcurrentComm'] = 2
BGLPartition = ('R000_128_0', 'R000_128_0Z')[parset.getBool('OLAP.BGLProc.useZoid')]
parset['CorrAppl.Correlator.partition'] = BGLPartition
if parset.getBool('OLAP.IONProc.useGather'):
print 'useGather!!!!'
#parset['OLAP.IONProc.integrationSteps'] = integrationTime
parset['OLAP.StorageProc.integrationSteps'] = 1
else:
parset['OLAP.IONProc.integrationSteps'] = 1
#parset['OLAP.StorageProc.integrationSteps'] = integrationTime
if parset.getInt32('Observation.sampleClock') == 160:
parset['OLAP.BGLProc.integrationSteps'] = 608
elif parset.getInt32('Observation.sampleClock') == 200:
parset['OLAP.BGLProc.integrationSteps'] = 768
#get the stations
stationList = parset.getStringVector('OLAP.storageStationNames')
parset['OLAP.nrRSPboards'] = len(stationList)
#create input cluster objects
liifen = ClusterFEN(name = 'liifen', address = '129.125.99.51')
liifen.setSlavesByPattern('lii%03d', '10.162.0.%d', [1,2,3,4,5,6,7,8,9,10,11,12])
#set keys 'Input.InputNodes' and 'Input.OutputNodes'
nSubbands = len(parset.getInt32Vector('Observation.subbandList'))
nSubbandsPerCell = parset.getInt32('OLAP.subbandsPerPset') * parset.getInt32('OLAP.BGLProc.psetsPerCell')
nCells = float(nSubbands) / float(nSubbandsPerCell)
if not nSubbands % nSubbandsPerCell == 0:
raise Exception('Not a integer number of compute cells (nSubbands = %d and nSubbandsPerCell = %d)' % (nSubbands, nSubbandsPerCell))
nCells = int(nCells)
host = copy.deepcopy(liifen)
slaves = host.getSlaves()
inputNodes = getInputNodes(stationList, parset)
outputNodes = range(1, nCells + 1)
allNodes = inputNodes + [node for node in outputNodes if not node in inputNodes]
inputIndices = range(len(inputNodes))
outputIndices = [allNodes.index(node) for node in outputNodes]
newslaves = [slaves[ind - 1] for ind in allNodes]
host.setSlaves(newslaves)
noProcesses = len(newslaves)
parset['Input.InputNodes'] = inputIndices
parset['Input.OutputNodes'] = outputIndices
bglprocIPs = [newslaves[j].getExtIP() for j in outputIndices]
parset['OLAP.OLAP_Conn.input_BGLProc_ServerHosts'] = '[' + ','.join(bglprocIPs) + ']'
#create output cluster objects
listfen = ClusterFEN(name = 'listfen', address = '129.125.99.50')
listfen.setSlavesByPattern('list%03d', '10.181.0.%d', [1,2])
#set key 'Connections.BGLProc_Storage.ServerHosts'
nSubbandsPerCellStorage = parset.getInt32('OLAP.subbandsPerPset')
nPsetsPerStorage = parset.getInt32('OLAP.psetsPerStorage')
if not nSubbands % (nSubbandsPerCellStorage * nPsetsPerStorage) == 0:
raise Exception('Not a integer number of subbands per storage node!')
noProcessesStorage = nSubbands / (nSubbandsPerCellStorage * nPsetsPerStorage)
host = copy.deepcopy(listfen)
storageIPs = [s.getExtIP() for s in host.getSlaves(noProcessesStorage)]
parset['OLAP.OLAP_Conn.BGLProc_Storage_ServerHosts'] = '[' + ','.join(storageIPs) + ']'
parset.writeToFile('./CS1_BGL_Processing.parset')
else:
print 'file ../share/Correlator.parset does not exist!'
sys.exit(0)
# startBGL.sh jobName partition executable workingDir paramfile noNodes
#
# jobName
# partition
# executable executable file (should be in a place that is readable from BG/L)
# workingDir directory for output files (should be readable by BG/L)
# parameterfile jobName.ps
# noNodes number of nodes of the partition to use
#
# start the job and stores the jobID in jobName.jobID
#
# all ACC processes expect to be started with "ACC" as first parameter
# start process
if [ -f ../share/Correlator.parset ]
then
echo "../share/Correlator.parset file exist"
else
echo "Sorry, ../share/Correlator.parset file does not exist"
fi
./prepare_$3.py
cd $4; mpirun -partition $2 -mode VN -label -cwd $4 $4/$3 $4/CS1_BGL_Processing.parset 5422 >> ../log/$3.log 2>&1 &
#!/usr/bin/env python
import socket
import os
import sys
import time
from optparse import OptionParser
host = 'localhost'
port = 32031
replyformat = 0
class switch(object):
def __init__(self, value):
self.value = value
self.fall = False
def __iter__(self):
"""Return the match method once, then stop"""
yield self.match
raise StopIteration
def match(self, *args):
"""Indicate whether or not to enter a case suite"""
if self.fall or not args:
return True
elif self.value in args: # changed for v1.5, see below
self.fall = True
return True
else:
return False
################################################################
# mmcs_cmd(szSocket, szCmd)
#
# mmcs_cmd -- send an mmcs command and gather and check the response.
# inputs:
# param0 -- remote tcp port to send command to.
# param1 -- command string
# outputs:
# return values in a list.
#
def mmcs_cmd (socket, szCmd):
lines = list()
socket.send(szCmd) # execute the command.
if replyformat == 0:
results = socket.recv(1024) # read the result...
lines.append(results.rstrip("\n")) # get rid of lf at end.
else:
file = socket.makefile()
for line in file:
if line.find('\0') >= 0:
break
lines.append(line.rstrip("\n")) # get rid of lf at end.
return lines
def set_replyformat(socket, rformat):
results = mmcs_cmd(socket, 'replyformat ' + str(rformat) + '\n')
if results[0] != 'OK':
print 'set replyformat:' + str(rformat) + ' ...failed'
global replyformat
replyformat = rformat
def list_jobs(socket):
set_replyformat(socket, 1)
return mmcs_cmd(socket, 'list_jobs\n')
def list_blocks(socket):
set_replyformat(socket, 1)
return mmcs_cmd(socket, 'list_blocks\n')
def jobId(socket):
results = list_jobs(socket)
for line in results:
if line.find(options.blockid) >= 0:
return line.split()[0]
def free_block(socket):
set_replyformat(socket, 0)
results = mmcs_cmd(socket, 'free ' + options.blockid + '\n');
if results[0] != 'OK':
print 'free \'%s\' ' % options.blockid + ' ...failed'
def killjob(socket):
set_replyformat(socket, 0)
jobid = jobId(socket)
results = mmcs_cmd(socket, 'killjob ' + options.blockid + ' ' + jobid + '\n');
if results[0] != 'OK':
print 'killjob ' + options.blockid + jobid + ' ...failed'
def partition_exist(socket):
results = list_blocks(socket)
for line in results:
if line.find(options.blockid) >= 0 and line.find(options.user) >= 0:
return True
return False
def block_status(socket):
set_replyformat(socket, 1)
results = list_blocks(socket)
for line in results:
if line.find(options.blockid) >= 0:
return line.split()[1]
def job_status(socket):
set_replyformat(socket, 1)
results = list_jobs(socket)
for line in results:
if line.find(options.blockid) >= 0:
return line.split()[1]
def show_block_status(socket):
set_replyformat(socket, 1)
results = list_jobs(socket)
for line in results:
if line.find(options.blockid) >= 0:
print line
#
# Start of mainline
#
if __name__ == '__main__':
parser = OptionParser()
parser.add_option('--user' , dest='user' , default=os.environ.get('USER', 'default'), type='string', help='username [%default]')
parser.add_option('--blockid' , dest='blockid' , default='R000_128_0Z' , type='string', help='name of the blockid [%default]')
parser.add_option('--status' , dest='status' , default='false' , type='string', help='Show status of the blockid ')
# parse the options
(options, args) = parser.parse_args()
remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote.connect((host, port))
results = mmcs_cmd(remote, 'set_username ' + options.user + '\n');
if results[0] != 'OK':
print 'set_username ' + options.user + ' ...failed'
if options.status == 'true':
if partition_exist(remote):
print 'UP'
else:
print 'DOWN'
sys.exit(0)
partitionExist = partition_exist(remote)
bl_stat = ''
job_stat = ''
while (partitionExist):
status_block = block_status(remote)
if status_block != bl_stat:
print 'Block %s' % options.blockid + ' status: ' + str(status_block)
bl_stat = status_block
for case in switch(status_block):
if case('A'):
free_block(remote)
break
if case('B'):
free_block(remote)
break
if case('D'):
break
if case('T'):
break
if case('I'):
status_job = job_status(remote)
if status_job != job_stat:
print 'Job status: ' + str(status_job)
job_stat = status_job
for case in switch(status_job):
if case('S'):
killjob(remote)
break
if case('R'):
killjob(remote)
break
if case('D'):
break
break
if case(): # default, could also just omit condition or 'if True'
print "something else!"
# No need to break here, it'll stop anyway
time.sleep(2)
partitionExist = partition_exist(remote)
remote.close()
sys.exit(0)
# stopAP.sh partition jobName
#
# partition BG/L partition the job is running on
# jobName The name of the job
#
ssh $USER@bglsn /opt/lofar/bin/stopBGL.py --blockid=$1
#
# swlevel.conf
#
# Table to manage the progrma that should be started and stopped
# level : up : down : root : mpi : program
#
1:u:d:::ACDaemon
6::d:::ApplController
6::d:::CorrAppl
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment