diff --git a/SAS/XML_generator/src/xmlgen.py b/SAS/XML_generator/src/xmlgen.py index eaefe5200ea9657a491588ca54a4660790cf5eec..c7847ca17b3cc5701185012402de815b8365ff3c 100755 --- a/SAS/XML_generator/src/xmlgen.py +++ b/SAS/XML_generator/src/xmlgen.py @@ -35,6 +35,7 @@ from os.path import splitext from datetime import datetime,timedelta from math import pi import re +import json CLOCK_MODES = ['160 MHz','200 MHz'] INSTRUMENT_FILTERS = ["10-70 MHz", "30-70 MHz", "10-90 MHz", "30-90 MHz", "110-190 MHz", "170-230 MHz", "210-250 MHz"] @@ -425,6 +426,16 @@ def writeTABXML(TAB): strVal = strVal.rstrip() # strip off the last newline return strVal +def writeMiscParameters(ofile, miscParameters): + """ + :param ofile: the xml-string mess so far + :param miscParameters: A dict(!) with the parameters (not some magically interpreted list like elsewhere) + :return: + """ + if miscParameters is not None and len(miscParameters) > 0: + j = json.dumps(miscParameters) + print >> ofile, r"""<misc>%s</misc>""" % j + def writeBBSParameters(ofile, bbsParameters): print >> ofile, r""" <bbsParameters> <baselines>%s</baselines> @@ -550,7 +561,8 @@ def writeXMLCalPipe(ofile, topo, pred_topo, name, descr, defaulttemplate, flaggi </item>""" % (uvintopo, instroutname, instrouttopo, stor_cluster, uvouttopo, uvouttopo, stor_cluster) def writeXMLAvgPipeline(ofile, topo, pred_topo, name, descr, defaulttemplate, flagging, duration, - demixParameters, uvintopo, uvouttopo, storageCluster, status, nr_tasks, nr_cores_per_task): + demixParameters, uvintopo, uvouttopo, storageCluster, status, nr_tasks, nr_cores_per_task, + miscParameters): stor_cluster = dataProductCluster(storageCluster) proc_cluster = processingCluster(storageCluster, nr_tasks, nr_cores_per_task) print >> ofile, r""" <item index="0"> @@ -569,6 +581,7 @@ def writeXMLAvgPipeline(ofile, topo, pred_topo, name, descr, defaulttemplate, fl <flaggingStrategy>%s</flaggingStrategy> <duration>%s</duration>""" % (defaulttemplate, flagging, duration) writeDemixParameters(ofile, demixParameters) + writeMiscParameters(ofile, miscParameters) print >> ofile, r"""</pipelineAttributes> <usedDataProducts> <item> @@ -593,7 +606,7 @@ def writeXMLPulsarPipe(ofile, topo, pred_topo, name, descr, defaulttemplate, dur storageCluster, status, nr_tasks, nr_cores_per_task, _2bf2fitsExtraOpts, _8bitConversionSigma, decodeNblocks, decodeSigma, digifilExtraOpts, dspsrExtraOpts, dynamicSpectrumTimeAverage, nofold, nopdmp, norfi, prepdataExtraOpts, prepfoldExtraOpts, prepsubbandExtraOpts, pulsar, rawTo8bit, rfifindExtraOpts, rrats, singlePulse, - skipDsps, skipDynamicSpectrum, skipPrepfold, tsubint): + skipDsps, skipDynamicSpectrum, skipPrepfold, tsubint, miscParameters): stor_cluster = dataProductCluster(storageCluster) proc_cluster = processingCluster(storageCluster, nr_tasks, nr_cores_per_task) print >> ofile, r""" <item index="0"> @@ -631,7 +644,14 @@ def writeXMLPulsarPipe(ofile, topo, pred_topo, name, descr, defaulttemplate, dur <skipDsps>%s</skipDsps> <skipDynamicSpectrum>%s</skipDynamicSpectrum> <skipPrepfold>%s</skipPrepfold> - <tsubint>%s</tsubint> + <tsubint>%s</tsubint>""" % (defaulttemplate, duration, _2bf2fitsExtraOpts, _8bitConversionSigma, + decodeNblocks, decodeSigma, digifilExtraOpts, dspsrExtraOpts, dynamicSpectrumTimeAverage, + writeBoolean(nofold), writeBoolean(nopdmp), writeBoolean(norfi), + prepdataExtraOpts, prepfoldExtraOpts, prepsubbandExtraOpts, pulsar, writeBoolean(rawTo8bit), + rfifindExtraOpts, writeBoolean(rrats), writeBoolean(singlePulse), + writeBoolean(skipDsps), writeBoolean(skipDynamicSpectrum), writeBoolean(skipPrepfold), tsubint) + writeMiscParameters(ofile, miscParameters) + print >> ofile, r""" </pipelineAttributes> <usedDataProducts> <item> @@ -650,13 +670,7 @@ def writeXMLPulsarPipe(ofile, topo, pred_topo, name, descr, defaulttemplate, dur </item> </resultDataProducts> </lofar:pipeline> - </item>""" % (defaulttemplate, duration, _2bf2fitsExtraOpts, _8bitConversionSigma, - decodeNblocks, decodeSigma, digifilExtraOpts, dspsrExtraOpts, dynamicSpectrumTimeAverage, - writeBoolean(nofold), writeBoolean(nopdmp), writeBoolean(norfi), - prepdataExtraOpts, prepfoldExtraOpts, prepsubbandExtraOpts, pulsar, writeBoolean(rawTo8bit), - rfifindExtraOpts, writeBoolean(rrats), writeBoolean(singlePulse), - writeBoolean(skipDsps), writeBoolean(skipDynamicSpectrum), writeBoolean(skipPrepfold), tsubint, - bfintopo, pouttopo, pouttopo, stor_cluster) + </item>""" % (bfintopo, pouttopo, pouttopo, stor_cluster) #nv 13okt2014: #6716 - Implement Long Baseline Pipeline def writeXMLLongBaselinePipe(ofile, topo, pred_topo, name, descr, defaulttemplate, duration, @@ -816,7 +830,8 @@ def writeMainFolderEnd(ofile): </lofar:folder> </item>""" -def writeImagingPipelineXML(ofile, input_list, bbsParameters, storageCluster, status, nr_tasks, nr_cores_per_task): +def writeImagingPipelineXML(ofile, input_list, bbsParameters, storageCluster, status, nr_tasks, nr_cores_per_task, + miscParameters): proc_cluster = processingCluster(storageCluster, nr_tasks, nr_cores_per_task) print >> ofile, r"""<item index="0"> <lofar:pipeline xsi:type="lofar:%(imaging_pipe_type)s"> @@ -848,6 +863,7 @@ def writeImagingPipelineXML(ofile, input_list, bbsParameters, storageCluster, st </imagingParameters>""" % (input_list) if bbsParameters: writeBBSParameters(ofile, bbsParameters) + writeMiscParameters(ofile, miscParameters) print >> ofile, r""" </imagingPipelineAttributes>""" @@ -1480,7 +1496,8 @@ def readBlock(lines, projectName, blockNr): "globalTABrings": [], "coherentStokesData": False, "flysEye": False, - "numberOfBitsPerSample": 0} + "numberOfBitsPerSample": 0, + "storagemanager": "dysco"} for lineNr, cline in enumerate(lines): if "=" in cline and not cline.startswith(('BBS','Demix','Pulsar')): #we skip beam and pipelines lines @@ -1640,6 +1657,8 @@ def readBlock(lines, projectName, blockNr): print "number of nodes found, converted to number of tasks = %i, number of cores per task = %i" % (s["nr_tasks"], s["nr_cores_per_task"]) except: raise GenException("the number of nodes parameter is not valid for BLOCK: %i" % blockNr) + elif key == "storagemanager": + s["storagemanager"] = value else: raise GenException("unknown key:'%s' in BLOCK: %i" % (key, blockNr)) return s ##settings @@ -1745,7 +1764,7 @@ def checkSettings(settings, blockNr): def writeImagingPipeline(ofile, nr_beams, targetBeams, blockTopo, nrRepeats, imaging_pipe_inputs, imaging_pipe_predecessors, writePackageTag, packageTag, - nrImages, imagingPipelineSettings, imagingBBS, cluster, status, nr_tasks, nr_cores_per_task): + nrImages, imagingPipelineSettings, imagingBBS, cluster, status, nr_tasks, nr_cores_per_task, miscParameters): for key,val in imagingPipelineSettings.items(): #TODO somewhat dirty hack, to be solved better later. exec(key + '=val') for beamNr in range (0, nr_beams): @@ -1767,8 +1786,9 @@ def writeImagingPipeline(ofile, nr_beams, targetBeams, blockTopo, nrRepeats, "imaging_pipe_predecessors_string":imaging_pipe_predecessors_string, "imaging_pipe_name":imaging_pipe_name, "beamNr":beamNr, "nrImages":nrImages[beamNr], "nrRepeats":nrRepeats, "initial_status": status} - - writeImagingPipelineXML(ofile, merge_dicts(temp, imagingPipelineSettings), imagingBBS, cluster, status, nr_tasks, nr_cores_per_task) + + writeImagingPipelineXML(ofile, merge_dicts(temp, imagingPipelineSettings), imagingBBS, cluster, status, + nr_tasks, nr_cores_per_task, miscParameters) writeImagingPipelineInputDataproducts(ofile, imaging_pipe_inputs[beamNr]) writeSkyImageOutputDataproduct(ofile, imaging_pipe_output_topology, cluster) @@ -1784,7 +1804,8 @@ def determineBfDataExtension(coherentStokesData, incoherentStokesData): return bfDataExtension def writeRepeat(ofile, projectName, blockTopo, repeatNr, settings, imaging_pipe_inputs, - imaging_pipe_predecessors, status, nr_tasks, nr_cores_per_task): + imaging_pipe_predecessors, status, nr_tasks, nr_cores_per_task, miscParameters): + for key,val in settings.items(): #TODO somewhat dirty hack, to be solved better later. exec(key + '=val') repeatTopo = blockTopo + str(repeatNr) @@ -1960,11 +1981,12 @@ def writeRepeat(ofile, projectName, blockTopo, repeatNr, settings, imaging_pipe_ else: cal_pipe_calibrator_topology_tmp = cal_pipe_calibrator_topology cal_pipe_name_tmp = cal_pipe_name + writeXMLAvgPipeline(ofile, cal_pipe_calibrator_topology_tmp, cal_obs_topology, cal_pipe_name_tmp, cal_pipe_calibrator_description, cal_obs_pipe_default_template, flaggingStrategy, calibratorBeam[8], calibratorDemix[i], cal_obs_beam0_topology + '.uv.dps', cal_pipe_calibrator_topology_tmp + '.uv.dps', - cluster, status, nr_tasks, nr_cores_per_task) + cluster, status, nr_tasks, nr_cores_per_task, miscParameters) elif processing == 'Calibration': @@ -2057,11 +2079,12 @@ def writeRepeat(ofile, projectName, blockTopo, repeatNr, settings, imaging_pipe_ else: cal_pipe_target_topology_tmp = cal_pipe_target_topology cal_pipe_target_name_tmp = cal_pipe_target_name + writeXMLAvgPipeline(ofile, cal_pipe_target_topology_tmp, tar_obs_topology, cal_pipe_target_name_tmp, cal_pipe_target_description, cal_tar_pipe_default_template, flaggingStrategy, calibratorBeam[8], calibratorDemix[i], tar_obs_uv_data_topologies[nr_beams], - cal_pipe_target_topology_tmp + '.uv.dps', cluster, status, nr_tasks, nr_cores_per_task) + cal_pipe_target_topology_tmp + '.uv.dps', cluster, status, nr_tasks, nr_cores_per_task, miscParameters) elif processing == 'Calibration': @@ -2158,7 +2181,7 @@ def writeRepeat(ofile, projectName, blockTopo, repeatNr, settings, imaging_pipe_ tar_pipe_name_tmp, tar_pipe_description, tar_pipe_default_template, flaggingStrategy, targetBeams[beamNr][8], targetDemix[beamNr][i], tar_obs_uv_data_topologies[beamNr], tar_pipe_topology_output_MS_tmp, - cluster, status, nr_tasks, nr_cores_per_task) + cluster, status, nr_tasks, nr_cores_per_task, miscParameters) elif processing == 'Calibration': #TODO currently doesn't work according to Alwin's wiki, why? if targetBBS[beamNr][0][0] == '': @@ -2174,7 +2197,7 @@ def writeRepeat(ofile, projectName, blockTopo, repeatNr, settings, imaging_pipe_ elif processing == 'Pulsar': #tar_obs_topology_MultiObs = tar_obs_topology + '.' + str(beamNr) tar_pipe_predecessor = tar_obs_topology - + writeXMLPulsarPipe(ofile, tar_pipe_topologies[beamNr], tar_obs_topology, tar_pipe_name, tar_pipe_description, tar_pipe_default_template, targetBeams[beamNr][8], tar_obs_bf_data_topologies[beamNr], @@ -2200,7 +2223,8 @@ def writeRepeat(ofile, projectName, blockTopo, repeatNr, settings, imaging_pipe_ dynamicSpectrumTimeAverage = targetPulsar[beamNr][0][18], skipDynamicSpectrum = targetPulsar[beamNr][0][19], skipPrepfold = targetPulsar[beamNr][0][20], - digifilExtraOpts = targetPulsar[beamNr][0][21]) + digifilExtraOpts = targetPulsar[beamNr][0][21], + miscParameters = miscParameters) # for long baseline processsing an additional (special purpose adapted) preprocessing pipeline is necessary if processing == 'LongBaseline': @@ -2224,7 +2248,7 @@ def writeRepeat(ofile, projectName, blockTopo, repeatNr, settings, imaging_pipe_ LB_preproc_pipe_description, LB_preproc_pipe_template, flaggingStrategy, targetBeams[beamNr][8], targetDemix[beamNr][0], tar_pipe_topologies[beamNr] + ".uv.dps", LB_preproc_pipe_output_MS_topologies[beamNr], - cluster, status, nr_tasks, nr_cores_per_task) + cluster, status, nr_tasks, nr_cores_per_task, miscParameters) #nv 13okt2014: #6716 - Implement Long Baseline Pipeline writeXMLLongBaselinePipe(ofile, LB_pipeline_topologies[beamNr], @@ -2270,11 +2294,16 @@ def writeBlock(ofile, settings, projectName, blockNr, status): imaging_pipe_inputs = [[] for i in range(settings["nr_beams"])] imaging_pipe_predecessors = [[] for i in range(settings["nr_beams"])] - + + miscParametersKeys = ["storagemanager"] + miscParameters = {key: value for (key, value) in settings.iteritems() if key in miscParametersKeys} + print "############### MISC ", miscParameters + blockTopo = "B%i." % (blockNr-1,) for repeatNr in range (1, settings["nrRepeats"]+1): imaging_pipe_inputs, imaging_pipe_predecessors, settings["startTimeObs"] = writeRepeat(ofile, - projectName, blockTopo, repeatNr, settings, imaging_pipe_inputs, imaging_pipe_predecessors, status, nr_tasks, nr_cores_per_task) + projectName, blockTopo, repeatNr, settings, imaging_pipe_inputs, imaging_pipe_predecessors, + status, nr_tasks, nr_cores_per_task, miscParameters) if settings["do_imaging"]: imagingPipelineKeys = ["imaging_pipe_type", "imaging_pipe_default_template", "imaging_pipe_duration", @@ -2291,7 +2320,7 @@ def writeBlock(ofile, settings, projectName, blockNr, status): writeImagingPipeline(ofile, settings["nr_beams"], settings["targetBeams"], blockTopo, settings["nrRepeats"], imaging_pipe_inputs, imaging_pipe_predecessors, settings["writePackageTag"], settings["packageTag"], settings["nrImages"], - imagingPipelineSettings, settings["imagingBBS"], settings["cluster"], status, nr_tasks, nr_cores_per_task) + imagingPipelineSettings, settings["imagingBBS"], settings["cluster"], status, nr_tasks, nr_cores_per_task, miscParameters) writeFolderEnd(ofile) diff --git a/SAS/XML_generator/test/test_regression.py b/SAS/XML_generator/test/test_regression.py index 427789a190b8346dc5e383e31119286074773e40..f55eed5bb7225419c72e12a768e709c45b7725cc 100755 --- a/SAS/XML_generator/test/test_regression.py +++ b/SAS/XML_generator/test/test_regression.py @@ -15,7 +15,7 @@ def checkDiff(diff): return True return False -def main(): +def main(verbose_tests=False): os.chdir('test_regression.in_data') infiles = os.listdir("txt") results = [] @@ -28,7 +28,11 @@ def main(): print "*** Processing %s ***" % infile cmd = ["xmlgen", "-i", "./txt/%s" % infile, "-o", "test.xml"] p = subprocess.Popen(cmd, stdin=open('/dev/null'), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logs = p.communicate()[0].splitlines() #stdout + out, err = p.communicate() + if verbose_tests: + print out + print err + logs = out.splitlines() #stdout print "xmlgen ran with return code: %s" % p.returncode xmlgen = p.returncode if p.returncode: @@ -68,5 +72,7 @@ def main(): print "failure" return 1 + +# todo: add test for misc field, make sure boolean gets jsonified. if __name__ == "__main__": sys.exit(main())