Commit c60c5d23 authored by Stefan Froehlich's avatar Stefan Froehlich

Task #6559: Python plugins as node scripts. They can return a dict that gets saved by the master script as a mapfile and can be used as input for later steps.
parent e74d03b6
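For illustration, a minimal sketch of what such a plugin could look like. The file name, argument handling, and returned path are made-up examples, not part of this commit: the node script loads the module with imp.load_source() and calls its main(); the master recipe then saves every entry of the returned dict as a '<key>.mapfile' that later steps can use as input.

# example_plugin.py (hypothetical)
def main(*args, **kwargs):
    # ... process the input data referenced by args/kwargs here ...
    # Every key of the returned dict becomes a mapfile on the master side.
    return {'cleaned': '/data/scratch/example/cleaned.ms'}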
.gitattributes
@@ -1516,6 +1516,8 @@ CEP/Pipeline/recipes/sip/nodes/imager_finalize.py -text
CEP/Pipeline/recipes/sip/nodes/imager_prepare.py eol=lf
CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py eol=lf
CEP/Pipeline/recipes/sip/nodes/new_bbs.py eol=lf
+CEP/Pipeline/recipes/sip/nodes/python_plugin.py -text
+CEP/Pipeline/recipes/sip/nodes/python_plugin_loader.py -text
CEP/Pipeline/recipes/sip/nodes/rficonsole.py eol=lf
CEP/Pipeline/recipes/sip/nodes/setupparmdb.py eol=lf
CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py eol=lf
@@ -8,18 +8,31 @@ from lofarpipe.support.loggingdecorators import duration
from lofarpipe.support.data_map import DataMap, DataProduct, validate_data_maps
from lofarpipe.support.lofarexceptions import PipelineException
from lofarpipe.support.utilities import create_directory
+import logging
+from lofarpipe.support.pipelinelogging import getSearchingLogger
import lofarpipe.support.lofaringredient as ingredient
+import loader
class GenericPipeline(control):
+inputs = {
+'loglevel': ingredient.StringField(
+'--loglevel',
+help="loglevel",
+default='INFO',
+optional=True
+)
+}
def __init__(self):
control.__init__(self)
self.parset = Parset()
self.input_data = {}
self.output_data = {}
self.parset_feedback_file = None
+#self.logger = None  # logging.RootLogger('DEBUG')
+self.name = ''
def usage(self):
"""
@@ -45,7 +58,12 @@ class GenericPipeline(control):
if not 'job_name' in self.inputs:
self.inputs['job_name'] = (
os.path.splitext(os.path.basename(parset_file))[0])
+self.name = self.inputs['job_name']
+try:
+self.logger
+except AttributeError:
+self.logger = getSearchingLogger(self.name)
+self.logger.setLevel(self.inputs['loglevel'])
# Call the base-class's `go()` method.
return super(GenericPipeline, self).go()
CEP/Pipeline/recipes/sip/master/executable_args.py
@@ -14,7 +14,7 @@ import lofarpipe.support.lofaringredient as ingredient
from lofarpipe.support.baserecipe import BaseRecipe
from lofarpipe.support.remotecommand import RemoteCommandRecipeMixIn
from lofarpipe.support.remotecommand import ComputeJob
-from lofarpipe.support.data_map import DataMap, validate_data_maps, align_data_maps
+from lofarpipe.support.data_map import DataMap, validate_data_maps, align_data_maps, DataProduct
from lofarpipe.support.parset import Parset
@@ -187,31 +187,41 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
# try loading input/output data file, validate output vs the input location if
# output locations are provided
try:
-indatas = []
+inputmapfiles = []
if self.inputs['mapfile_in']:
indata = DataMap.load(self.inputs['mapfile_in'])
-indatas.append(indata)
+inputmapfiles.append(indata)
if self.inputs['mapfiles_in']:
for item in self.inputs['mapfiles_in']:
-indatas.append(DataMap.load(item))
+inputmapfiles.append(DataMap.load(item))
self.inputs['mapfile_in'] = self.inputs['mapfiles_in'][0]
#else:
-# indatas.append(indata)
+# inputmapfiles.append(indata)
except Exception:
self.logger.error('Could not load input Mapfile %s' % self.inputs['mapfile_in'])
return 1
+outputmapfiles = []
+prefix = os.path.join(self.inputs['working_directory'], self.inputs['job_name'])
if self.inputs['mapfile_out']:
try:
outdata = DataMap.load(self.inputs['mapfile_out'])
+outputmapfiles.append(outdata)
except Exception:
self.logger.error('Could not load output Mapfile %s' % self.inputs['mapfile_out'])
return 1
# sync skip fields in the mapfiles
-align_data_maps(indatas[0], outdata)
+align_data_maps(inputmapfiles[0], outputmapfiles[0])
+elif self.inputs['mapfiles_out']:
+for item in self.inputs['mapfiles_out']:
+outputmapfiles.append(DataMap.load(item))
+self.inputs['mapfile_out'] = self.inputs['mapfiles_out'][0]
else:
# ouput will be directed in the working directory if no output mapfile is specified
-outdata = copy.deepcopy(indatas[0])
+outdata = copy.deepcopy(inputmapfiles[0])
if not self.inputs['inplace']:
for item in outdata:
item.file = os.path.join(
@@ -220,28 +230,30 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
#os.path.basename(item.file) + '.' + os.path.split(str(executable))[1]
os.path.splitext(os.path.basename(item.file))[0] + '.' + self.inputs['stepname']
)
-self.inputs['mapfile_out'] = os.path.join(os.path.dirname(self.inputs['mapfile_in']), os.path.basename(executable) + '.' + 'mapfile')
+self.inputs['mapfile_out'] = os.path.join(prefix, self.inputs['stepname'] + '.' + 'mapfile')
+self.inputs['mapfiles_out'].append(self.inputs['mapfile_out'])
else:
self.inputs['mapfile_out'] = self.inputs['mapfile_in']
+self.inputs['mapfiles_out'].append(self.inputs['mapfile_out'])
+outputmapfiles.append(outdata)
-if not validate_data_maps(indatas[0], outdata):
+if not validate_data_maps(inputmapfiles[0], outputmapfiles[0]):
self.logger.error(
"Validation of data mapfiles failed!"
)
return 1
-# Handle multiple outputfiles
-outputsuffix = self.inputs['outputsuffixes']
-outputmapfiles = []
-prefix = os.path.join(self.inputs['working_directory'], self.inputs['job_name'])
-for name in outputsuffix:
-outputmapfiles.append(copy.deepcopy(indatas[0]))
-for item in outputmapfiles[-1]:
-item.file = os.path.join(
-prefix,
-#os.path.basename(item.file) + '.' + os.path.split(str(executable))[1] + '.' + name
-os.path.splitext(os.path.basename(item.file))[0] + '.' + self.inputs['stepname'] + name
-)
+if self.inputs['outputsuffixes']:
+# Handle multiple outputfiles
+for name in self.inputs['outputsuffixes']:
+outputmapfiles.append(copy.deepcopy(inputmapfiles[0]))
+self.inputs['mapfiles_out'].append(os.path.join(prefix, self.inputs['stepname'] + name + '.' + 'mapfile'))
+for item in outputmapfiles[-1]:
+item.file = os.path.join(
+prefix,
+os.path.splitext(os.path.basename(item.file))[0] + '.' + self.inputs['stepname'] + name
+)
+self.inputs['mapfile_out'] = self.inputs['mapfiles_out'][0]
# prepare arguments
arglist = self.inputs['arguments']
@@ -252,51 +264,74 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
for k in parset.keys:
parsetdict[k] = str(parset[k])
#for k in parset.keys:
# arglist.append('--' + k + '=' + parset.getString(k))
#if not self.inputs['inputkey'] and not self.inputs['skip_infile']:
# arglist.insert(0, None)
# construct multiple input data
inputlist = []
keylist = []
if not self.inputs['inputkeys'] and self.inputs['inputkey']:
self.inputs['inputkeys'].append(self.inputs['inputkey'])
-if indatas:
-for item in indatas:
-item.iterator = DataMap.SkipIterator
-for mfile in indatas:
-inputlist.append([])
-for inp in mfile:
-inputlist[-1].append(inp.file)
+if not self.inputs['outputkeys'] and self.inputs['outputkey']:
+self.inputs['outputkeys'].append(self.inputs['outputkey'])
+if len(self.inputs['inputkeys']) != len(inputmapfiles):
+self.logger.error("Number of input mapfiles %d and input keys %d have to match." %
+(len(inputmapfiles), len(self.inputs['inputkeys'])))
+return 1
+filedict = {}
+if self.inputs['inputkeys'] and not self.inputs['skip_infile']:
+for key, filemap in zip(self.inputs['inputkeys'], inputmapfiles):
+filedict[key] = []
+for inp in filemap:
+filedict[key].append(inp.file)
+if self.inputs['outputkey']:
+filedict[self.inputs['outputkey']] = []
+for item in outputmapfiles[0]:
+filedict[self.inputs['outputkey']].append(item.file)
+# if inputmapfiles and not self.inputs['skip_infile']:
+# for key in self.inputs['inputkeys']:
+# keylist.append(key)
+# for item in inputmapfiles:
+# item.iterator = DataMap.SkipIterator
+# for mfile in inputmapfiles:
+# inputlist.append([])
+# for inp in mfile:
+# inputlist[-1].append(inp.file)
+#
+# if self.inputs['outputkey']:
+# inputlist.append([])
+# keylist.append(self.inputs['outputkey'])
+# for item in outputmapfiles[0]:
+# inputlist[-1].append(item.file)
# ********************************************************************
# Call the node side of the recipe
# Create and schedule the compute jobs
command = "python %s" % (self.__file__.replace('master', 'nodes')).replace('executable_args', self.inputs['nodescript'])
-indatas[0].iterator = outdata.iterator = DataMap.SkipIterator
+inputmapfiles[0].iterator = outputmapfiles[0].iterator = DataMap.SkipIterator
jobs = []
for i, (outp, inp,) in enumerate(zip(
-outdata, indatas[0])
+outputmapfiles[0], inputmapfiles[0])
):
arglist_copy = copy.deepcopy(arglist)
parsetdict_copy = copy.deepcopy(parsetdict)
-if self.inputs['inputkeys'] and not self.inputs['skip_infile']:
-for name, value in zip(self.inputs['inputkeys'], inputlist):
+#if keylist:
+#for name, value in zip(keylist, inputlist):
+if filedict:
+for name, value in filedict.iteritems():
if arglist_copy and name in arglist_copy:
ind = arglist_copy.index(name)
arglist_copy[ind] = value[i]
elif name in parsetdict_copy.values():
for k, v in parsetdict_copy.iteritems():
if v == name:
parsetdict_copy[k] = value[i]
else:
parsetdict_copy[name] = value[i]
if self.inputs['outputkey'] and not self.inputs['skip_infile']:
if arglist_copy and self.inputs['outputkey'] in arglist_copy:
ind = arglist_copy.index(self.inputs['outputkey'])
arglist_copy[ind] = outp.file
else:
parsetdict_copy[self.inputs['outputkey']] = outp.file
jobs.append(
ComputeJob(
inp.host, command,
@@ -315,10 +350,22 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
)
max_per_node = self.inputs['max_per_node']
self._schedule_jobs(jobs, max_per_node)
-for job, outp in zip(jobs, outdata):
+jobresultdict = {}
+resultmap = {}
+for job, outp in zip(jobs, outputmapfiles[0]):
if job.results['returncode'] != 0:
outp.skip = True
+#print 'JOBRESULTS: ', job.results
+for k, v in job.results.items():
+if k not in jobresultdict:
+jobresultdict[k] = []
+jobresultdict[k].append(DataProduct(job.host, job.results[k], outp.skip))
+# temporary solution: write all output dict entries to a mapfile
+for k, v in jobresultdict.items():
+dmap = DataMap(v)
+dmap.save(k + '.mapfile')
+resultmap[k + '.mapfile'] = k + '.mapfile'
+self.outputs.update(resultmap)
# *********************************************************************
# Check job results, and create output data map file
if self.error.isSet():
@@ -330,21 +377,16 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
self.logger.warn(
"Some jobs failed, continuing with succeeded runs"
)
-self.logger.debug("Writing data map file: %s" % self.inputs['mapfile_out'])
-#outdata.save(self.inputs['mapfile_out'])
-#self.outputs['mapfile'] = self.inputs['mapfile_out']
mapdict = {}
-for item, name in zip(outputmapfiles, outputsuffix):
-maploc = os.path.join(prefix, self.inputs['stepname'] + name + '.' + 'mapfile')
-item.save(maploc)
-mapdict[self.inputs['stepname'] + name] = maploc
-#self.outputs[name] = name + '.' + 'mapfile'
+for item, name in zip(outputmapfiles, self.inputs['mapfiles_out']):
+self.logger.debug("Writing data map file: %s" % name)
+item.save(name)
+mapdict[os.path.basename(name)] = name
+outdata.save(self.inputs['mapfile_out'])
+self.outputs['mapfile'] = self.inputs['mapfile_out']
-if outputsuffix:
+if self.inputs['outputsuffixes']:
self.outputs.update(mapdict)
-#self.outputs['mapfile'] = os.path.join(prefix, outputsuffix[0] + '.' + 'mapfile')
return 0
if __name__ == '__main__':
CEP/Pipeline/recipes/sip/nodes/python_plugin.py (new file)
# LOFAR PIPELINE SCRIPT
#
# running a python plugin (a script providing a main() function) as node script
# Stefan Froehlich, 2014
# s.froehlich@fz-juelich.de
# ------------------------------------------------------------------------------
from __future__ import with_statement
from subprocess import CalledProcessError
import os
import shutil
import sys
import errno
import imp
from lofarpipe.support.pipelinelogging import CatchLog4CPlus
from lofarpipe.support.pipelinelogging import log_time
from lofarpipe.support.utilities import create_directory
from lofarpipe.support.utilities import catch_segfaults
from lofarpipe.support.lofarnode import LOFARnodeTCP
from lofarpipe.support.parset import Parset
class python_plugin(LOFARnodeTCP):
    """
    Basic script for running a Python plugin (a module with a main() function) with arguments.
    """
    def run(self, infile, executable, args, kwargs, work_dir='/tmp', parsetasfile=False, args_format='', environment=''):
        """
        This method contains all the needed functionality
        """
        # Debugging info
        self.logger.debug("infile = %s" % infile)
        self.logger.debug("executable = %s" % executable)
        self.logger.debug("working directory = %s" % work_dir)
        self.logger.debug("arguments = %s" % args)
        self.logger.debug("arg dictionary = %s" % kwargs)
        self.logger.debug("environment = %s" % environment)

        self.environment.update(environment)

        # Time execution of this job
        with log_time(self.logger):
            self.logger.info("Processing %s" % infile)

            # Check if the plugin script is present
            if not os.path.isfile(executable):
                self.logger.error("Script %s not found" % executable)
                return 1

            # Create the working directory if needed; guard against the race
            # condition that occurs when more than one process creates it on
            # the same filesystem.
            if not os.path.isdir(work_dir):
                try:
                    os.mkdir(work_dir)
                except OSError as exc:  # Python > 2.5
                    if exc.errno == errno.EEXIST and os.path.isdir(work_dir):
                        pass
                    else:
                        raise

            # Optionally hand the keyword arguments to the plugin as a parset file
            if parsetasfile:
                nodeparset = Parset()
                parsetname = os.path.join(work_dir, os.path.basename(infile) + '.parset')
                for k, v in kwargs.items():
                    nodeparset.add(k, v)
                nodeparset.writeFile(parsetname)
                args.insert(0, parsetname)

            try:
                # ****************************************************************
                # Load the plugin module and run its main() entry point
                outdict = {}
                plugin = imp.load_source('main', executable)
                outdict = plugin.main(*args, **kwargs)
            except CalledProcessError, err:
                # CalledProcessError isn't properly propagated by IPython
                self.logger.error(str(err))
                return 1
            except Exception, err:
                self.logger.error(str(err))
                return 1

        # Pass every entry of the dict returned by the plugin back to the
        # master recipe, which saves the entries as mapfiles.
        if outdict:
            for k, v in outdict.items():
                self.outputs[k] = v

        # We need some signal to the master script that the script ran ok.
        self.outputs['ok'] = True
        return 0


if __name__ == "__main__":
    # If invoked directly, parse command line arguments for logger information
    # and pass the rest to the run() method defined above
    # --------------------------------------------------------------------------
    jobid, jobhost, jobport = sys.argv[1:4]
    sys.exit(python_plugin(jobid, jobhost, jobport).run_with_stored_arguments())
CEP/Pipeline/recipes/sip/nodes/python_plugin_loader.py (new file)

import sys


def load_plugin(name, path=None):
    # Import the plugin module by name; optionally extend the module search path.
    if path:
        sys.path.append(path)
    mod = __import__(name)
    return mod


def call_plugin(path, name, *args, **kwargs):
    # Load the plugin (load_plugin takes (name, path)) and forward all
    # arguments to its main() entry point.
    plugin = load_plugin(name, path)
    return plugin.main(*args, **kwargs)
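For illustration, a short usage sketch of the loader above; the plugin directory, module name, and arguments are made-up examples, not part of this commit:

# Hypothetical call: load 'my_plugin' from an extra search path and forward
# one positional and one keyword argument to its main().
result = call_plugin('/opt/pipeline/plugins', 'my_plugin', 'input.ms', loglevel='INFO')
# 'result' holds whatever dict (or other value) my_plugin.main() returned.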