Skip to content
Snippets Groups Projects
Commit b1ff34ad authored by Stefan Froehlich's avatar Stefan Froehlich
Browse files

Task #6559 second iteration of generic pipeline stuff. special case for...

Task #6559 second iteration of generic pipeline stuff. special case for vdsmaker atm. all inputs should be reduced to dicts in next phase. more cleanup and structure needed as well. for now mss_calibration pipeline is looking good and gets to step5.
parent 3db6ad00
No related branches found
No related tags found
No related merge requests found
...@@ -29,24 +29,24 @@ class GenericPipeline(control): ...@@ -29,24 +29,24 @@ class GenericPipeline(control):
"NYI" "NYI"
return 1 return 1
#def go(self): def go(self):
#""" #"""
#Read the parset-file that was given as input argument, and set the #Read the parset-file that was given as input argument, and set the
#jobname before calling the base-class's `go()` method. #jobname before calling the base-class's `go()` method.
#""" #"""
#try: try:
# parset_file = os.path.abspath(self.inputs['args'][0]) parset_file = os.path.abspath(self.inputs['args'][0])
#except IndexError: except IndexError:
# return self.usage() return self.usage()
# Set job-name to basename of parset-file w/o extension, if it's not # Set job-name to basename of parset-file w/o extension, if it's not
# set on the command-line with '-j' or '--job-name' # set on the command-line with '-j' or '--job-name'
#if not 'job_name' in self.inputs: if not 'job_name' in self.inputs:
# self.inputs['job_name'] = ( self.inputs['job_name'] = (
# os.path.splitext(os.path.basename(parset_file))[0]) os.path.splitext(os.path.basename(parset_file))[0])
# Call the base-class's `go()` method. # Call the base-class's `go()` method.
#return super(GenericPipeline, self).go() return super(GenericPipeline, self).go()
def pipeline_logic(self): def pipeline_logic(self):
try: try:
...@@ -80,33 +80,42 @@ class GenericPipeline(control): ...@@ -80,33 +80,42 @@ class GenericPipeline(control):
create_directory(parset_dir) create_directory(parset_dir)
create_directory(mapfile_dir) create_directory(mapfile_dir)
steplist = [] stepparsetlist = []
for step in py_parset.getStringVector('steps'): stepnamelist = py_parset.getStringVector('steps')
steplist.append(self.parset.makeSubset(self.parset.fullModuleName(str(step)) + '.'))
for step in stepnamelist:
stepparsetlist.append(self.parset.makeSubset(self.parset.fullModuleName(str(step)) + '.'))
print step print step
stepcontrols = [] stepcontrollist = []
stepparsets = [] stepargparsetlist = {}
step_parset_files = [] step_parset_files = {}
mapfiles = {} mapfiles = {}
for step in steplist:
stepcontrols.append(step.makeSubset(step.fullModuleName('control') + '.')) for step, stepname in zip(stepparsetlist, stepnamelist):
stepparsets.append(step.makeSubset(step.fullModuleName('args') + '.')) stepcontrollist.append(step.makeSubset(step.fullModuleName('control') + '.'))
print stepcontrols[-1].getString('typename') if step.fullModuleName('parsetarg'):
print stepparsets[-1].getString('start') stepargparsetlist[stepname]=step.makeSubset(step.fullModuleName('parsetarg') + '.')
else:
print "no parset given"
#save parsets #save parsets
for ps, stepname in zip(stepparsets, py_parset.getStringVector('steps')): for k, v in stepargparsetlist.iteritems():
step_parset = os.path.join(parset_dir, stepname + '.parset') try:
step_parset_files.append(step_parset) step_parset = v.getString('parset')
ps.writeFile(step_parset) except:
step_parset = os.path.join(parset_dir, k + '.parset')
v.writeFile(step_parset)
step_parset_files[k]=step_parset
testmapfile = self._create_mapfile_from_folder('/home/zam/sfroehli/testpipeline/data') testmapfile = self._create_mapfile_from_folder('/home/zam/sfroehli/testpipeline/data/MS')
testmapfile_name = os.path.join(mapfile_dir, 'measurements.mapfile') testmapfile_name = os.path.join(mapfile_dir, 'measurements.mapfile')
testmapfile.save(testmapfile_name) testmapfile.save(testmapfile_name)
mapfiles['input'] = testmapfile_name mapfiles['input'] = testmapfile_name
print testmapfile print testmapfile
for step, stepname in zip(stepcontrols, py_parset.getStringVector('steps')): print 'parset HERE: ',step_parset_files
resultdicts = {}#{'first': {'mapfile': 'dummy'}}
for step, stepname in zip(stepcontrollist, stepnamelist):
# common # common
try: try:
input_mapfile = mapfiles[step.getString('mapfilefromstep')] input_mapfile = mapfiles[step.getString('mapfilefromstep')]
...@@ -114,21 +123,51 @@ class GenericPipeline(control): ...@@ -114,21 +123,51 @@ class GenericPipeline(control):
input_mapfile = mapfiles.values()[-1] # input mapfile: last in list. added from last step. input_mapfile = mapfiles.values()[-1] # input mapfile: last in list. added from last step.
# recipes # recipes
if step.getString('type') == 'recipe': if step.getString('kind') == 'recipe':
#inputdict = {"mapfile": os.path.join(mapfile_dir, stepname + '.mapfile')}
inputdict={}
if stepname in step_parset_files:
print 'ADDED PARSET'
inputdict['parset']=step_parset_files[stepname]
self._construct_input(inputdict, step, resultdicts)
with duration(self, stepname): with duration(self, stepname):
resultdict = self.run_task( if stepname is 'vdsreader':
step.getString('typename'), resultdict = self.run_task(
step.getString('type'),
**inputdict
)
else:
resultdict = self.run_task(
step.getString('type'),
input_mapfile, input_mapfile,
parset=os.path.join(parset_dir, stepname + '.parset'), # input **inputdict
mapfile=os.path.join(mapfile_dir, stepname + '.mapfile') # mapfile outputname )
)
if 'mapfile' in resultdict: if 'mapfile' in resultdict:
mapfiles[stepname] = resultdict['mapfile'] mapfiles[stepname] = resultdict['mapfile']
resultdicts[stepname] = resultdict
# plugins # plugins
if step.getString('type') == 'plugin': if step.getString('kind') == 'plugin':
loader.call_plugin(step.getString('typename'), py_parset.getString('pluginpath'), loader.call_plugin(step.getString('type'), py_parset.getString('pluginpath'),
step.getString('typename'), input_mapfile) step.getString('type'), input_mapfile)
def _construct_input(self, indict, controlparset, resdicts):
argsparset = controlparset.makeSubset(controlparset.fullModuleName('args') + '.')
for k in argsparset.keys():
print 'THIS IS K: ',k
if argsparset.getString(k).__contains__('.output.'):
step, outvar = argsparset.getString(k).split('.output.')
print "FOUND RESULTREQUEST FROM %s FOR VALUE %s: %s" % (step, outvar, resdicts[step][outvar])
else:
indict[k] = argsparset.getString(k)
return indict
def _construct_step_parsets(self, steplistparsets):
parsetdict = {}
return parsetdict
def _create_mapfile_from_folder(self, folder): def _create_mapfile_from_folder(self, folder):
#here comes the creation of a data mapfile (what MS lays where) #here comes the creation of a data mapfile (what MS lays where)
......
...@@ -33,6 +33,9 @@ class executable_parsetonly(BaseRecipe, RemoteCommandRecipeMixIn): ...@@ -33,6 +33,9 @@ class executable_parsetonly(BaseRecipe, RemoteCommandRecipeMixIn):
""" """
inputs = { inputs = {
'kind': ingredient.StringField('--kind'),
'type': ingredient.StringField('--type'),
#'init': ingredient.StringField('--init'),
'parset': ingredient.FileField( 'parset': ingredient.FileField(
'-p', '--parset', '-p', '--parset',
help="The full path to a configuration parset. The ``msin`` " help="The full path to a configuration parset. The ``msin`` "
...@@ -63,6 +66,7 @@ class executable_parsetonly(BaseRecipe, RemoteCommandRecipeMixIn): ...@@ -63,6 +66,7 @@ class executable_parsetonly(BaseRecipe, RemoteCommandRecipeMixIn):
# 1. load input data file, validate output vs the input location if # 1. load input data file, validate output vs the input location if
# output locations are provided # output locations are provided
args = self.inputs['args'] args = self.inputs['args']
#print "FUCK: ",args
self.logger.debug("Loading input-data mapfile: %s" % args[0]) self.logger.debug("Loading input-data mapfile: %s" % args[0])
indata = DataMap.load(args[0]) indata = DataMap.load(args[0])
if len(args) > 1: if len(args) > 1:
......
...@@ -63,8 +63,8 @@ class executable_parsetonly(LOFARnodeTCP): ...@@ -63,8 +63,8 @@ class executable_parsetonly(LOFARnodeTCP):
try: try:
parset = Parset() parset = Parset()
parset.adoptFile(parsetfile) parset.adoptFile(parsetfile)
if parset['exec']: #if parset['exec']:
executable = str(parset['exec']) # executable = str(parset['exec'])
print 'Running executable %s from parset args' % executable print 'Running executable %s from parset args' % executable
except: except:
print 'could not load executable from parset args' print 'could not load executable from parset args'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment