Skip to content
Snippets Groups Projects
Commit 3fa9486d authored by Martin Gels's avatar Martin Gels
Browse files

Bug 1005: Renamed BGL in CN.

parent 65d9aca9
No related branches found
No related tags found
No related merge requests found
Showing
with 4166 additions and 0 deletions
...@@ -1221,6 +1221,7 @@ MAC/bootstrap -text svneol=native#application/octet-stream ...@@ -1221,6 +1221,7 @@ MAC/bootstrap -text svneol=native#application/octet-stream
MAC/lofarconf.in -text svneol=native#application/octet-stream MAC/lofarconf.in -text svneol=native#application/octet-stream
RTCP/BGLProc/bootstrap -text RTCP/BGLProc/bootstrap -text
RTCP/BGLProc/src/BGL_Processing.machinefile -text RTCP/BGLProc/src/BGL_Processing.machinefile -text
RTCP/CNProc/src/CN_Processing.machinefile -text
RTCP/IONProc/bootstrap -text RTCP/IONProc/bootstrap -text
RTCP/Interface/bootstrap -text RTCP/Interface/bootstrap -text
RTCP/Run/bootstrap -text RTCP/Run/bootstrap -text
......
# -*- Mode:rpm-spec -*-
# CNProc.spec.in
#
##############################################################################
#
# Preamble
#
##############################################################################
Summary: CNProc is ... brief description ...
%define release @RPM_RELEASE@
%define version @VERSION@
%define pkgname @PACKAGE@
%define pkgdir %{pkgname}-%{version}-%{release}
%define prefix /opt/lofar
%define configure_args @RPM_CONFIGURE_ARGS@
##define build_kernel_version @BUILD_KERNEL_VERSION@
Name: %{pkgname}
Version: %{version}
Release: %{release}
Copyright: LGPL
Group: Application/System
Source: %{pkgname}-%{version}.tar.gz
BuildRoot: %{_tmppath}/%{pkgdir}-root
URL: http://www.astron.nl
Prefix: %{prefix}
BuildArchitectures: i386 # Target platforms, i.e., i586
##Requires: Common = 1.2 ## define dependent packages here
Packager: %{packager}
Distribution: The LOFAR project
Vendor: ASTRON
AutoReqProv: no
%description
CNProc ... more detailed description ...
##############################################################################
#
# prep
#
##############################################################################
%prep
echo $prefix
# create the build directory, untar the source
%setup
##############################################################################
#
# build
#
##############################################################################
%build
./configure %{configure_args} --prefix=%{prefix} && make
##############################################################################
#
# install
#
##############################################################################
%install
# To make things work with BUILDROOT
if [ "$RPM_BUILD_ROOT" != "%{_tmppath}/%{pkgdir}-root" ]
then
echo
echo @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
echo @ @
echo @ RPM_BUILD_ROOT is not what I expected. Please clean it yourself. @
echo @ @
echo @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
echo
else
echo Cleaning RPM_BUILD_ROOT: "$RPM_BUILD_ROOT"
rm -rf "$RPM_BUILD_ROOT"
fi
mkdir -p $RPM_BUILD_ROOT%{prefix}
make DESTDIR="$RPM_BUILD_ROOT" install
#uninstall
##############################################################################
#
# verify
#
##############################################################################
#verify
##############################################################################
#
# clean
#
##############################################################################
%clean
# Call me paranoid, but I do not want to be responsible for nuking
# someone's harddrive!
if [ "$RPM_BUILD_ROOT" != "%{_tmppath}/%{pkgdir}-root" ]
then
echo
echo @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
echo @ @
echo @ RPM_BUILD_ROOT is not what I expected. Please clean it yourself. @
echo @ @
echo @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
echo
else
echo Cleaning RPM_BUILD_ROOT: "$RPM_BUILD_ROOT"
rm -rf "$RPM_BUILD_ROOT"
fi
##############################################################################
#
# files
#
##############################################################################
# empty 'files' means all distributed files
%files
%defattr(-, root, root)
%{prefix}
# Your application file list goes here
# %{prefix}/lib/lib*.so*
# Documentation
# doc COPYING ChangeLog README AUTHORS NEWS
# doc doc/*
# link the module to the correct path
%post
# before uninstall
%preun
# after uninstall
%postun
##############################################################################
#
# package devel
#
##############################################################################
#package devel
#Summary: Development files for %{pkgname}
#Group: Applications/System
#description devel
#Development files for %{pkgname}.
#files devel
# Your development files go here
# Programmers documentation goes here
#doc doc
# end of file
SUBDIRS=src test
ACLOCAL_AMFLAGS = -I $(top_srcdir)/autoconf_share
pkgextdir = $(prefix)/config/$(PACKAGE)
pkgext_DATA = pkgext pkgextcppflags pkgextcxxflags pkgextldflags
DISTCHECK_CONFIGURE_FLAGS=\
--with-common=$(prefix)
EXTRA_DIST = \
Makefile.common \
CNProc.spec \
autoconf_share/compiletool
include $(top_srcdir)/Makefile.common
dnl
dnl Process this file with autoconf to produce a configure script.
dnl
AC_INIT
dnl AC_CONFIG_AUX_DIR(config)
dnl AM_CONFIG_HEADER(config/config.h)
AM_CONFIG_HEADER(config.h)
AM_INIT_AUTOMAKE(CNProc, 1.0, no-define)
AM_PROG_AS(gcc)
dnl Initialize for LOFAR (may set compilers)
lofar_INIT
dnl Checks for programs.
AC_PROG_AWK
AC_PROG_YACC
AC_PROG_CC
AC_PROG_CXX
AM_PROG_LEX
AC_PROG_INSTALL
AC_PROG_LN_S
AC_DISABLE_SHARED
AC_PROG_LIBTOOL
dnl Checks for libraries.
dnl dnl Replace `main' with a function in -lfl:
dnl AC_CHECK_LIB(fl, main)
dnl dnl Replace `main' with a function in -lcosev_r:
dnl AC_CHECK_LIB(cosev_r, main)
dnl dnl Replace `main' with a function in -lcosnm_r:
dnl AC_CHECK_LIB(cosnm_r, main)
dnl dnl Replace `main' with a function in -lorb_r:
dnl AC_CHECK_LIB(orb_r, main)
dnl dnl Replace `main' with a function in -lpthread:
dnl AC_CHECK_LIB(pthread, main)
dnl dnl Replace `main' with a function in -lvport_r:
dnl AC_CHECK_LIB(vport_r, main)
dnl Checks for header files.
AC_HEADER_STDC
AC_CHECK_HEADERS(unistd.h)
dnl Checks for typedefs, structures, and compiler characteristics.
AC_C_CONST
AC_TYPE_SIZE_T
dnl Checks for library functions.
AC_FUNC_VPRINTF
dnl
dnl Check for LOFAR specific things
dnl
lofar_GENERAL
dnl lofar_BLITZ
lofar_MPI
lofar_FFTW3F(0)
lofar_FFTW2(0)
lofar_INTERNAL(LCS/Common,Common,,1,Common/LofarTypedefs.h,,)
lofar_INTERNAL(LCS/Stream,Stream,,1,Stream/Stream.h,,)
dnl lofar_INTERNAL(LCS/ACC/APS,APS,,1,APS/ParameterSet.h,,)
dnl lofar_INTERNAL(LCS/ACC/PLC,PLC,,1,PLC/ACCmain.h,,)
dnl lofar_INTERNAL(CEP/tinyCEP,tinyCEP,,1,tinyCEP/TinyDataManager.h,,)
lofar_INTERNAL(RTCP/Interface,Interface,,1,Interface/Config.h,,)
lofar_EXTERNAL(boost,1,boost/multi_array.hpp,"")
lofar_EXTERNAL(mass,0,"",,/opt/mass)
lofar_EXTERNAL(zoid,0,zoid_api.h,"",,-I/cephome/romein/projects/zoid/zoid/lofar,,"-L/cephome/romein/projects/zoid/glibc-build-zoid -L/cephome/romein/projects/zoid/zoid/lofar",-llofar_blrts)
lofar_EXTERNAL(fcnp,0,fcnp_cn.h,fcnp_cn,/cephome/romein/packages/fcnp)
dnl
dnl Output Makefiles
dnl
AC_OUTPUT(
src/Makefile
test/Makefile
Makefile
CNProc.spec
)
// \ingroup RTCP
// \defgroup CNProc CNProc Description
#!/bin/bash
#
# CorrAppl: a start/stop/status script for swlevel
#
# Copyright (C) 2007
# ASTRON (Netherlands Foundation for Research in Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Syntax: CorrAppl start|stop|status
#
# $Id$
#
#
# SyntaxError msg
#
SyntaxError()
{
Msg=$1
[ -z "${Msg}" ] || echo "ERROR: ${Msg}"
echo ""
echo "Syntax: $(basename $0) start | stop | status"
echo ""
exit 1
}
#
# Start the program when it exists
#
start_prog()
{
# put here your code to start your program
echo 'start_prog()'
}
#
# Stop the program when it is running
#
stop_prog()
{
# put here your code to stop your program
killall ApplController
ps -ef | grep -v grep | grep -v ACDaemon[^\ ] | grep ACDaemon 2>&1 >/dev/null
if [ $? -ne 0 ]; then
if [ -f ../etc/ACD.admin ]; then
rm ../etc/ACD.admin
fi
fi
echo 'Freeing CorrAppl (5 minutes)'
ssh $USER@bglsn /opt/lofar/bin/stopBGL.py
}
#
# show status of program
#
# arg1 = levelnr
#
status_prog()
{
levelnr=$1
# put here code to figure out the status of your program and
# fill the variables prog and pid with the right information
# e.g.
prog=CorrAppl
# this line should be left in, it shows the status in the right format
#echo ${levelnr} ${prog} ${pid} | awk '{ printf "%s : %-25s %s\n", $1, $2, $3 }'
echo ${levelnr} ${prog} `ssh $USER@bglsn /opt/lofar/bin/stopBGL.py --status=true` | awk '{ printf "%s : %-25s %s\n", $1, $2, $3 }'
}
#
# MAIN
#
# when no argument is given show syntax error.
if [ -z "$1" ]; then
SyntaxError
fi
# first power down to this level
case $1 in
start) start_prog
;;
stop) stop_prog
;;
status) status_prog $2
;;
*) SyntaxError
;;
esac
#!/usr/bin/env python
import math
import time
import datetime
import os
import sys
import copy
class CS1_Parset(object):
def __init__(self):
self.stationList = list()
self.parameters = dict()
def readFromFile(self, fileName):
lastline = ''
for line in open(fileName, 'r').readlines():
lastline = lastline + line.split('#')[0]
lastline = lastline.rstrip()
if len(lastline) > 0 and lastline[-1] == '\\':
lastline = lastline[:-1]
elif '=' in lastline:
key, value = lastline.split('=')
self.parameters[key.strip()] = value.strip()
lastline = ''
def writeToFile(self, fileName):
outf = open(fileName, 'w')
items = self.parameters.items()
items.sort()
for key, value in items:
outf.write(key + ' = ' + str(value) + '\n')
outf.close()
def __contains__(self, key):
return key in self.parameters
def __setitem__(self, key, value):
self.parameters[key] = value
def __getitem__(self, key):
return self.parameters[key]
def getInt32Vector(self, key):
ln = self.parameters[key]
ln_tmp = ln.split('[')
line = ln_tmp[1].split(']')
return [int(lp) for lp in line[0].split(',')]
def getInt32(self, key):
return int(self.parameters[key])
def getStringVector(self, key):
line = self.parameters[key]
line.strip('[').strip(']')
return line.split(',')
def getString(self, key):
return self.parameters[key]
def getFloat(self, key):
return float(self.parameters[key])
def getBool(self, key):
return self.parameters[key] == 'true'
class ClusterSlave(object):
def __init__(self, intName, extIP):
self.intName = intName
self.extIP = extIP
def getIntName(self):
return self.intName
def getExtIP(self):
return self.extIP
class ClusterFEN(object):
def __init__(self, name, address, slaves = list()):
self.slaves = slaves
#Host.__init__(self, name, address)
def getSlaves(self, number = None):
return self.slaves[0:number]
def setSlaves(self, slaves):
self.slaves = slaves
def setSlavesByPattern(self, intNamePattern, extIPPattern, numberRange):
self.slaves = list()
for number in numberRange:
self.slaves.append(ClusterSlave(intNamePattern % number, extIPPattern % number))
def parseStationList():
"""
pattern = '^CS010_dipole0|CS010_dipole4|CS010_dipole8|CS010_dipole12| \
CS008_dipole0|CS008_dipole4|CS008_dipole8|CS008_dipole12| \
CS001_dipole0|CS001_dipole4|CS001_dipole8|CS001_dipole12| \
CS016_dipole0|CS016_dipole4|CS016_dipole8|CS016_dipole12$'
print 'pattern = ' + str(re.search(pattern, 'CS010_dipole8'))
"""
def getInputNodes(stationList, parset):
inputNodelist = list()
for s in stationList:
s = s.strip(" ")
s = s.strip("[ ]")
s = s.strip("'")
name = parset.getString('PIC.Core.' + s + '.port')
name=name.split(":")
name=name[0].strip("lii")
inputNodelist.append(int(name))
return inputNodelist
if __name__ == '__main__':
# create the parset
parset = CS1_Parset()
stationList = list()
if os.path.exists("../share/Correlator.parset"):
parset.readFromFile('../share/Correlator.parset')
#read keys from parset file: OLAP.parset
if os.path.exists("OLAP.parset"):
parset.readFromFile('OLAP.parset')
else:
print 'file OLAP.parset does not exist!'
sys.exit(0)
'''
#set start/stop time
if parset.getString('OLAP.OLAP_Conn.station_Input_Transport') == 'NULL':
# Read from memory!
parset['Observation.startTime'] = datetime.datetime.fromtimestamp(1)
else:
#start=int(time.asctime(time.gmtime()))+ 90
start=int(time.time() + 90)
#parset['Observation.startTime'] = datetime.datetime.fromtimestamp(start)
parset['Observation.startTime'] = datetime.datetime.utcfromtimestamp(start)
duration = 300
parset['Observation.stopTime'] = datetime.datetime.utcfromtimestamp(start + duration)
'''
if parset.getString('OLAP.OLAP_Conn.input_BGLProc_Transport') == 'Null':
parset['OLAP.OLAP_Conn.input_BGLProc_Transport'] = 'NULL'
if parset.getString('OLAP.OLAP_Conn.station_Input_Transport') == 'Null':
parset['OLAP.OLAP_Conn.station_Input_Transport'] = 'NULL'
if parset.getString('OLAP.OLAP_Conn.BGLProc_Storage_Transport') == 'Null':
parset['OLAP.OLAP_Conn.BGLProc_Storage_Transport'] = 'NULL'
if not parset.getBool('OLAP.BGLProc.useZoid'): # override CS1.parset
print 'ZOID!!!!'
parset['OLAP.IONProc.useScatter'] = 'false'
parset['OLAP.IONProc.useGather'] = 'false'
parset['OLAP.BGLProc.nodesPerPset'] = 8
parset['OLAP.IONProc.maxConcurrentComm'] = 2
BGLPartition = ('R000_128_0', 'R000_128_0Z')[parset.getBool('OLAP.BGLProc.useZoid')]
parset['CorrAppl.Correlator.partition'] = BGLPartition
if parset.getBool('OLAP.IONProc.useGather'):
print 'useGather!!!!'
#parset['OLAP.IONProc.integrationSteps'] = integrationTime
parset['OLAP.StorageProc.integrationSteps'] = 1
else:
parset['OLAP.IONProc.integrationSteps'] = 1
#parset['OLAP.StorageProc.integrationSteps'] = integrationTime
if parset.getInt32('Observation.sampleClock') == 160:
parset['OLAP.BGLProc.integrationSteps'] = 608
elif parset.getInt32('Observation.sampleClock') == 200:
parset['OLAP.BGLProc.integrationSteps'] = 768
#get the stations
stationList = parset.getStringVector('OLAP.storageStationNames')
parset['OLAP.nrRSPboards'] = len(stationList)
#create input cluster objects
liifen = ClusterFEN(name = 'liifen', address = '129.125.99.51')
liifen.setSlavesByPattern('lii%03d', '10.162.0.%d', [1,2,3,4,5,6,7,8,9,10,11,12])
#set keys 'Input.InputNodes' and 'Input.OutputNodes'
nSubbands = len(parset.getInt32Vector('Observation.subbandList'))
nSubbandsPerCell = parset.getInt32('OLAP.subbandsPerPset') * parset.getInt32('OLAP.BGLProc.psetsPerCell')
nCells = float(nSubbands) / float(nSubbandsPerCell)
if not nSubbands % nSubbandsPerCell == 0:
raise Exception('Not a integer number of compute cells (nSubbands = %d and nSubbandsPerCell = %d)' % (nSubbands, nSubbandsPerCell))
nCells = int(nCells)
host = copy.deepcopy(liifen)
slaves = host.getSlaves()
inputNodes = getInputNodes(stationList, parset)
outputNodes = range(1, nCells + 1)
allNodes = inputNodes + [node for node in outputNodes if not node in inputNodes]
inputIndices = range(len(inputNodes))
outputIndices = [allNodes.index(node) for node in outputNodes]
newslaves = [slaves[ind - 1] for ind in allNodes]
host.setSlaves(newslaves)
noProcesses = len(newslaves)
parset['Input.InputNodes'] = inputIndices
parset['Input.OutputNodes'] = outputIndices
bglprocIPs = [newslaves[j].getExtIP() for j in outputIndices]
parset['OLAP.OLAP_Conn.input_BGLProc_ServerHosts'] = '[' + ','.join(bglprocIPs) + ']'
#create output cluster objects
listfen = ClusterFEN(name = 'listfen', address = '129.125.99.50')
listfen.setSlavesByPattern('list%03d', '10.181.0.%d', [1,2])
#set key 'Connections.BGLProc_Storage.ServerHosts'
nSubbandsPerCellStorage = parset.getInt32('OLAP.subbandsPerPset')
nPsetsPerStorage = parset.getInt32('OLAP.psetsPerStorage')
if not nSubbands % (nSubbandsPerCellStorage * nPsetsPerStorage) == 0:
raise Exception('Not a integer number of subbands per storage node!')
noProcessesStorage = nSubbands / (nSubbandsPerCellStorage * nPsetsPerStorage)
host = copy.deepcopy(listfen)
storageIPs = [s.getExtIP() for s in host.getSlaves(noProcessesStorage)]
parset['OLAP.OLAP_Conn.BGLProc_Storage_ServerHosts'] = '[' + ','.join(storageIPs) + ']'
parset.writeToFile('./CS1_BGL_Processing.parset')
else:
print 'file ../share/Correlator.parset does not exist!'
sys.exit(0)
# startBGL.sh jobName partition executable workingDir paramfile noNodes
#
# jobName
# partition
# executable executable file (should be in a place that is readable from BG/L)
# workingDir directory for output files (should be readable by BG/L)
# parameterfile jobName.ps
# noNodes number of nodes of the partition to use
#
# start the job and stores the jobID in jobName.jobID
#
# all ACC processes expect to be started with "ACC" as first parameter
# start process
if [ -f ../share/Correlator.parset ]
then
echo "../share/Correlator.parset file exist"
else
echo "Sorry, ../share/Correlator.parset file does not exist"
fi
./prepare_$3.py
cd $4; mpirun -partition $2 -mode VN -label -cwd $4 $4/$3 $4/CS1_BGL_Processing.parset 5422 >> ../log/$3.log 2>&1 &
#!/usr/bin/env python
import socket
import os
import sys
import time
from optparse import OptionParser
host = 'localhost'
port = 32031
replyformat = 0
class switch(object):
def __init__(self, value):
self.value = value
self.fall = False
def __iter__(self):
"""Return the match method once, then stop"""
yield self.match
raise StopIteration
def match(self, *args):
"""Indicate whether or not to enter a case suite"""
if self.fall or not args:
return True
elif self.value in args: # changed for v1.5, see below
self.fall = True
return True
else:
return False
################################################################
# mmcs_cmd(szSocket, szCmd)
#
# mmcs_cmd -- send an mmcs command and gather and check the response.
# inputs:
# param0 -- remote tcp port to send command to.
# param1 -- command string
# outputs:
# return values in a list.
#
def mmcs_cmd (socket, szCmd):
lines = list()
socket.send(szCmd) # execute the command.
if replyformat == 0:
results = socket.recv(1024) # read the result...
lines.append(results.rstrip("\n")) # get rid of lf at end.
else:
file = socket.makefile()
for line in file:
if line.find('\0') >= 0:
break
lines.append(line.rstrip("\n")) # get rid of lf at end.
return lines
def set_replyformat(socket, rformat):
results = mmcs_cmd(socket, 'replyformat ' + str(rformat) + '\n')
if results[0] != 'OK':
print 'set replyformat:' + str(rformat) + ' ...failed'
global replyformat
replyformat = rformat
def list_jobs(socket):
set_replyformat(socket, 1)
return mmcs_cmd(socket, 'list_jobs\n')
def list_blocks(socket):
set_replyformat(socket, 1)
return mmcs_cmd(socket, 'list_blocks\n')
def jobId(socket):
results = list_jobs(socket)
for line in results:
if line.find(options.blockid) >= 0:
return line.split()[0]
def free_block(socket):
set_replyformat(socket, 0)
results = mmcs_cmd(socket, 'free ' + options.blockid + '\n');
if results[0] != 'OK':
print 'free \'%s\' ' % options.blockid + ' ...failed'
def killjob(socket):
set_replyformat(socket, 0)
jobid = jobId(socket)
results = mmcs_cmd(socket, 'killjob ' + options.blockid + ' ' + jobid + '\n');
if results[0] != 'OK':
print 'killjob ' + options.blockid + jobid + ' ...failed'
def partition_exist(socket):
results = list_blocks(socket)
for line in results:
if line.find(options.blockid) >= 0 and line.find(options.user) >= 0:
return True
return False
def block_status(socket):
set_replyformat(socket, 1)
results = list_blocks(socket)
for line in results:
if line.find(options.blockid) >= 0:
return line.split()[1]
def job_status(socket):
set_replyformat(socket, 1)
results = list_jobs(socket)
for line in results:
if line.find(options.blockid) >= 0:
return line.split()[1]
def show_block_status(socket):
set_replyformat(socket, 1)
results = list_jobs(socket)
for line in results:
if line.find(options.blockid) >= 0:
print line
#
# Start of mainline
#
if __name__ == '__main__':
parser = OptionParser()
parser.add_option('--user' , dest='user' , default=os.environ.get('USER', 'default'), type='string', help='username [%default]')
parser.add_option('--blockid' , dest='blockid' , default='R000_128_0Z' , type='string', help='name of the blockid [%default]')
parser.add_option('--status' , dest='status' , default='false' , type='string', help='Show status of the blockid ')
# parse the options
(options, args) = parser.parse_args()
remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote.connect((host, port))
results = mmcs_cmd(remote, 'set_username ' + options.user + '\n');
if results[0] != 'OK':
print 'set_username ' + options.user + ' ...failed'
if options.status == 'true':
if partition_exist(remote):
print 'UP'
else:
print 'DOWN'
sys.exit(0)
partitionExist = partition_exist(remote)
bl_stat = ''
job_stat = ''
while (partitionExist):
status_block = block_status(remote)
if status_block != bl_stat:
print 'Block %s' % options.blockid + ' status: ' + str(status_block)
bl_stat = status_block
for case in switch(status_block):
if case('A'):
free_block(remote)
break
if case('B'):
free_block(remote)
break
if case('D'):
break
if case('T'):
break
if case('I'):
status_job = job_status(remote)
if status_job != job_stat:
print 'Job status: ' + str(status_job)
job_stat = status_job
for case in switch(status_job):
if case('S'):
killjob(remote)
break
if case('R'):
killjob(remote)
break
if case('D'):
break
break
if case(): # default, could also just omit condition or 'if True'
print "something else!"
# No need to break here, it'll stop anyway
time.sleep(2)
partitionExist = partition_exist(remote)
remote.close()
sys.exit(0)
# stopAP.sh partition jobName
#
# partition BG/L partition the job is running on
# jobName The name of the job
#
ssh $USER@bglsn /opt/lofar/bin/stopBGL.py --blockid=$1
#
# swlevel.conf
#
# Table to manage the progrma that should be started and stopped
# level : up : down : root : mpi : program
#
1:u:d:::ACDaemon
6::d:::ApplController
6::d:::CorrAppl
//# Always <lofar_config.h> first!
#include <lofar_config.h>
#include <AsyncCommunication.h>
#include <Common/Timer.h>
#include <cassert>
#include <map>
#include <iostream>
#define USE_TIMING 0
namespace LOFAR {
namespace RTCP {
#if defined HAVE_MPI
AsyncCommunication::AsyncCommunication(MPI_Comm comm)
{
itsCommunicator = comm;
itsCurrentReadHandle = 0;
itsCurrentWriteHandle = 0;
}
AsyncCommunication::~AsyncCommunication()
{
}
// returns handle to this read
int AsyncCommunication::asyncRead(void* buf, unsigned size, unsigned source, int tag)
{
AsyncRequest* req = new AsyncRequest();
int res = MPI_Irecv(buf, size, MPI_BYTE, source, tag, itsCommunicator, &req->mpiReq);
if (res != MPI_SUCCESS) {
std::cerr << "MPI_Irecv() failed" << std::endl;
exit(1);
}
req->buf = buf;
req->size = size;
req->rank = source;
req->tag = tag;
int handle = itsCurrentReadHandle++;
itsReadHandleMap[handle] = req;
return handle;
}
// returns handle to this write
int AsyncCommunication::asyncWrite(const void* buf, unsigned size, unsigned dest, int tag)
{
AsyncRequest* req = new AsyncRequest();
int res = MPI_Isend((void*)buf, size, MPI_BYTE, dest, tag, itsCommunicator, &req->mpiReq);
if (res != MPI_SUCCESS) {
std::cerr << "MPI_Isend() failed" << std::endl;
exit(1);
}
req->buf = (void*)buf;
req->size = size;
req->rank = dest;
req->tag = tag;
int handle = itsCurrentWriteHandle++;
itsWriteHandleMap[handle] = req;
return handle;
}
void AsyncCommunication::waitForRead(int handle)
{
AsyncRequest* req = itsReadHandleMap[handle];
MPI_Status status;
int res = MPI_Wait(&req->mpiReq, &status);
if (res != MPI_SUCCESS) {
std::cerr << "MPI_Wait() failed" << std::endl;
exit(1);
}
// done, now remove from map, and free req
itsReadHandleMap.erase(handle);
delete req;
}
void AsyncCommunication::waitForWrite(int handle)
{
AsyncRequest* req = itsWriteHandleMap[handle];
MPI_Status status;
int res = MPI_Wait(&req->mpiReq, &status);
if (res != MPI_SUCCESS) {
std::cerr << "MPI_Wait() failed" << std::endl;
exit(1);
}
// done, now remove from map, and free req
itsWriteHandleMap.erase(handle);
delete req;
}
// returns the handle of the read that was done.
int AsyncCommunication::waitForAnyRead(void*& buf, unsigned& size, unsigned& source, int& tag)
{
MPI_Status status;
int count = itsReadHandleMap.size();
MPI_Request reqs[count];
int mapping[count];
int i = 0;
for (std::map<int, AsyncRequest*>::const_iterator it = itsReadHandleMap.begin(); it != itsReadHandleMap.end(); it++) {
int handle = it->first;
AsyncRequest* r = it->second;
reqs[i] = r->mpiReq;
mapping[i] = handle;
i++;
}
NSTimer waitAnyTimer("MPI_Waitany", USE_TIMING);
waitAnyTimer.start();
int index = -1;
int res = MPI_Waitany(count, reqs, &index, &status);
waitAnyTimer.stop();
if (res != MPI_SUCCESS) {
std::cerr << "MPI_Waitany() failed" << std::endl;
exit(1);
}
int handle = mapping[index];
AsyncRequest* req = itsReadHandleMap[handle];
buf = req->buf;
size = req->size;
source = req->rank;
tag = req->tag;
itsReadHandleMap.erase(handle);
delete req;
return handle;
}
void AsyncCommunication::waitForAllReads()
{
int count = itsReadHandleMap.size();
MPI_Request reqs[count];
MPI_Status status[count];
int i = 0;
for (std::map<int, AsyncRequest*>::const_iterator it = itsReadHandleMap.begin(); it != itsReadHandleMap.end(); it++) {
AsyncRequest* r = it->second;
reqs[i] = r->mpiReq;
i++;
}
int res = MPI_Waitall(count, reqs, status);
if (res != MPI_SUCCESS) {
std::cerr << "MPI_Waitall() failed" << std::endl;
exit(1);
}
for (std::map<int, AsyncRequest*>::const_iterator it = itsReadHandleMap.begin(); it != itsReadHandleMap.end(); it++) {
AsyncRequest* r = it->second;
delete r;
}
itsReadHandleMap.clear();
itsCurrentReadHandle = 0;
}
void AsyncCommunication::waitForAllWrites()
{
int count = itsWriteHandleMap.size();
MPI_Request reqs[count];
MPI_Status status[count];
int i = 0;
for (std::map<int, AsyncRequest*>::const_iterator it = itsWriteHandleMap.begin(); it != itsWriteHandleMap.end(); it++) {
AsyncRequest* r = it->second;
reqs[i] = r->mpiReq;
i++;
}
int res = MPI_Waitall(count, reqs, status);
if (res != MPI_SUCCESS) {
std::cerr << "MPI_Waitall() failed" << std::endl;
exit(1);
}
for (std::map<int, AsyncRequest*>::const_iterator it = itsWriteHandleMap.begin(); it != itsWriteHandleMap.end(); it++) {
AsyncRequest* r = it->second;
delete r;
}
itsWriteHandleMap.clear();
itsCurrentWriteHandle = 0;
}
#endif // HAVE_MPI
} // namespace RTCPs
} // namespace LOFAR
#ifndef LOFAR_CNPROC_ASYNC_COMMUNICATION_H
#define LOFAR_CNPROC_ASYNC_COMMUNICATION_H
#if defined HAVE_MPI
#define MPICH_IGNORE_CXX_SEEK
#include <mpi.h>
#endif
#include <map>
namespace LOFAR {
namespace RTCP {
#if defined HAVE_MPI
class AsyncRequest {
public:
MPI_Request mpiReq;
void* buf;
unsigned size;
unsigned rank;
int tag;
};
class AsyncCommunication {
public:
AsyncCommunication(MPI_Comm communicator = MPI_COMM_WORLD);
~AsyncCommunication();
// returns handle to this read
int asyncRead(void* buf, unsigned size, unsigned source, int tag);
// returns handle to this write
int asyncWrite(const void* buf, unsigned size, unsigned dest, int tag);
void waitForRead(int handle);
void waitForWrite(int handle);
// returns the handle of the read that was done.
int waitForAnyRead(void*& buf, unsigned& size, unsigned& source, int& tag);
void waitForAllReads();
void waitForAllWrites();
private:
MPI_Comm itsCommunicator;
int itsCurrentReadHandle;
int itsCurrentWriteHandle;
std::map<int, AsyncRequest*> itsReadHandleMap;
std::map<int, AsyncRequest*> itsWriteHandleMap;
};
#endif // defined HAVE_MPI
} // namespace RTCP
} // namespace LOFAR
#endif
//# Always #include <lofar_config.h> first!
#include <lofar_config.h>
#include <AsyncTranspose.h>
#include <Interface/CN_Mapping.h>
#include <Interface/PrintVector.h>
#include <cassert>
namespace LOFAR {
namespace RTCP {
#if defined HAVE_MPI
#define MAX_TAG 100000 // The maximum tag we use to represent a data message.
// Higher tags are metadata.
template <typename SAMPLE_TYPE> AsyncTranspose<SAMPLE_TYPE>::AsyncTranspose(bool isTransposeInput, bool isTransposeOutput, unsigned nrCoresPerPset,
const LocationInfo &locationInfo,
const std::vector<unsigned> &inputPsets,
const std::vector<unsigned> &outputPsets, unsigned nrSamplesToCNProc)
:
itsIsTransposeInput(isTransposeInput),
itsIsTransposeOutput(isTransposeOutput),
itsInputPsets(inputPsets),
itsOutputPsets(outputPsets),
itsLocationInfo(locationInfo)
{
itsGroupNumber = -1;
for(unsigned core = 0; core < nrCoresPerPset; core++) {
unsigned rank = locationInfo.remapOnTree(locationInfo.psetNumber(), core);
if(rank == locationInfo.rank()) {
itsGroupNumber = core;
break;
}
}
for(unsigned i=0; i<inputPsets.size(); i++) {
unsigned rank = locationInfo.remapOnTree(inputPsets[i], itsGroupNumber);
itsRankToPsetIndex[rank] = i;
}
itsMessageSize = InputData<SAMPLE_TYPE>::requiredSize(1, nrSamplesToCNProc);
dataHandles.resize(inputPsets.size());
metaDataHandles.resize(inputPsets.size());
itsAsyncComm = new AsyncCommunication();
}
template <typename SAMPLE_TYPE> AsyncTranspose<SAMPLE_TYPE>::~AsyncTranspose()
{
delete itsAsyncComm;
}
template <typename SAMPLE_TYPE> void AsyncTranspose<SAMPLE_TYPE>::postAllReceives(TransposedData<SAMPLE_TYPE> *transposedData)
{
for(unsigned i=0; i<itsInputPsets.size(); i++) {
void* buf = (void*) transposedData->samples[i].origin();
unsigned pset = itsInputPsets[i];
unsigned rank = itsLocationInfo.remapOnTree(pset, itsGroupNumber); // TODO cache this? maybe in locationInfo itself?
dataHandles[i] = itsAsyncComm->asyncRead(buf, itsMessageSize, rank, rank);
metaDataHandles[i] = itsAsyncComm->asyncRead(&transposedData->metaData[i], sizeof(SubbandMetaData), rank, rank + MAX_TAG);
}
}
// returns station number (= pset index)
template <typename SAMPLE_TYPE> unsigned AsyncTranspose<SAMPLE_TYPE>::waitForAnyReceive()
{
void* buf;
unsigned size, source;
int tag;
while(true) {
// This read could return either a data message, or a meta data message.
itsAsyncComm->waitForAnyRead(buf, size, source, tag);
// source is the real rank, calc pset index
unsigned psetIndex = itsRankToPsetIndex[source];
if(tag < MAX_TAG) { // real data message
dataHandles[psetIndex] = -1; // record that we have received the data
if(metaDataHandles[psetIndex] == -1) { // We already have the metadata
return psetIndex;
}
} else { // metadata message
metaDataHandles[psetIndex] = -1; // record that we have received the metadata
if(dataHandles[psetIndex] == -1) {
return psetIndex; // We already have the data
}
}
}
}
template <typename SAMPLE_TYPE> void AsyncTranspose<SAMPLE_TYPE>::asyncSend(unsigned outputPsetNr, const InputData<SAMPLE_TYPE> *inputData)
{
unsigned pset = itsOutputPsets[outputPsetNr];
unsigned rank = itsLocationInfo.remapOnTree(pset, itsGroupNumber);
int tag = itsLocationInfo.rank();
itsAsyncComm->asyncWrite(inputData->samples[outputPsetNr].origin(), itsMessageSize, rank, tag);
itsAsyncComm->asyncWrite(&inputData->metaData[outputPsetNr], sizeof(SubbandMetaData), rank, tag + MAX_TAG);
}
template <typename SAMPLE_TYPE> void AsyncTranspose<SAMPLE_TYPE>::waitForAllSends()
{
// this includes the metadata writes...
itsAsyncComm->waitForAllWrites();
}
template class AsyncTranspose<i4complex>;
template class AsyncTranspose<i8complex>;
template class AsyncTranspose<i16complex>;
#endif // MPI
} // namespace RTCP
} // namespace LOFAR
#ifndef LOFAR_CNPROC_ASYNC_TRANSPOSE_H
#define LOFAR_CNPROC_ASYNC_TRANSPOSE_H
#include <AsyncCommunication.h>
#include <InputData.h>
#include <LocationInfo.h>
#include <TransposedData.h>
#include <Interface/SubbandMetaData.h>
#if defined HAVE_MPI
#define MPICH_IGNORE_CXX_SEEK
#include <mpi.h>
#endif
#if defined HAVE_BGL
#include <bglpersonality.h>
#endif
#include <vector>
namespace LOFAR {
namespace RTCP {
#if defined HAVE_MPI
// Nodes in input psets read outputPsets.size subbands from their I/O node (one by one).
// Cores communicate with the same logical core number in another pset
// (due to an extra mapping, this is not the physical core number).
// # sends = size outputPsets (= nrSubbands) on the input nodes.
// # recvs = size inputPsets (= nrStations) on the output nodes.
// Only the output nodes are actually calculating (filtering and correlating).
template <typename SAMPLE_TYPE> class AsyncTranspose
{
public:
AsyncTranspose(bool isTransposeInput, bool isTransposeOutput, unsigned nrCoresPerPset, const LocationInfo &,
const std::vector<unsigned> &inputPsets, const std::vector<unsigned> &outputPsets, unsigned nrSamplesToCNProc);
~AsyncTranspose();
// Post all async receives for the transpose.
void postAllReceives(TransposedData<SAMPLE_TYPE> *transposedData);
// Wait for a data message. Returns the station number where the message originates.
unsigned waitForAnyReceive();
// Asynchronously send a subband.
void asyncSend(unsigned outputPsetNr, const InputData<SAMPLE_TYPE> *inputData);
// Make sure all async sends have finished.
void waitForAllSends();
private:
bool itsIsTransposeInput, itsIsTransposeOutput;
// the size of a data message
unsigned itsMessageSize;
// A mapping that tells us, if we receive a message from a source,
// to which pset that source belongs.
std::map<unsigned, unsigned> itsRankToPsetIndex;
AsyncCommunication* itsAsyncComm;
const std::vector<unsigned> &itsInputPsets;
const std::vector<unsigned> &itsOutputPsets;
const LocationInfo &itsLocationInfo;
// Two maps that contain the handles to the asynchronous reads.
// The maps are indexed by the inputPset index.
// The value is -1 if the read finished.
std::vector<int> dataHandles;
std::vector<int> metaDataHandles;
// The number of the transpose group we belong to.
// The cores with the same index in a pset together form a group.
unsigned itsGroupNumber;
};
#endif // defined HAVE_MPI
} // namespace RTCP
} // namespace LOFAR
#endif
This diff is collapsed.
#ifndef LOFAR_CNPROC_BANDPASS_H
#define LOFAR_CNPROC_BANDPASS_H
namespace LOFAR {
namespace RTCP {
class BandPass {
public:
BandPass(bool correct, unsigned nrChannels);
~BandPass();
const float *correctionFactors() const;
private:
void computeCorrectionFactors(unsigned nrChannels);
static const float stationFilterConstants[65536];
float *factors;
};
inline const float *BandPass::correctionFactors() const
{
return factors;
}
} // namespace RTCP
} // namespace LOFAR
#endif
# Configure the rootLogger
log4cplus.rootLogger=INFO, STDOUT
log4cplus.LCS.Common=INFO, STDOUT
log4cplus.logger.TRC=TRACE, NOLOG
log4cplus.logger.TRC.additivity=false
log4cplus.additivity=false
# Define the STDOUT appender
log4cplus.appender.STDOUT=log4cplus::ConsoleAppender
log4cplus.appender.STDOUT.Threshhold=TRACE2
log4cplus.appender.STDOUT.layout=log4cplus::PatternLayout
log4cplus.appender.STDOUT.layout.ConversionPattern=%-5p [%x]%c{3} - %m%n
log4cplus.appender.STDOUT.logToStdErr=false
log4cplus.appender.STDOUT.ImmediateFlush=true
log4cplus.appender.NOLOG=log4cplus::NullAppender
//# CN_Processing.cc: Blue Gene processing for 1 second of sampled data
//#
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
//# Always #include <lofar_config.h> first!
#include <lofar_config.h>
//# Includes
#include <CN_Processing.h>
#include <CorrelatorAsm.h>
#include <FIR_Asm.h>
#include <Common/Timer.h>
#include <Interface/CN_Configuration.h>
#include <Interface/CN_Mapping.h>
#include <cassert>
#include <complex>
#include <cmath>
#include <iomanip>
#include <iostream>
#include <map>
#if defined HAVE_BGP
#include <common/bgp_personality_inlines.h>
#include <spi/kernel_interface.h>
#endif
#if defined HAVE_ZOID && (defined HAVE_BGL || defined HAVE_BGP)
extern "C" {
#include <lofar.h>
}
#endif
#if (defined HAVE_BGP || defined HAVE_BGL)
#define LOG_CONDITION (itsLocationInfo.rankInPset() == 0)
//#define LOG_CONDITION (itsLocationInfo.rank() == 0)
//#define LOG_CONDITION 1
#else
#define LOG_CONDITION 1
#endif
namespace LOFAR {
namespace RTCP {
#if !defined HAVE_MASS
inline static dcomplex cosisin(double x)
{
return makedcomplex(cos(x), sin(x));
}
#endif
//static NSTimer transposeTimer("transpose()", true); // Unused --Rob
static NSTimer computeTimer("computing", true);
static NSTimer totalProcessingTimer("global total processing", true);
CN_Processing_Base::~CN_Processing_Base()
{
}
template <typename SAMPLE_TYPE> CN_Processing<SAMPLE_TYPE>::CN_Processing(Stream *str, const LocationInfo &locationInfo)
:
itsStream(str),
itsLocationInfo(locationInfo),
itsInputData(0),
itsTransposedData(0),
itsFilteredData(0),
itsCorrelatedData(0),
#if defined HAVE_BGL || defined HAVE_BGP
itsDoAsyncCommunication(false),
itsTranspose(0),
itsAsyncTranspose(0),
#endif
itsPPF(0),
itsCorrelator(0)
{
memset(itsArenas, 0, sizeof itsArenas);
// #if defined HAVE_BGL
// getPersonality();
// #endif
#if defined HAVE_ZOID && (defined HAVE_BGL || defined HAVE_BGP)
initIONode();
#endif
}
template <typename SAMPLE_TYPE> CN_Processing<SAMPLE_TYPE>::~CN_Processing()
{
}
#if 0
//#if defined HAVE_BGL
struct Location {
unsigned pset, rankInPset;
};
template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::getPersonality()
{
int retval = rts_get_personality(&itsPersonality, sizeof itsPersonality);
assert(retval == 0);
if (itsLocationInfo.rank() == 0)
std::clog << "topology = ("
<< itsPersonality.getXsize() << ','
<< itsPersonality.getYsize() << ','
<< itsPersonality.getZsize() << "), torus wraparound = ("
<< (itsPersonality.isTorusX() ? 'T' : 'F') << ','
<< (itsPersonality.isTorusY() ? 'T' : 'F') << ','
<< (itsPersonality.isTorusZ() ? 'T' : 'F') << ')'
<< std::endl;
itsRankInPset = itsPersonality.rankInPset() + itsPersonality.numNodesInPset() * (itsLocationInfo.rank() / itsPersonality.numComputeNodes());
Location myLocation = {
itsPersonality.getPsetNum(), itsRankInPset
};
std::vector<Location> allLocations(itsLocationInfo.nrNodes());
MPI_Gather(&myLocation, 2, MPI_INT, &allLocations[0], 2, MPI_INT, 0, MPI_COMM_WORLD);
if (itsLocationInfo.rank() == 0) {
unsigned nrCoresPerPset = itsPersonality.numNodesInPset() * (itsPersonality.isVirtualNodeMode() ? 2 : 1);
std::vector<std::vector<unsigned> > cores(itsPersonality.numPsets(), std::vector<unsigned>(nrCoresPerPset));
for (unsigned rank = 0; rank < allLocations.size(); rank ++)
cores[allLocations[rank].pset][allLocations[rank].rankInPset] = rank;
// for (unsigned pset = 0; pset < itsPersonality.numPsets(); pset ++)
// std::clog << "pset " << pset << " contains cores " << cores[pset] << std::endl;
}
}
#endif
#if defined HAVE_ZOID && (defined HAVE_BGL || defined HAVE_BGP)
void CN_Processing<SAMPLE_TYPE>::initIONode() const
{
// one of the compute cores in each Pset has to initialize its I/O node
if (itsLocationInfo.rankInPset() == 0) {
std::vector<size_t> lengths;
for (int arg = 0; original_argv[arg] != 0; arg ++) {
std::clog << "adding arg " << original_argv[arg] << std::endl;
lengths.push_back(strlen(original_argv[arg]) + 1);
}
std::clog << "calling lofar_init(..., ..., " << lengths.size() << ")" << std::endl;
lofar_init(original_argv, &lengths[0], lengths.size());
}
}
#endif
#if 0
template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::checkConsistency(Parset *parset) const
{
ASSERT(parset->nrPPFTaps() == NR_TAPS);
ASSERT(parset->getInt32("Observation.nrPolarisations") == NR_POLARIZATIONS);
#if !defined C_IMPLEMENTATION
ASSERT(parset->CNintegrationSteps() % 16 == 0);
ASSERT(_FIR_constants_used.nr_taps == NR_TAPS);
ASSERT(_FIR_constants_used.nr_polarizations == NR_POLARIZATIONS);
#endif
#if defined HAVE_BGL
unsigned physicalCoresPerPset = itsPersonality.numNodesInPset();
if (itsPersonality.isVirtualNodeMode())
physicalCoresPerPset *= 2;
ASSERTSTR(parset->nrCoresPerPset() <= physicalCoresPerPset, "too many cores per pset specified");
ASSERTSTR(parset->nrPsets() <= itsPersonality.numPsets(), "not enough psets available");
#endif
}
#endif
#if defined HAVE_MPI
template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::printSubbandList() const
{
std::clog << "node " << itsLocationInfo.rank() << " filters and correlates subbands ";
unsigned sb = itsCurrentSubband;
do {
std::clog << (sb == itsCurrentSubband ? '[' : ',') << sb;
if ((sb += itsSubbandIncrement) >= itsLastSubband)
sb -= itsLastSubband - itsFirstSubband;
} while (sb != itsCurrentSubband);
std::clog << ']' << std::endl;
}
#endif // HAVE_MPI
template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_Configuration &configuration)
{
//checkConsistency(parset); TODO
// #if defined HAVE_BGL
// unsigned usedCoresPerPset = configuration.nrUsedCoresPerPset();
// unsigned myPset = itsPersonality.getPsetNum();
// unsigned myCore = CN_Mapping::reverseMapCoreOnPset(itsRankInPset, myPset);
#if defined HAVE_BGL || HAVE_BGP
unsigned usedCoresPerPset = configuration.nrUsedCoresPerPset();
unsigned myPset = itsLocationInfo.psetNumber();
unsigned myCore = CN_Mapping::reverseMapCoreOnPset(itsLocationInfo.rankInPset(), myPset);
#else
unsigned usedCoresPerPset = 1;
unsigned myPset = 0;
unsigned myCore = 0;
#endif
std::vector<unsigned> &inputPsets = configuration.inputPsets();
std::vector<unsigned> &outputPsets = configuration.outputPsets();
std::vector<unsigned> &tabList = configuration.tabList();
#if defined HAVE_BGP || defined HAVE_BGL
if(!itsDoAsyncCommunication) {
Transpose<SAMPLE_TYPE>::getMPIgroups(usedCoresPerPset, itsLocationInfo, inputPsets, outputPsets);
}
#endif
std::vector<unsigned>::const_iterator inputPsetIndex = std::find(inputPsets.begin(), inputPsets.end(), myPset);
std::vector<unsigned>::const_iterator outputPsetIndex = std::find(outputPsets.begin(), outputPsets.end(), myPset);
itsIsTransposeInput = inputPsetIndex != inputPsets.end();
itsIsTransposeOutput = outputPsetIndex != outputPsets.end();
itsNrStations = configuration.nrStations();
itsOutputPsetSize = outputPsets.size();
unsigned nrBaselines = itsNrStations * (itsNrStations + 1) / 2;
unsigned nrChannels = configuration.nrChannelsPerSubband();
unsigned nrSamplesPerIntegration = configuration.nrSamplesPerIntegration();
unsigned nrSamplesToCNProc = configuration.nrSamplesToCNProc();
// Each phase (e.g., transpose, PPF, correlator) reads from an input data
// set and writes to an output data set. To save memory, two memory buffers
// are used, and consecutive phases alternately use one of them as input
// buffer and the other as output buffer.
// Since each buffer (arena) in used multiple times, we use multiple
// Allocators for a single arena, but the Allocators are hidden in the
// implementations of InputData, TransposedData, etc.
size_t inputDataSize = itsIsTransposeInput ? InputData<SAMPLE_TYPE>::requiredSize(outputPsets.size(), nrSamplesToCNProc) : 0;
size_t transposedDataSize = itsIsTransposeOutput ? TransposedData<SAMPLE_TYPE>::requiredSize(itsNrStations, nrSamplesToCNProc) : 0;
size_t filteredDataSize = itsIsTransposeOutput ? FilteredData::requiredSize(itsNrStations, nrChannels, nrSamplesPerIntegration) : 0;
size_t correlatedDataSize = itsIsTransposeOutput ? CorrelatedData::requiredSize(nrBaselines, nrChannels) : 0;
itsArenas[0] = new MallocedArena(inputDataSize, 32);
itsArenas[1] = new MallocedArena(std::max(transposedDataSize, correlatedDataSize), 32);
itsArenas[2] = new MallocedArena(filteredDataSize, 32);
if (itsIsTransposeInput) {
itsInputData = new InputData<SAMPLE_TYPE>(*itsArenas[0], outputPsets.size(), nrSamplesToCNProc);
}
if (itsIsTransposeOutput) {
unsigned nrSubbandsPerPset = configuration.nrSubbandsPerPset();
unsigned logicalNode = usedCoresPerPset * (outputPsetIndex - outputPsets.begin()) + myCore;
// TODO: logicalNode assumes output psets are consecutively numbered
itsCenterFrequencies = configuration.refFreqs();
itsFirstSubband = (logicalNode / usedCoresPerPset) * nrSubbandsPerPset;
itsLastSubband = itsFirstSubband + nrSubbandsPerPset;
itsCurrentSubband = itsFirstSubband + logicalNode % usedCoresPerPset % nrSubbandsPerPset;
itsSubbandIncrement = usedCoresPerPset % nrSubbandsPerPset;
#if defined HAVE_MPI
printSubbandList();
#endif // HAVE_MPI
itsTransposedData = new TransposedData<SAMPLE_TYPE>(*itsArenas[1], itsNrStations, nrSamplesToCNProc);
itsFilteredData = new FilteredData(*itsArenas[2], itsNrStations, nrChannels, nrSamplesPerIntegration);
itsCorrelatedData = new CorrelatedData(*itsArenas[1], nrBaselines, nrChannels);
itsPPF = new PPF<SAMPLE_TYPE>(itsNrStations, nrChannels, nrSamplesPerIntegration, configuration.sampleRate() / nrChannels, configuration.delayCompensation());
itsCorrelator = new Correlator(itsNrStations, nrChannels, nrSamplesPerIntegration, configuration.correctBandPass());
}
#if defined HAVE_MPI
if (itsIsTransposeInput || itsIsTransposeOutput) {
if(itsDoAsyncCommunication) {
itsAsyncTranspose = new AsyncTranspose<SAMPLE_TYPE>(itsIsTransposeInput, itsIsTransposeOutput,
usedCoresPerPset, itsLocationInfo, inputPsets, outputPsets, nrSamplesToCNProc);
} else {
itsTranspose = new Transpose<SAMPLE_TYPE>(itsIsTransposeInput, itsIsTransposeOutput, myCore);
itsTranspose->setupTransposeParams(itsLocationInfo, inputPsets, outputPsets, itsInputData, itsTransposedData);
}
}
#endif // HAVE_MPI
}
template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::process()
{
totalProcessingTimer.start();
NSTimer totalTimer("total processing", LOG_CONDITION);
totalTimer.start();
#if defined HAVE_MPI
if(itsDoAsyncCommunication) {
if (itsIsTransposeInput) {
itsInputData->readMetaData(itsStream); // sync read the meta data
}
if(itsIsTransposeOutput) {
NSTimer postAsyncReceives("post async receives", LOG_CONDITION);
postAsyncReceives.start();
itsAsyncTranspose->postAllReceives(itsTransposedData);
postAsyncReceives.stop();
}
}
#endif // HAVE_MPI
if (itsIsTransposeInput) {
#if defined HAVE_MPI
if (LOG_CONDITION)
std::clog << std::setprecision(12) << "core " << itsLocationInfo.rank() << ": start reading at " << MPI_Wtime() << '\n';
#endif // HAVE_MPI
static NSTimer readTimer("receive timer", true);
#if defined HAVE_MPI
if(itsDoAsyncCommunication) {
NSTimer asyncSendTimer("async send", LOG_CONDITION);
for(unsigned i=0; i<itsOutputPsetSize; i++) {
readTimer.start();
itsInputData->readOne(itsStream); // Synchronously read 1 subband from my IO node.
readTimer.stop();
asyncSendTimer.start();
itsAsyncTranspose->asyncSend(i, itsInputData); // Asynchronously send one subband to another pset.
asyncSendTimer.stop();
}
} else { // Synchronous
readTimer.start();
itsInputData->read(itsStream);
readTimer.stop();
}
#else // NO MPI
readTimer.start();
itsInputData->read(itsStream);
readTimer.stop();
#endif
} // itsIsTransposeInput
#if defined HAVE_MPI
if(!itsDoAsyncCommunication) {
if (itsIsTransposeInput || itsIsTransposeOutput) {
if (LOG_CONDITION) {
std::clog << std::setprecision(12) << "core " << itsLocationInfo.rank() << ": start transpose at " << MPI_Wtime() << '\n';
}
#if 0
MPI_Barrier(itsTransposeGroup);
MPI_Barrier(itsTransposeGroup);
#endif
NSTimer transposeTimer("one transpose", LOG_CONDITION);
transposeTimer.start();
itsTranspose->transpose(itsInputData, itsTransposedData);
itsTranspose->transposeMetaData(itsInputData, itsTransposedData);
transposeTimer.stop();
}
}
#endif // HAVE_MPI
if (itsIsTransposeOutput) {
#if defined HAVE_MPI
if (LOG_CONDITION)
std::clog << std::setprecision(12) << "core " << itsLocationInfo.rank() << ": start processing at " << MPI_Wtime() << '\n';
if(itsDoAsyncCommunication) {
NSTimer asyncReceiveTimer("wait for any async receive", LOG_CONDITION);
for (unsigned i = 0; i < itsNrStations; i ++) {
asyncReceiveTimer.start();
unsigned stat = itsAsyncTranspose->waitForAnyReceive();
asyncReceiveTimer.stop();
computeTimer.start();
itsPPF->computeFlags(stat, itsTransposedData, itsFilteredData);
itsPPF->filter(stat, itsCenterFrequencies[itsCurrentSubband], itsTransposedData, itsFilteredData);
computeTimer.stop();
}
} else {
for (unsigned stat = 0; stat < itsNrStations; stat ++) {
computeTimer.start();
itsPPF->computeFlags(stat, itsTransposedData, itsFilteredData);
itsPPF->filter(stat, itsCenterFrequencies[itsCurrentSubband], itsTransposedData, itsFilteredData);
computeTimer.stop();
}
}
#else // NO MPI
for (unsigned stat = 0; stat < itsNrStations; stat ++) {
computeTimer.start();
itsPPF->computeFlags(stat, itsTransposedData, itsFilteredData);
itsPPF->filter(stat, itsCenterFrequencies[itsCurrentSubband], itsTransposedData, itsFilteredData);
computeTimer.stop();
}
#endif // HAVE_MPI
computeTimer.start();
itsCorrelator->computeFlagsAndCentroids(itsFilteredData, itsCorrelatedData);
itsCorrelator->correlate(itsFilteredData, itsCorrelatedData);
computeTimer.stop();
#if defined HAVE_MPI
if (LOG_CONDITION)
std::clog << std::setprecision(12) << "core " << itsLocationInfo.rank() << ": start writing at " << MPI_Wtime() << '\n';
#endif // HAVE_MPI
static NSTimer writeTimer("send timer", true);
writeTimer.start();
itsCorrelatedData->write(itsStream);
writeTimer.stop();
#if defined HAVE_MPI
if(itsDoAsyncCommunication && itsIsTransposeInput) {
NSTimer waitAsyncSendTimer("wait for all async sends", LOG_CONDITION);
waitAsyncSendTimer.start();
itsAsyncTranspose->waitForAllSends();
waitAsyncSendTimer.stop();
}
#endif
} // itsIsTransposeOutput
#if defined HAVE_MPI
if (itsIsTransposeInput || itsIsTransposeOutput) {
if (LOG_CONDITION) {
std::clog << std::setprecision(12) << "core " << itsLocationInfo.rank() << ": start idling at " << MPI_Wtime() << '\n';
}
}
#endif // HAVE_MPI
#if 0
static unsigned count = 0;
if (itsLocationInfo.rank() == 5 && ++ count == 9)
for (double time = MPI_Wtime() + 4.0; MPI_Wtime() < time;)
;
#endif
if ((itsCurrentSubband += itsSubbandIncrement) >= itsLastSubband) {
itsCurrentSubband -= itsLastSubband - itsFirstSubband;
}
totalTimer.stop();
totalProcessingTimer.stop();
}
template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::postprocess()
{
if (itsIsTransposeInput) {
delete itsInputData;
}
if (itsIsTransposeInput || itsIsTransposeOutput) {
#if defined HAVE_MPI
if(itsDoAsyncCommunication) {
delete itsAsyncTranspose;
} else {
delete itsTranspose;
}
#endif // HAVE_MPI
}
if (itsIsTransposeOutput) {
delete itsTransposedData;
delete itsPPF;
delete itsFilteredData;
delete itsCorrelator;
delete itsCorrelatedData;
delete itsArenas[0];
delete itsArenas[1];
delete itsArenas[2];
}
}
template class CN_Processing<i4complex>;
template class CN_Processing<i8complex>;
template class CN_Processing<i16complex>;
} // namespace RTCP
} // namespace LOFAR
//# CN_Processing.h: polyphase filter and correlator
//#
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id$
#ifndef LOFAR_CNPROC_CN_PROCESSING_H
#define LOFAR_CNPROC_CN_PROCESSING_H
#if 0 || !(defined HAVE_BGL || defined HAVE_BGP)
#define C_IMPLEMENTATION
#endif
#include <Stream/Stream.h>
#include <Interface/Config.h>
#if 0
#include <Interface/Parset.h>
#else
#include <Interface/CN_Configuration.h>
#endif
#include <Interface/Allocator.h>
#include <InputData.h>
#include <FilteredData.h>
#include <TransposedData.h>
#include <Interface/CorrelatedData.h>
#include <Transpose.h>
#include <AsyncTranspose.h>
#include <PPF.h>
#include <Correlator.h>
#include <LocationInfo.h>
#if defined HAVE_BGL
#include <bglpersonality.h>
#include <rts.h>
#endif
namespace LOFAR {
namespace RTCP {
class CN_Processing_Base // untemplated helper class
{
public:
virtual ~CN_Processing_Base();
virtual void preprocess(CN_Configuration &) = 0;
virtual void process() = 0;
virtual void postprocess() = 0;
};
template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base
{
public:
CN_Processing(Stream *, const LocationInfo &);
~CN_Processing();
virtual void preprocess(CN_Configuration &);
virtual void process();
virtual void postprocess();
private:
#if 0
void checkConsistency(Parset *) const;
#endif
#if defined HAVE_BGL
void getPersonality();
#endif
#if defined HAVE_ZOID && defined HAVE_BGL
void initIONode() const;
#endif
#if defined HAVE_MPI
void printSubbandList() const;
#endif
unsigned itsNrStations;
unsigned itsOutputPsetSize;
Stream *itsStream;
const LocationInfo &itsLocationInfo;
std::vector<double> itsCenterFrequencies;
unsigned itsFirstSubband, itsCurrentSubband, itsLastSubband, itsSubbandIncrement;
bool itsIsTransposeInput, itsIsTransposeOutput;
Arena *itsArenas[3];
InputData<SAMPLE_TYPE> *itsInputData;
TransposedData<SAMPLE_TYPE> *itsTransposedData;
FilteredData *itsFilteredData;
CorrelatedData *itsCorrelatedData;
#if defined HAVE_MPI
bool itsDoAsyncCommunication;
Transpose<SAMPLE_TYPE> *itsTranspose;
AsyncTranspose<SAMPLE_TYPE> *itsAsyncTranspose;
#endif
PPF<SAMPLE_TYPE> *itsPPF;
Correlator *itsCorrelator;
#if defined HAVE_BGL
CNPersonality itsPersonality;
unsigned itsRankInPset; // core number, not node number!
#endif
};
} // namespace RTCP
} // namespace LOFAR
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment