diff --git a/RTCP/Run/src/LOFAR/Parset.py b/RTCP/Run/src/LOFAR/Parset.py index 57a083940bcbe2a09f576f111fdae30870a80581..6f72a330c16df3ad8255f9d01dcf2cb8e9ff8173 100644 --- a/RTCP/Run/src/LOFAR/Parset.py +++ b/RTCP/Run/src/LOFAR/Parset.py @@ -14,6 +14,7 @@ from itertools import count from Partitions import PartitionPsets from Locations import Hosts,Locations from Stations import Stations +from RingCoordinates import RingCoordinates from util.dateutil import parse,format,parseDuration,timestamp from logging import error import math @@ -135,9 +136,10 @@ class Parset(util.Parset.Parset): self.setdefault("Observation.DataProducts.Output_Trigger.enabled",False) self.setdefault("OLAP.Correlator.integrationTime",1); - if "OLAP.Stokes.channelsPerSubband" not in self or int(self["OLAP.Stokes.channelsPerSubband"]) == 0: - self["OLAP.Stokes.channelsPerSubband"] = self["Observation.channelsPerSubband"] + for k in ["OLAP.CNProc_CoherentStokes.channelsPerSubband", "OLAP.CNProc_IncoherentStokes.channelsPerSubband"]: + if k not in self or int(self[k]) == 0: + self[k] = self["Observation.channelsPerSubband"] self.setdefault('Observation.DataProducts.Output_Filtered.namemask','L${OBSID}_SB${SUBBAND}.filtered') self.setdefault('Observation.DataProducts.Output_Beamformed.namemask','L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.raw') self.setdefault('Observation.DataProducts.Output_Correlated.namemask','L${OBSID}_SB${SUBBAND}_uv.MS') @@ -166,23 +168,106 @@ class Parset(util.Parset.Parset): self.setdefault("Observation.rspBoardList", [s//slots for s in xrange(nrSubbands)]) self.setdefault("Observation.rspSlotList", [s%slots for s in xrange(nrSubbands)]) + # convert pencil rings to more coordinates + for b in count(): + if "Observation.Beam[%s].angle1" % (b,) not in self: + break + + dirtype = self["Observation.Beam[%s].directionType" % (b,)] + + nrrings = int(self["Observation.Beam[%s].nrTabRings" % (b,)]) + width = float(self["Observation.Beam[%s].TabRingSize" % (b,)]) + ringcoordinates = RingCoordinates( nrrings, width ) + ringset = [ + { "angle1": angle1, + "angle2": angle2, + "directionType": dirtype, + "specificationType": "ring", + } for (angle1,angle2) in ringcoordinates.coordinates() + ] + + manualset = [] + + for m in count(): + if "Observation.Beam[%s].TiedArrayBeam[%s].angle1" % (b,m) not in self: + break + + manualset.append( + { "angle1": self["Observation.Beam[%s].TiedArrayBeam[%s].angle1" % (b,m)], + "angle2": self["Observation.Beam[%s].TiedArrayBeam[%s].angle2" % (b,m)], + "directionType": self["Observation.Beam[%s].TiedArrayBeam[%s].directionType" % (b,m)], + "specificationType": "manual", + } + ) + + # first define the rings, then the manual beams (which thus get shifted in number!) + allsets = ringset + manualset + for m,s in enumerate(allsets): + prefix = "Observation.Beam[%s].TiedArrayBeam[%s]" % (b,m) + + for k,v in s.iteritems(): + self["%s.%s" % (prefix,k)] = v + + self["Observation.Beam[%s].nrTiedArrayBeams" % (b,)] = len(allsets) + + def convertDepricatedKeys(self): """ Converts some new keys to old ones to help old CEP code cope with new SAS code or vice-versa. """ - convertmap = { - # "old": "new" - "OLAP.outputFilteredData": "Observation.DataProducts.Output_Filtered.enabled", - "OLAP.outputCorrelatedData": "Observation.DataProducts.Output_Correlated.enabled", - "OLAP.outputBeamFormedData": "Observation.DataProducts.Output_Beamformed.enabled", - "OLAP.outputCoherentStokes": "Observation.DataProducts.Output_CoherentStokes.enabled", - "OLAP.outputIncoherentStokes": "Observation.DataProducts.Output_IncoherentStokes.enabled", - "OLAP.outputTrigger": "Observation.DataProducts.Output_Trigger.enabled", - } - - for k,v in convertmap.iteritems(): - if k in self: + convertmap = [ + # ("old","new") + ("OLAP.outputFilteredData", "Observation.DataProducts.Output_Filtered.enabled"), + ("OLAP.outputCorrelatedData", "Observation.DataProducts.Output_Correlated.enabled"), + ("OLAP.outputBeamFormedData", "Observation.DataProducts.Output_Beamformed.enabled"), + ("OLAP.outputCoherentStokes", "Observation.DataProducts.Output_CoherentStokes.enabled"), + ("OLAP.outputIncoherentStokes", "Observation.DataProducts.Output_IncoherentStokes.enabled"), + ("OLAP.outputTrigger", "Observation.DataProducts.Output_Trigger.enabled"), + + ("OLAP.Stokes.which", "OLAP.CNProc_CoherentStokes.which"), + ("OLAP.Stokes.which", "OLAP.CNProc_IncoherentStokes.which"), + + ("OLAP.Stokes.integrationSteps", "OLAP.CNProc_CoherentStokes.timeIntegrationFactor"), + ("OLAP.Stokes.integrationSteps", "OLAP.CNProc_IncoherentStokes.timeIntegrationFactor"), + + ("OLAP.Stokes.channelsPerSubband", "OLAP.CNProc_CoherentStokes.channelsPerSubband"), + ("OLAP.Stokes.channelsPerSubband", "OLAP.CNProc_IncoherentStokes.channelsPerSubband"), + + ("OLAP.PencilInfo.ringSize", "Observation.Beam[0].TabRingSize"), + ("OLAP.PencilInfo.nrRings", "Observation.Beam[0].nrTabRings"), + ] + + for (k,v) in convertmap: + if k in self and v not in self: self[v] = self[k] + # convert pencil beams (assign them to station beam 0) + pbkeys = [ "angle1", "angle2", "directionType", "dispersionMeasure" ] + pbdefaults = { + "angle1": 0.0, + "angle2": 0.0, + "directionType": "J2000", + "dispersionMeasure": 0, + } + + old_prefix = "OLAP.Pencil" + new_prefix = "Observation.Beam[0].TiedArrayBeam" + new_nr = "Observation.Beam[0].nrTiedArrayBeams" + + for b in count(): + if "%s[%s].%s" % (old_prefix,b,pbkeys[0]) not in self: + break + + for k in pbkeys: + old_key = "%s[%s].%s" % (old_prefix,b,k) + new_key = "%s[%s].%s" % (new_prefix,b,k) + + if old_key in self and new_key not in self: + self[new_key] = self[old_key] + else: + self[new_key] = pbdefaults[k] + + self.setdefault(new_nr,b) + if "Observation.MSNameMask" in self: # assume: /data1...mount point/subdir in mount point/filename msnamemask = self["Observation.MSNameMask"] # for example, /data1/L${YEAR}_${MSNUMBER}/SB${SUBBAND}.MS @@ -208,8 +293,8 @@ class Parset(util.Parset.Parset): # remove all pencil beams if fly's eye is specified if self.getBool("OLAP.PencilInfo.flysEye"): - self["OLAP.nrPencils"] = 0 - self["OLAP.PencilInfo.nrRings"] = 0 + self["Observation.Beam[0].nrTiedArrayBeams"] = 0 + self["Observation.Beam[0].nrTabRings"] = 0 # SAS cannot omit keys, so assume that empty keys means 'use default' delIfEmpty( "OLAP.CNProc.phaseOnePsets" ) @@ -371,8 +456,7 @@ class Parset(util.Parset.Parset): nrBeamFiles = self.getNrBeamFiles() cores = self.getInt32Vector("OLAP.CNProc.usedCoresInPset") - # set and resolve storage hostnames - # sort them since mpirun will as well, messing with our indexing schemes! + # set storage hostnames self["OLAP.Storage.hosts"] = self.storagenodes[:] self.setdefault('OLAP.nrPsets', nrPsets) @@ -457,10 +541,10 @@ class Parset(util.Parset.Parset): # python iterates over last 'for' first! # this is the order generated by the IO nodes (see IONProc/src/Job.cc) - paths = [ self.parseMask( mask, beam = b, stokes = s, file = f ) for b in xrange(self.getNrBeams( True )) for s in xrange(self.getNrStokes()) for f in xrange(self.getNrPartsPerStokes()) ] + paths = [ self.parseMask( mask, beam = b, stokes = s, file = f ) for b in xrange(self.getNrBeams( True )) for s in xrange(self.getNrCoherentStokes()) for f in xrange(self.getNrPartsPerStokes()) ] filenames = map( os.path.basename, paths ) dirnames = map( os.path.dirname, paths ) - locations = [ "%s:%s/" % (self.storagenodes[nodelist[i]], dirnames[i]) for i in xrange(self.getNrPartsPerStokes() * self.getNrStokes() * self.getNrBeams( True )) ] + locations = [ "%s:%s/" % (self.storagenodes[nodelist[i]], dirnames[i]) for i in xrange(self.getNrPartsPerStokes() * self.getNrCoherentStokes() * self.getNrBeams( True )) ] self.setdefault( locationkey, locations ) self.setdefault( filenameskey, filenames ) @@ -491,7 +575,12 @@ class Parset(util.Parset.Parset): return x - cnIntegrationSteps = max(1, roundToList(nrSamplesPerSecond * cnIntegrationTime, [16, int(self["OLAP.Stokes.integrationSteps"])])) + cnIntegrationSteps = max(1, roundToList(nrSamplesPerSecond * cnIntegrationTime, [ + 16, + int(self["OLAP.CNProc_CoherentStokes.timeIntegrationFactor"]), + int(self["OLAP.CNProc_IncoherentStokes.timeIntegrationFactor"]), + ])) + self.setdefault('OLAP.CNProc.integrationSteps', cnIntegrationSteps) def setStations(self,stations): @@ -611,13 +700,7 @@ class Parset(util.Parset.Parset): if considerFlysEye and self.getBool("OLAP.PencilInfo.flysEye"): return self.getNrMergedStations() - rings = int( self["OLAP.PencilInfo.nrRings"] ) - manual = int( self["OLAP.nrPencils"] ) - - if rings == 0: - return manual - else: - return 1 + 3 * rings * (rings + 1) + manual + return self["Observation.Beam[0].nrTiedArrayBeams"] def getNrMergedStations( self ): tabList = self["OLAP.CNProc.tabList"] @@ -627,9 +710,11 @@ class Parset(util.Parset.Parset): return max(tabList) + 1 - def getNrStokes( self ): + def getNrCoherentStokes( self ): if self.getBool("Observation.DataProducts.Output_Beamformed.enabled") or self.getBool("Observation.DataProducts.Output_Trigger.enabled"): return 2 + elif self.getBool("Observation.DataProducts.Output_CoherentStokes.enabled"): + return len(self["OLAP.CNProc_CoherentStokes.which"]) elif self.getBool("Observation.DataProducts.Output_CoherentStokes.enabled"): return len(self["OLAP.Stokes.which"]) else: @@ -640,7 +725,7 @@ class Parset(util.Parset.Parset): def getNrBeamFiles( self ): nrPartsPerStokes = self.getNrPartsPerStokes() - nrStokes = self.getNrStokes() + nrStokes = self.getNrCoherentStokes() nrBeams = self.getNrBeams( True ) return nrBeams * nrStokes * nrPartsPerStokes @@ -737,7 +822,8 @@ class Parset(util.Parset.Parset): # beamforming needs a multiple of 16 samples assert int(self["OLAP.CNProc.integrationSteps"]) % 16 == 0, "OLAP.CNProc.integrationSteps should be dividable by 16" - assert int(self["OLAP.CNProc.integrationSteps"]) % int(self["OLAP.Stokes.integrationSteps"]) == 0, "OLAP.CNProc.integrationSteps should be dividable by OLAP.Stokes.integrationSteps" + assert int(self["OLAP.CNProc.integrationSteps"]) % int(self["OLAP.CNProc_CoherentStokes.timeIntegrationFactor"]) == 0, "OLAP.CNProc.integrationSteps should be dividable by OLAP.CNProc_CoherentStokes.timeIntegrationFactor" + assert int(self["OLAP.CNProc.integrationSteps"]) % int(self["OLAP.CNProc_IncoherentStokes.timeIntegrationFactor"]) == 0, "OLAP.CNProc.integrationSteps should be dividable by OLAP.CNProc_IncoherentStokes.timeIntegrationFactor" # create at least 1 beam assert self.getNrBeams( True ) > 0, "Beam forming requested, but no beams defined. Add at least one beam, or enable fly's eye mode." @@ -745,8 +831,11 @@ class Parset(util.Parset.Parset): if self.getBool("Observation.DataProducts.Output_CoherentStokes.enabled"): assert int(self["OLAP.CNProc.integrationSteps"]) >= 4, "OLAP.CNProc.integrationSteps should be at least 4 if coherent stokes are requested" - assert int(self["OLAP.Stokes.channelsPerSubband"]) <= int(self["Observation.channelsPerSubband"]), "Stokes should have the same number or fewer channels than specified for the full observation." - assert int(self["Observation.channelsPerSubband"]) % int(self["OLAP.Stokes.channelsPerSubband"]) == 0, "Stokes channels should be a whole fraction of the total number of channels." + assert int(self["OLAP.CNProc_CoherentStokes.channelsPerSubband"]) <= int(self["Observation.channelsPerSubband"]), "Coherent Stokes should have the same number or fewer channels than specified for the full observation." + assert int(self["Observation.channelsPerSubband"]) % int(self["OLAP.CNProc_CoherentStokes.channelsPerSubband"]) == 0, "Coherent Stokes channels should be a whole fraction of the total number of channels." + + assert int(self["OLAP.CNProc_IncoherentStokes.channelsPerSubband"]) <= int(self["Observation.channelsPerSubband"]), "Incoherent Stokes should have the same number or fewer channels than specified for the full observation." + assert int(self["Observation.channelsPerSubband"]) % int(self["OLAP.CNProc_IncoherentStokes.channelsPerSubband"]) == 0, "Incoherent Stokes channels should be a whole fraction of the total number of channels." # verify start/stop times assert self["Observation.startTime"] < self["Observation.stopTime"], "Start time (%s) must be before stop time (%s)" % (self["Observation.startTime"],self["Observation.stopTime"])