Commit d40da43d authored by Wouter Klijn's avatar Wouter Klijn

Task #7058: Commit the node recipe dir: Add the selfcal files and port changes the correct way

parent 8274e192
......@@ -1519,6 +1519,9 @@ CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py eol=lf
CEP/Pipeline/recipes/sip/nodes/long_baseline.py eol=lf
CEP/Pipeline/recipes/sip/nodes/new_bbs.py eol=lf
CEP/Pipeline/recipes/sip/nodes/rficonsole.py eol=lf
CEP/Pipeline/recipes/sip/nodes/selfcal_awimager.py eol=lf
CEP/Pipeline/recipes/sip/nodes/selfcal_bbs.py eol=lf
CEP/Pipeline/recipes/sip/nodes/selfcal_finalize.py eol=lf
CEP/Pipeline/recipes/sip/nodes/setupparmdb.py eol=lf
CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py eol=lf
CEP/Pipeline/recipes/sip/nodes/vdsmaker.py eol=lf
......
......@@ -25,24 +25,14 @@ class get_metadata(LOFARnodeTCP):
self.logger.error("Dataset %s does not exist" % (infile))
return 1
# # Get the product metadata. If data product type was not specified,
# # derive it from the input filename's extension.
# if not product_type:
# ext = os.path.splitext(infile)[1]
# if ext == ".MS": product_type = "Correlated"
# elif ext == ".INST": product_type = "InstrumentModel"
# elif ext == ".IM": product_type = "SkyImage"
# if not product_type:
# self.logger.error("File %s has unknown product type" % infile)
# return 1
self.logger.debug("Product type: %s" % product_type)
if product_type == "Correlated":
self.outputs = metadata.Correlated(infile).data()
self.outputs = metadata.Correlated(self.logger, infile).data()
elif product_type == "InstrumentModel":
self.outputs = metadata.InstrumentModel(infile).data()
self.outputs = metadata.InstrumentModel(self.logger,
infile).data()
elif product_type == "SkyImage":
self.outputs = metadata.SkyImage(infile).data()
self.outputs = metadata.SkyImage(self.logger, infile).data()
else:
self.logger.error("Unknown product type: %s" % product_type)
return 1
......
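The three call sites above now hand the node logger to the metadata helpers. A minimal sketch of what the updated helper presumably looks like (the real classes live in lofarpipe's metadata module; this stub is illustrative only):

class Correlated(object):
    # hypothetical stub: the logger is passed in so metadata collection
    # can report problems on the node instead of failing silently
    def __init__(self, logger, infile):
        self.logger = logger
        self.infile = infile

    def data(self):
        self.logger.debug("Collecting metadata for %s" % self.infile)
        return {}  # the real implementation returns the product metadata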
......@@ -62,6 +62,9 @@ class imager_bbs(LOFARnodeTCP):
if bbs_process_group.wait_for_finish() != None:
self.logger.error(
"Failed bbs run detected Aborting")
return 1 # If bbs failed we need to abort: the concat
# is now corrupt
except OSError, exception:
self.logger.error("Failed to execute bbs: {0}".format(str(
exception)))
......
......@@ -51,23 +51,26 @@ class imager_create_dbs(LOFARnodeTCP):
There is a single sourcedb for a concatenated measurement set/ image
3. Each individual timeslice needs a place to collect parameters: This is
done in the paramdb.
4. Assign the outputs of the script
4. Add the created databases as meta information to the measurement set
5. Assign the outputs of the script
"""
def run(self, concatenated_measurement_set, sourcedb_target_path,
monet_db_hostname, monet_db_port, monet_db_name, monet_db_user,
monet_db_password, assoc_theta, parmdb_executable, slice_paths,
parmdb_suffix, environment, working_directory, makesourcedb_path,
source_list_path_extern):
source_list_path_extern, major_cycle):
self.logger.info("Starting imager_create_dbs Node")
self.environment.update(environment)
#*******************************************************************
# 1. get a sourcelist: from gsm or from file
source_list, append = self._create_source_list(source_list_path_extern,
sourcedb_target_path, concatenated_measurement_set,
monet_db_hostname, monet_db_port, monet_db_name, monet_db_user,
source_list, append = self._create_source_list(
source_list_path_extern, sourcedb_target_path,
concatenated_measurement_set, monet_db_hostname,
monet_db_port, monet_db_name, monet_db_user,
monet_db_password, assoc_theta)
#*******************************************************************
......@@ -87,7 +90,13 @@ class imager_create_dbs(LOFARnodeTCP):
return 1
# *******************************************************************
# 4. Assign the outputs
# Add the created databases to the measurement set,
self._add_dbs_to_ms(concatenated_measurement_set, sourcedb_target_path,
parmdbs, major_cycle)
#*******************************************************************
# 5. Assign the outputs
self.outputs["sourcedb"] = sourcedb_target_path
self.outputs["parmdbs"] = parmdbs
return 0
......@@ -118,7 +127,8 @@ class imager_create_dbs(LOFARnodeTCP):
append = False
else:
source_list = source_list_path_extern
append = True
append = False # Nicolas: should this be true or false?
# later steps should not contain the original bootstrapping input
return source_list, append
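For reference, the append flag returned here typically ends up on the makesourcedb command line; a hedged sketch (the in=/out=/append= option names come from the makesourcedb tool; the exact invocation used by this recipe is not shown in the diff):

command = [makesourcedb_path,
           "in={0}".format(source_list),
           "out={0}".format(sourcedb_target_path),
           "append={0}".format(str(append).lower())]  # False: start a fresh sourcedb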
......@@ -460,7 +470,44 @@ class imager_create_dbs(LOFARnodeTCP):
return None
def _add_dbs_to_ms(self, concatenated_measurement_set, sourcedb_target_path,
parmdbs_path, major_cycle):
"""
Add the sourcedb and instrument table (parmdb) created in this recipe
to the local measurement set.
"""
self.logger.info("Adding sourcemodel and instrument model to output ms.")
# Create the base meta information directory
meta_directory = concatenated_measurement_set + "_selfcal_information"
if not os.path.exists(meta_directory):
os.makedirs(meta_directory)
# Cycle dir
cycle_directory = os.path.join(meta_directory,
"cycle_" + str(major_cycle))
if not os.path.exists(cycle_directory):
os.makedirs(cycle_directory)
# Copy the actual data. parmdbs_path is a list!
sourcedb_directory = os.path.join(cycle_directory,
os.path.basename(sourcedb_target_path))
if os.path.exists(sourcedb_directory):
shutil.rmtree(sourcedb_directory) # delete dir to ensure the copy succeeds
shutil.copytree(sourcedb_target_path, sourcedb_directory)
#parmdbs_path is a list!
for parmdb_entry in parmdbs_path:
try:
parmdb_directory = os.path.join(cycle_directory,
os.path.basename(parmdb_entry))
# delete dir to ensure the copy succeeds
if os.path.exists(parmdb_directory):
shutil.rmtree(parmdb_directory)
shutil.copytree(parmdb_entry, parmdb_directory)
except Exception:
self.logger.warn("Failed copying parmdb:")
self.logger.warn(parmdb_entry)
continue # slices might be missing, not an exit error
if __name__ == "__main__":
......
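The meta information written by _add_dbs_to_ms accumulates per major cycle; the resulting on-disk layout looks roughly like this (entry names hypothetical):

<concatenated_measurement_set>_selfcal_information/
    cycle_0/
        <sourcedb basename>/
        <parmdb basename 0>/
        <parmdb basename 1>/
    cycle_1/
        ...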
......@@ -34,13 +34,13 @@ class imager_finalize(LOFARnodeTCP):
5. Export sourcelist to msss server, copy the sourcelist to hdf5 location
6. Return the outputs
"""
def run(self, awimager_output, raw_ms_per_image, sourcelist, target,
def run(self, awimager_output, ms_per_image, sourcelist, target,
output_image, minbaseline, maxbaseline, processed_ms_dir,
fillrootimagegroup_exec, environment, sourcedb):
self.environment.update(environment)
"""
:param awimager_output: Path to the casa image produced by awimager
:param raw_ms_per_image: The X (90) measurement sets scheduled to
:param ms_per_image: The X (90) measurement sets scheduled to
create the image
:param sourcelist: list of sources found in the image
:param target: <unused>
......@@ -55,7 +55,7 @@ class imager_finalize(LOFARnodeTCP):
:rtype: self.outputs['image'] path to the produced hdf5 image
"""
with log_time(self.logger):
raw_ms_per_image_map = DataMap.load(raw_ms_per_image)
ms_per_image_map = DataMap.load(ms_per_image)
# *****************************************************************
# 1. add image info
......@@ -64,22 +64,22 @@ class imager_finalize(LOFARnodeTCP):
# TODO: BUG!! the meta data might contain files that were copied
# but failed in imager_bbs
processed_ms_paths = []
for item in raw_ms_per_image_map:
for item in ms_per_image_map:
path = item.file
raw_ms_file_name = os.path.split(path)[1]
#if the raw ms is in the processed dir (additional check)
if (raw_ms_file_name in file_list):
ms_file_name = os.path.split(path)[1]
#if the ms is in the processed dir (additional check)
if (ms_file_name in file_list):
# save the path
processed_ms_paths.append(os.path.join(processed_ms_dir,
raw_ms_file_name))
ms_file_name))
#add the information to the image
try:
addimg.addImagingInfo(awimager_output, processed_ms_paths,
sourcedb, minbaseline, maxbaseline)
except Exception, error:
self.logger.error("addImagingInfo Threw Exception:")
self.logger.error(error)
self.logger.warn("addImagingInfo Threw Exception:")
self.logger.warn(error)
# Catch raising of already done error: allows for rerunning
# of the recipe
if "addImagingInfo already done" in str(error):
......@@ -143,8 +143,8 @@ class imager_finalize(LOFARnodeTCP):
(stdoutdata, stderrdata) = proc.communicate()
exit_status = proc.returncode
self.logger.error(stdoutdata)
self.logger.error(stderrdata)
self.logger.info(stdoutdata)
self.logger.info(stderrdata)
#if the command failed log the exit status
if exit_status != 0:
......
......@@ -10,7 +10,8 @@ import shutil
import os
import subprocess
import copy
import pyrap.tables as pt # order of pyrap import influences the type conversion binding
import pyrap.tables as pt # order of pyrap import influences the type
# conversion binding
from lofarpipe.support.pipelinelogging import CatchLog4CPlus
from lofarpipe.support.pipelinelogging import log_time
from lofarpipe.support.utilities import patch_parset
......@@ -19,7 +20,7 @@ from lofarpipe.support.lofarnode import LOFARnodeTCP
from lofarpipe.support.utilities import create_directory
from lofarpipe.support.data_map import DataMap
from lofarpipe.support.subprocessgroup import SubProcessGroup
from lofarpipe.recipes.helpers.data_quality import run_rficonsole, filter_bad_stations
# Some constant settings for the recipe
_time_slice_dir_name = "time_slices"
......@@ -29,31 +30,30 @@ class imager_prepare(LOFARnodeTCP):
"""
Steps perform on the node:
0. Create directories and assure that they are empty.
1. Collect the Measurement Sets (MSs): copy to the current node.
2. Start dppp: Combines the data from subgroups into single timeslice.
3. Flag rfi.
4. Add addImagingColumns to the casa ms.
5. Concatenate the time slice measurement sets, to a single virtual ms.
1. Create directories and assure that they are empty.
2. Collect the Measurement Sets (MSs): copy to the current node.
3. Start dppp: Combines the data from subgroups into single timeslice.
4. Flag rfi (toggled by parameter)
5. Add addImagingColumns to the casa ms.
6. Filter bad stations. Find stations with repeated bad measurements and
remove them completely from the dataset.
7. Add measurement set tables
8. Perform the (virtual) concatenation of the timeslices
"""
def run(self, environment, parset, working_dir, processed_ms_dir,
ndppp_executable, output_measurement_set,
time_slices_per_image, subbands_per_group, raw_ms_mapfile,
time_slices_per_image, subbands_per_group, input_ms_mapfile,
asciistat_executable, statplot_executable, msselect_executable,
rficonsole_executable, add_beam_tables):
rficonsole_executable, do_rficonsole, add_beam_tables):
"""
Entry point for the node recipe
"""
self.environment.update(environment)
with log_time(self.logger):
input_map = DataMap.load(raw_ms_mapfile)
input_map = DataMap.load(input_ms_mapfile)
#******************************************************************
# I. Create the directories used in this recipe
# 1. Create the directories used in this recipe
create_directory(processed_ms_dir)
# time slice dir_to_remove: assure empty directory: Stale data
......@@ -69,17 +69,20 @@ class imager_prepare(LOFARnodeTCP):
self.logger.debug("and assured it is empty")
#******************************************************************
# 1. Copy the input files
copied_ms_map = self._copy_input_files(
# 2. Copy the input files
# processed_ms_map will be the map containing all the 'valid'
# input ms
processed_ms_map = self._copy_input_files(
processed_ms_dir, input_map)
#******************************************************************
# 2. run dppp: collect frequencies into larger group
# 3. run dppp: collect frequencies into larger group
time_slices_path_list = \
self._run_dppp(working_dir, time_slice_dir,
time_slices_per_image, copied_ms_map, subbands_per_group,
time_slices_per_image, processed_ms_map, subbands_per_group,
processed_ms_dir, parset, ndppp_executable)
# If no timeslices were created, bail out with exit status 1
if len(time_slices_path_list) == 0:
self.logger.error("No timeslices were created.")
......@@ -88,13 +91,16 @@ class imager_prepare(LOFARnodeTCP):
self.logger.debug(
"Produced time slices: {0}".format(time_slices_path_list))
#***********************************************************
# 3. run rficonsole: flag datapoints which are corrupted
self._run_rficonsole(rficonsole_executable, time_slice_dir,
time_slices_path_list)
# 4. run rficonsole: flag datapoints which are corrupted
if (do_rficonsole):
run_rficonsole(rficonsole_executable, time_slice_dir,
time_slices_path_list, self.logger,
self.resourceMonitor)
#******************************************************************
# 4. Add imaging columns to each timeslice
# 5. Add imaging columns to each timeslice
# ndppp_executable fails if not present
for time_slice_path in time_slices_path_list:
pt.addImagingColumns(time_slice_path)
......@@ -103,29 +109,34 @@ class imager_prepare(LOFARnodeTCP):
time_slice_path))
#*****************************************************************
# 5. Filter bad stations
time_slice_filtered_path_list = self._filter_bad_stations(
# 6. Filter bad stations
time_slice_filtered_path_list = filter_bad_stations(
time_slices_path_list, asciistat_executable,
statplot_executable, msselect_executable)
statplot_executable, msselect_executable,
self.logger, self.resourceMonitor)
#*****************************************************************
# Add measurement tables
# 7. Add measurement tables
if add_beam_tables:
self.add_beam_tables(time_slice_filtered_path_list)
self._add_beam_tables(time_slice_filtered_path_list)
#******************************************************************
# 6. Perform the (virtual) concatenation of the timeslices
# 8. Perform the (virtual) concatenation of the timeslices
self._concat_timeslices(time_slice_filtered_path_list,
output_measurement_set)
#******************************************************************
# *****************************************************************
# Write the actually used ms for the created dataset to the input
# mapfile
processed_ms_map.save(input_ms_mapfile)
# return
self.outputs["time_slices"] = \
time_slices_path_list
return 0
def add_beam_tables(self, time_slices_path_list):
def _add_beam_tables(self, time_slices_path_list):
beamtable_proc_group = SubProcessGroup(self.logger)
for ms_path in time_slices_path_list:
self.logger.debug("makebeamtables start")
......@@ -134,6 +145,7 @@ class imager_prepare(LOFARnodeTCP):
beamtable_proc_group.run(cmd_string)
if beamtable_proc_group.wait_for_finish() != None:
# TODO: Exception on error: make time_slices_path_list a mapfile
raise Exception("an makebeamtables run failed!")
self.logger.debug("makebeamtables finished")
......@@ -145,22 +157,24 @@ class imager_prepare(LOFARnodeTCP):
This function collects all the files in the input map into the
processed_ms_dir. The return value is the map of copied files; failed
copies are marked as skipped.
"""
copied_ms_map = copy.deepcopy(input_map)
processed_ms_map = copy.deepcopy(input_map)
# loop all measurement sets
for input_item, copied_item in zip(input_map, copied_ms_map):
for input_item, processed_item in zip(input_map, processed_ms_map):
# fill the copied item with the correct data
copied_item.host = self.host
copied_item.file = os.path.join(
processed_item.host = self.host
processed_item.file = os.path.join(
processed_ms_dir, os.path.basename(input_item.file))
stderrdata = None
# If we have to skip this ms
if input_item.skip == True:
exit_status = 1 #
exit_status = 1
stderrdata = "SKIPPED_FILE"
# skip the copy if machine is the same (execution on localhost)
# make sure data is in the correct directory. for now: working_dir/[jobname]/subbands
#if input_item.host == "localhost":
# continue
else:
# use cp the copy if machine is the same ( localhost)
# make sure data is in the correct directory. for now:
# working_dir/[jobname]/subbands
# construct copy command
command = ["rsync", "-r", "{0}:{1}".format(
input_item.host, input_item.file),
......@@ -172,8 +186,8 @@ class imager_prepare(LOFARnodeTCP):
self.logger.debug("executing: " + " ".join(command))
# Spawn a subprocess and connect the pipes
# The copy step is performed 720 at once in that case which might
# saturate the cluster.
# Up to 720 copies may run at once in that case, which
# might saturate the cluster.
copy_process = subprocess.Popen(
command,
stdin = subprocess.PIPE,
......@@ -189,14 +203,14 @@ class imager_prepare(LOFARnodeTCP):
# if copy failed log the missing file and update the skip fields
if exit_status != 0:
input_item.skip = True
copied_item.skip = True
processed_item.skip = True
self.logger.warning(
"Failed loading file: {0}".format(input_item.file))
self.logger.warning(stderrdata)
self.logger.debug(stdoutdata)
return copied_ms_map
return processed_ms_map
def _dppp_call(self, working_dir, ndppp, cmd, environment):
......@@ -204,6 +218,9 @@ class imager_prepare(LOFARnodeTCP):
Mockable function running the dppp executable.
Wraps dppp with catchLog4CPLus and catch_segfaults
"""
# TODO: the cpu limit is hard-coded at this location
environment['OMP_NUM_THREADS'] = str(8)
self.logger.debug("Using %s threads for ndppp" % 8)
with CatchLog4CPlus(working_dir, self.logger.name +
"." + os.path.basename("imager_prepare_ndppp"),
os.path.basename(ndppp)) as logger:
......@@ -211,8 +228,8 @@ class imager_prepare(LOFARnodeTCP):
logger, cleanup = None, usageStats=self.resourceMonitor)
def _run_dppp(self, working_dir, time_slice_dir_path, slices_per_image,
copied_ms_map, subbands_per_image, collected_ms_dir_name, parset,
ndppp):
processed_ms_map, subbands_per_image, collected_ms_dir_name,
parset, ndppp):
"""
Run NDPPP:
Create dir for grouped measurements, assure clean workspace
......@@ -223,27 +240,63 @@ class imager_prepare(LOFARnodeTCP):
for idx_time_slice in range(slices_per_image):
start_slice_range = idx_time_slice * subbands_per_image
end_slice_range = (idx_time_slice + 1) * subbands_per_image
# Get the subset of ms that are part of the current timeslice,
# cast to datamap
input_map_subgroup = DataMap(
copied_ms_map[start_slice_range:end_slice_range])
output_ms_name = "time_slice_{0}.dppp.ms".format(idx_time_slice)
# construct time slice name
time_slice_path = os.path.join(time_slice_dir_path,
output_ms_name)
# convert the datamap to a file list: Do not remove skipped files:
# ndppp needs the incorrect files there to allow filling with zeros
ndppp_input_ms = [item.file for item in input_map_subgroup]
# convert the datamap to a file list: Add a non-valid entry for
# skipped files: ndppp needs the placeholder entries there to allow
# filling with zeros
ndppp_input_ms = []
nchan_known = False
for item in processed_ms_map[start_slice_range:end_slice_range]:
if item.skip:
ndppp_input_ms.append("SKIPPEDSUBBAND")
else:
# From the first non-skipped file get the nchan
if not nchan_known:
try:
# Automatically average the number of channels in
# the output to 1
# open the dataset; assume the same nchan for all sb
table = pt.table(item.file)
# get the data column, get description, get the
# shape, first index returns the number of channels
nchan_input = str(
pt.tablecolumn(table,
'DATA').getdesc()["shape"][0])
nchan_known = True
# corrupt input measurement set
except Exception:
item.skip = True
ndppp_input_ms.append("SKIPPEDSUBBAND")
continue
ndppp_input_ms.append(item.file)
# if none of the input files was valid, skip the creation of the
# timeslice altogether: it will not show up in the timeslice
# mapfile
if not nchan_known:
continue
# TODO/FIXME: dependency on the step name!!!!
ndppp_nchan_key = "avg1.freqstep"
# Join into a single string list of paths.
msin = "['{0}']".format("', '".join(ndppp_input_ms))
# Update the parset with computed parameters
patch_dictionary = {'uselogger': 'True', # enables log4cplus
'msin': msin,
'msout': time_slice_path}
'msout': time_slice_path,
ndppp_nchan_key: nchan_input}
nddd_parset_path = time_slice_path + ".ndppp.par"
try:
temp_parset_filename = patch_parset(parset, patch_dictionary)
......@@ -278,11 +331,10 @@ class imager_prepare(LOFARnodeTCP):
time_slice_path_list.append(time_slice_path)
# On error the current timeslice should be skipped
except subprocess.CalledProcessError, exception:
self.logger.warning(str(exception))
continue
# and the input ms should have the skip set
except Exception, exception:
for item in processed_ms_map[start_slice_range:end_slice_range]:
item.skip = True
self.logger.warning(str(exception))
continue
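The nchan lookup in the loop above reads the fixed shape of the DATA column; the same lookup as a standalone sketch (assumes pyrap.tables and a measurement set at a hypothetical path):

import pyrap.tables as pt

table = pt.table("input.ms")  # hypothetical path
# the first index of the column shape is the number of channels
nchan = pt.tablecolumn(table, 'DATA').getdesc()["shape"][0]
table.close()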
......@@ -300,138 +352,6 @@ class imager_prepare(LOFARnodeTCP):
"mentset: {1}".format(
", ".join(group_measurements_collected), output_file_path))
def _run_rficonsole(self, rficonsole_executable, time_slice_dir,
time_slices):
"""
_run_rficonsole runs the rficonsole application on the supplied
timeslices in time_slices.
"""
# loop all measurement sets
rfi_temp_dir = os.path.join(time_slice_dir, "rfi_temp_dir")
create_directory(rfi_temp_dir)
try:
rfi_console_proc_group = SubProcessGroup(self.logger,
self.resourceMonitor)
for time_slice in time_slices:
# Each rfi console needs own working space for temp files
temp_slice_path = os.path.join(rfi_temp_dir,
os.path.basename(time_slice))
create_directory(temp_slice_path)
# construct copy command
self.logger.info(time_slice)
command = [rficonsole_executable, "-indirect-read",
time_slice]
self.logger.info("executing rficonsole command: {0}".format(
" ".join(command)))
# Add the command to the process group
rfi_console_proc_group.run(command, cwd = temp_slice_path)
# wait for all to finish
if rfi_console_proc_group.wait_for_finish() != None:
raise Exception("an rfi_console_proc_group run failed!")
finally:
shutil.rmtree(rfi_temp_dir)
def _filter_bad_stations(self, time_slice_path_list,
asciistat_executable, statplot_executable, msselect_executable):
"""
A Collection of scripts for finding and filtering of bad stations:
1. First a number of statistics with regards to the spread of the data
is collected using the asciistat_executable.
2. Secondly these statistics are consumed by the statplot_executable
which produces a set of bad stations.
3. In the final step the bad stations are removed from the dataset
using ms select
REF: http://www.lofar.org/wiki/lib/exe/fetch.php?media=msss:pandeymartinez-week9-v1p2.pdf
"""
# run asciistat to collect statistics about the ms
self.logger.info("Filtering bad stations")
self.logger.debug("Collecting statistical properties of input data")
asciistat_output = []
asciistat_proc_group = SubProcessGroup(self.logger)
for ms in time_slice_path_list:
output_dir = ms + ".filter_temp"
create_directory(output_dir)
asciistat_output.append((ms, output_dir))
cmd_string = "{0} -i {1} -r {2}".format(asciistat_executable,
ms, output_dir)
asciistat_proc_group.run(cmd_string)
if asciistat_proc_group.wait_for_finish() != None:
raise Exception("an ASCIIStats run failed!")
# Determine the station to remove
self.logger.debug("Select bad stations depending on collected stats")
asciiplot_output = []
asciiplot_proc_group = SubProcessGroup(self.logger)
for (ms, output_dir) in asciistat_output:
ms_stats = os.path.join(
output_dir, os.path.split(ms)[1] + ".stats")
cmd_string = "{0} -i {1} -o {2}".format(statplot_executable,
ms_stats, ms_stats)
asciiplot_output.append((ms, ms_stats))
asciiplot_proc_group.run(cmd_string)
if asciiplot_proc_group.wait_for_finish() != None:
raise Exception("an ASCIIplot run failed!")
# remove the bad stations
self.logger.debug("Use ms select to remove bad stations")
msselect_output = {}
msselect_proc_group = SubProcessGroup(self.logger)
for ms, ms_stats in asciiplot_output:
# parse the .tab file containing the bad stations
station_to_filter = []
file_pointer = open(ms_stats + ".tab")
for line in file_pointer.readlines():
# skip header line
if line[0] == "#":
continue
entries = line.split()
# if the current station is bad (the last entry on the line)
if entries[-1] == "True":
# add the name of station
station_to_filter.append(entries[1])
# if this measurement set does not contain stations to filter, do not
# filter and provide the original ms as output
if len(station_to_filter) == 0:
msselect_output[ms] = ms
continue
ms_output_path = ms + ".filtered"
msselect_output[ms] = ms_output_path
# use msselect to remove the stations from the ms
msselect_baseline = "!{0}".format(",".join(station_to_filter))
cmd_string = "{0} in={1} out={2} baseline={3} deep={4}".format(
msselect_executable, ms, ms_output_path,
msselect_baseline, "False")
msselect_proc_group.run(cmd_string)
if msselect_proc_group.wait_for_finish() != None:
raise Exception("an MSselect run failed!")
filtered_list_of_ms = []
# The order of the inputs needs to be preserved when producing the
# filtered output!
for input_ms in time_slice_path_list:
filtered_list_of_ms.append(msselect_output[input_ms])
return filtered_list_of_ms
if __name__ == "__main__":
......
# LOFAR AUTOMATIC IMAGING PIPELINE
# awimager
# The awimager recipe creates an image of the field of view, based on
# nine concatenated measurement sets, each spanning 10 subbands
# The recipe contains two parts: The call to awimager
# and secondarily some functionality that calculates settings (for awimager)
# based on the information present in the measurement set
# The calculated parameters are:
# 1: The cellsize
# 2: The npixels in a each of the two dimension of the image
# 3. Which columns to use to determine the maximum baseline
# 4. The number of projection planes
# Wouter Klijn 2012
# klijn@astron.nl
# Nicolas Vilchez, 2014
# vilchez@astron.nl
# -----------------------------------------------------------------------------
from __future__ import with_statement
import sys
import shutil
import os.path
import math
import pyfits
from lofarpipe.support.lofarnode import LOFARnodeTCP
from lofarpipe.support.pipelinelogging import CatchLog4CPlus
from lofarpipe.support.pipelinelogging import log_time
from lofarpipe.support.utilities import patch_parset
from lofarpipe.support.utilities import get_parset
from lofarpipe.support.utilities import catch_segfaults
from lofarpipe.support.lofarexceptions import PipelineException
import pyrap.tables as pt # @UnresolvedImport
from subprocess import CalledProcessError
from lofarpipe.support.utilities import create_directory
import pyrap.images as pim # @UnresolvedImport
from lofarpipe.support.parset import Parset
import lofar.parmdb # @UnresolvedImport
import numpy as np
class selfcal_awimager(LOFARnodeTCP):
def run(self, executable, environment, parset, working_directory,
output_image, concatenated_measurement_set, sourcedb_path,
mask_patch_size, autogenerate_parameters, specify_fov, fov,
major_cycle, nr_cycles, perform_self_cal):
"""
:param executable: Path to awimager executable
:param environment: environment for catch_segfaults (executable runner)
:param parset: parameters for the awimager,
:param working_directory: directory the place temporary files
:param output_image: location and filename to store the output images
the multiple images are appended with type extensions
:param concatenated_measurement_set: Input measurement set
:param sourcedb_path: Path to the sourcedb used to create the image
mask
:param mask_patch_size: Scaling of the patch around the source in the
mask
:param autogenerate_parameters: Turns on the autogeneration of:
cellsize, npix, wprojplanes, wmax, fov
:param fov: if autogenerate_parameters is false, calculate the
image parameters (cellsize, npix, wprojplanes, wmax) relative to this
fov
:param major_cycle: number of the self calibration cycle to determine
the imaging parameters: cellsize, npix, wprojplanes, wmax, fov
:param nr_cycles: The requested number of self cal cycles
:param perform_self_cal: Bool used to control the selfcal functionality
or the old semi-automatic functionality
:rtype: self.outputs["image"] The path to the output image
"""
self.logger.info("Start selfcal_awimager node run:")
log4_cplus_name = "selfcal_awimager"
self.environment.update(environment)
with log_time(self.logger):
# Read the parameters as specified in the parset
parset_object = get_parset(parset)
# *************************************************************
# 1. Calculate awimager parameters that depend on measurement set
# and the parset
if perform_self_cal:
# Calculate awimager parameters that depend on measurement set
# and the parset
self.logger.info(
"Calculating selfcalibration parameters ")
cell_size, npix, w_max, w_proj_planes, \
UVmin, UVmax, robust, threshold =\
self._get_selfcal_parameters(
concatenated_measurement_set,
parset, major_cycle, nr_cycles)
self._save_selfcal_info(concatenated_measurement_set,
major_cycle, npix, UVmin, UVmax)
else:
self.logger.info(
"Calculating parameters.. ( NOT selfcalibration)")
cell_size, npix, w_max, w_proj_planes = \
self._get_imaging_parameters(
concatenated_measurement_set,
parset,
autogenerate_parameters,
specify_fov,
fov)
self.logger.info("Using autogenerated parameters; ")
self.logger.info(
"Calculated parameters: cell_size: {0}, npix: {1}".format(
cell_size, npix))
self.logger.info("w_max: {0}, w_proj_planes: {1} ".format(
w_max, w_proj_planes))
# ****************************************************************
# 2. Get the target image location from the mapfile for the parset.
# Create target dir if it does not exist
image_path_head = os.path.dirname(output_image)
create_directory(image_path_head)
self.logger.debug("Created directory to place awimager output"
" files: {0}".format(image_path_head))
# ****************************************************************
# 3. Create the mask
#mask_file_path = self._create_mask(npix, cell_size, output_image,
# concatenated_measurement_set, executable,
# working_directory, log4_cplus_name, sourcedb_path,
# mask_patch_size, image_path_head)
# *****************************************************************
# 4. Update the parset with calculated parameters, and output image
patch_dictionary = {'uselogger': 'True', # enables log4cplus log
'ms': str(concatenated_measurement_set),
'cellsize': str(cell_size),
'npix': str(npix),
'wmax': str(w_max),
'wprojplanes': str(w_proj_planes),
'image': str(output_image),
'maxsupport': str(npix)
# 'mask':str(mask_file_path), #TODO REINTRODUCE
# MASK, excluded to speed up in this debug stage
}
# Add some additional keys from the self calibration method
if perform_self_cal:
self_cal_patch_dict = {
'weight': 'briggs',
'padding': str(1.18),
'niter' : str(1000000),
'operation' : 'mfclark',
'timewindow' : '300',
'fits' : '',
'threshold' : str(threshold),
'robust' : str(robust),
'UVmin' : str(UVmin),
'UVmax' : str(UVmax),
'maxbaseline' : str(10000000),
'select' : str("sumsqr(UVW[:2])<1e12"),
}
patch_dictionary.update(self_cal_patch_dict)
# save the parset at the target dir for the image
calculated_parset_path = os.path.join(image_path_head,
"parset.par")
try:
temp_parset_filename = patch_parset(parset, patch_dictionary)
# Copy tmp file to the final location
shutil.copyfile(temp_parset_filename, calculated_parset_path)
self.logger.debug("Wrote parset for awimager run: {0}".format(
calculated_parset_path))
finally:
# remove temp file
os.remove(temp_parset_filename)
# *****************************************************************
# 5. Run the awimager with the parameterset
# TODO: FIXME: manually limit the number of threads used.
omp_num_threads = 8
self.environment['OMP_NUM_THREADS'] = str(omp_num_threads)
self.logger.debug("Using %s threads for swimager" % omp_num_threads)
cmd = [executable, calculated_parset_path]
self.logger.debug("Parset used for awimager run:")
self.logger.debug(cmd)
try:
with CatchLog4CPlus(working_directory,
self.logger.name + "." +
os.path.basename(log4_cplus_name),
os.path.basename(executable)
) as logger:
catch_segfaults(cmd, working_directory, self.environment,
logger, usageStats=self.resourceMonitor)
# Thrown by catch_segfault
except CalledProcessError, exception:
self.logger.error(str(exception))
return 1
except Exception, exception:
self.logger.error(str(exception))
return 1
# *********************************************************************
# 6. Return output
# Append static .restored: This might change but prob. not
# The actual output image always has this extension, the default of
# awimager
self.outputs["image"] = output_image + ".restored"
return 0
def _get_imaging_parameters(self, measurement_set, parset,
autogenerate_parameters, specify_fov, fov):
"""
(1) calculate and format some parameters that are determined at runtime.
Based on values in the measurementset and input parameter (set):
a. <string> The cellsize
b. <int> The npixels in a each of the two dimension of the image
c. <string> The largest baseline in the ms smaller than the maxbaseline
d. <string> The number of projection planes
The calculation of these parameters is done in three steps:
1. Calculate intermediate results based on the ms.
2. The calculation of the actual target values using intermediate
result
"""
# *********************************************************************
# 1. Get partial solutions from the parameter set
# Get the parset and a number of raw parameters from this parset
parset_object = get_parset(parset)
baseline_limit = parset_object.getInt('maxbaseline')
# Get the longest baseline
max_baseline = pt.taql(
'CALC sqrt(max([select sumsqr(UVW[:2]) from ' + \
'{0} where sumsqr(UVW[:2]) <{1} giving as memory]))'.format(\
measurement_set, baseline_limit *
baseline_limit))[0] # ask Ger van Diepen for details if necessary
# Calculate the wave_length
table_ms = pt.table(measurement_set)
table_spectral_window = pt.table(
table_ms.getkeyword("SPECTRAL_WINDOW"))
freq = table_spectral_window.getcell("REF_FREQUENCY", 0)
table_spectral_window.close()
wave_length = pt.taql('CALC C()') / freq
wave_length = wave_length[0]
# Calculate the cell_size from the ms
arc_sec_in_degree = 3600
arc_sec_in_rad = (180.0 / math.pi) * arc_sec_in_degree
cell_size = (1.0 / 3) * (wave_length / float(max_baseline))\
* arc_sec_in_rad
# Calculate the number of pixels in x and y dim
# fov and diameter depending on the antenna name
fov_from_ms, station_diameter = self._get_fov_and_station_diameter(
measurement_set)
# use fov to calculate a semi 'user' specified npix and cellsize
# The npix thus depends on the ms cellsize and fov
# Do not use the supplied fov if autogenerating
if not autogenerate_parameters and specify_fov:
if fov == 0.0:
raise PipelineException("fov set to 0.0: invalid value.")
# else use full resolution (calculate the fov)
else:
self.logger.info("Using fov calculated on measurement data: " +
str(fov_from_ms))
fov = fov_from_ms
# ********************************************************************
# 2. Calculate the ms based output variables
# 'optimal' npix based on measurement set calculations or user specified
npix = (arc_sec_in_degree * fov) / cell_size
# Get the closest power of two larger than the calculated pixel size
npix = self._nearest_ceiled_power2(npix)
# Get the max w with baseline < 10000
w_max = pt.taql('CALC max([select UVW[2] from ' + \
'{0} where sumsqr(UVW[:2]) <{1} giving as memory])'.format(
measurement_set, baseline_limit * baseline_limit))[0]
# Calculate number of projection planes
w_proj_planes = min(257, math.floor((max_baseline * wave_length) /
(station_diameter ** 2)))
w_proj_planes = int(round(w_proj_planes))
# Maximum number of proj planes set to 1024: contact George Heald or
# Ger van Diepen if this exception occurs
maxsupport = max(1024, npix)
if w_proj_planes > maxsupport:
raise Exception("The number of projections planes for the current"
+ "measurement set is to large.")
# *********************************************************************
# 3. if the npix from the parset is different to the ms calculations,
# calculate a sizeconverter value (to be applied to the cellsize)
if npix < 256:
self.logger.warn("Using a image size smaller then 256x256:"
" This leads to problematic imaging in some instances!!")
# If we are not autocalculating based on ms or fov, use the npix
# and cell_size specified in the parset
# keep the wmax and w_proj_planes
if (not autogenerate_parameters and not specify_fov):
npix = parset_object.getString('npix')
cell_size_formatted = parset_object.getString('cellsize')
else:
cell_size_formatted = str(
int(round(cell_size))) + 'arcsec'
self.logger.info("Using the following awimager parameters:"
" cell_size: {0}, npix: {1},".format(
cell_size_formatted, npix) +
" w_max: {0}, w_proj_planes: {1}".format(w_max, w_proj_planes))
return cell_size_formatted, str(npix), str(w_max), str(w_proj_planes)
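Stripped of the taql lookups, the derivation above reduces to a few lines; a sketch with hypothetical input values:

import math

wave_length = 299792458.0 / 150e6  # c / REF_FREQUENCY, 150 MHz assumed
max_baseline = 10000.0             # longest baseline under the limit, in m
arc_sec_in_rad = (180.0 / math.pi) * 3600

# sample at a third of the resolution implied by the longest baseline
cell_size = (1.0 / 3) * (wave_length / max_baseline) * arc_sec_in_rad
fov = 5.0                          # degrees, hypothetical
npix = (3600 * fov) / cell_size
npix = int(pow(2, math.ceil(math.log(npix, 2))))  # nearest ceiled power of 2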
# Awimager parameters for selfcal process (depends on the major cycle)
# nicolas: This function needs a lot more documentation:
# This is the function that does the magic.
# For each step it must be crystal clear what is happening.
# I will need to support this function
# this function is full of magic numbers
# Instead of:
# variable_a = 3.14 * 3600 * 5
# use:
# pi = math.pi
# second_in_hour = 3600
# factorx = 5 # Factor controlling x, value based on manual optimization
def _get_selfcal_parameters(self, measurement_set, parset, major_cycle,
nr_cycles):
"""
0. modify the number of cycles to have a final step at the same
resolution as the previous last cycle
1. Determine target coordinates, especially declination, because
for low dec (<35 deg) UVmin = 0.1 to exclude very short baselines
2. Determine the frequency and the wavelength
3. Determine the longest baseline and the best resolution available
4. Estimate all imaging parameters
5. Calculate number of projection planes
6. Pixelsize must be a string number : number +arcsec
# Nicolas Vilchez, 2014
# vilchez@astron.nl
"""
# ********************************************************************
#0. modify the number of cycles to have a final step at the same
#resolution as the previous last cycle
if major_cycle < nr_cycles-1:
nr_cycles = nr_cycles-1
scaling_factor = float(major_cycle) / float(nr_cycles - 1)
# ********************************************************************
#1. Determine Target coordinates for UVmin
tabtarget = pt.table(measurement_set)
tabfield = pt.table(tabtarget.getkeyword('FIELD'))
coords = tabfield.getcell('REFERENCE_DIR',0)
target = coords[0] * 180.0 / math.pi # convert from radians to degrees
UVmin = 0
if target[1] <= 35: # Why?
UVmin = 0.1
ra_target = target[0] + 360.0 # Why?
dec_target = target[1]
# ********************************************************************
# 2. Determine the frequency and the wavelength
tabfreq = pt.table(measurement_set)
table_spectral_window = pt.table(tabfreq.getkeyword("SPECTRAL_WINDOW"))
frequency = table_spectral_window.getcell('REF_FREQUENCY', 0)
wavelenght = 3.0E8 / frequency # wavelength = c / frequency
# ********************************************************************
# 3. Determine the longest baseline and the best resolution available
tabbaseline = pt.table(measurement_set, readonly=False, ack=True)
posbaseline = tabbaseline.getcol('UVW')
maxBaseline = max(posbaseline[:, 0] ** 2 +
posbaseline[:, 1] ** 2) ** 0.5
bestBeamresol = round((wavelenght / maxBaseline) *
(180.0 / math.pi) * 3600.0, 0)
# Beam resolution limitation to 10arcsec to avoid too large images
if bestBeamresol < 10.0:
bestBeamresol = 10.0
# ********************************************************************
# 4. Estimate all imaging parameters
# estimate fov
# fov = 5 degree, except for High HBA Observation => 1.5 degree
if frequency > 1.9E8:
fov = 1.5
else:
fov = 5.0
# we need 4 pixel/beam to have enough sampling
pixPerBeam = 4.0
# best resolution pixel size (i.e final pixel size for selfcal)
bestPixelResol = round(bestBeamresol / pixPerBeam, 2)
# factor to estimate the starting resolution (9 times in this case)
badResolFactor = 9
pixsize = round((badResolFactor * bestPixelResol) -
(badResolFactor * bestPixelResol - bestPixelResol) *
scaling_factor , 3)
# number of pixels must be a multiple of 2 !!
nbpixel = int(fov * 3600.0 / pixsize)
if nbpixel % 2 == 1:
nbpixel = nbpixel + 1
robust = 0 #round(1.0 - (3.0 * scaling_factor), 2)
UVmax = round((wavelenght) /
(pixPerBeam * pixsize / 3600.0 * math.pi / 180.0 ) /
(1E3 * wavelenght), 3)
wmax = round(UVmax * (wavelenght) * 1E3, 3)
# ********************************************************************
# 5. Calculate number of projection planes
# Need to compute station diameter (the fov is fixed to 5 degree)
# using wouter's function, to compute the w_proj_planes
# fov and diameter depending on the antenna name
fov_from_ms, station_diameter = self._get_fov_and_station_diameter(
measurement_set)
w_proj_planes = min(257, math.floor((maxBaseline * wavelenght) /
(station_diameter ** 2)))
w_proj_planes = int(round(w_proj_planes))
# Maximum number of proj planes set to 1024: contact George Heald or
# Ger van Diepen if this exception occurs
maxsupport = max(1024, nbpixel)
if w_proj_planes > maxsupport:
raise Exception("The number of projections planes for the current" +
"measurement set is to large.")
# Warnings on pixel size
if nbpixel < 256:
self.logger.warn("Using a image size smaller then 256x256: This " +
"leads to problematic imaging in some instances!!")
# ********************************************************************
# 6. Pixelsize must be a string number : number +arcsec
# conversion at this step
pixsize = str(pixsize)+'arcsec'
# ********************************************************************
# 7. Threshold determination from the previous cycle
if major_cycle == 0:
threshold = '0.075Jy'
else:
fits_image_path_list = measurement_set.split('concat.ms')
fits_image_path = fits_image_path_list[0] +\
'awimage_cycle_%s/image.fits'%(major_cycle-1)
# open a FITS file
fitsImage = pyfits.open(fits_image_path)
scidata = fitsImage[0].data
dataRange = range(fitsImage[0].shape[2])
sortedData = range(fitsImage[0].shape[2] ** 2)
# FIXME We have the sneaking suspicion that this takes very long
# due to bad coding style... (double for loop with compute in inner loop)
for i in dataRange:
for j in dataRange:
sortedData[i * fitsImage[0].shape[2] + j] = scidata[0,0,i,j]
sortedData = sorted(sortedData)
# Percent of faintest data to use to determine 5sigma value : use 5%
dataPercent = int(fitsImage[0].shape[2] * 0.05)
fiveSigmaData = sum(sortedData[0:dataPercent]) / dataPercent
threshold = (abs(fiveSigmaData) / 5.0) * (2.335 / 2.0) * 15
return pixsize, str(nbpixel), str(wmax), str(w_proj_planes), \
str(UVmin), str(UVmax), str(robust), str(threshold)
def _get_fov_and_station_diameter(self, measurement_set):
"""
_field_of_view calculates the fov, which is dependent on the
station type, location and mode:
For details see:
(1) http://www.astron.nl/radio-observatory/astronomers/lofar-imaging-capabilities-sensitivity/lofar-imaging-capabilities/lofar
"""
# Open the ms
table_ms = pt.table(measurement_set)
# Get antenna name and observation mode
antenna = pt.table(table_ms.getkeyword("ANTENNA"))
antenna_name = antenna.getcell('NAME', 0)
antenna.close()
observation = pt.table(table_ms.getkeyword("OBSERVATION"))
antenna_set = observation.getcell('LOFAR_ANTENNA_SET', 0)
observation.close()
# static parameters for the station diameters ref (1)
hba_core_diameter = 30.8
hba_remote_diameter = 41.1
lba_inner = 32.3
lba_outer = 81.3
# use measurement set information to ascertain antenna diameter
station_diameter = None
if antenna_name.count('HBA'):
if antenna_name.count('CS'):
station_diameter = hba_core_diameter
elif antenna_name.count('RS'):
station_diameter = hba_remote_diameter
elif antenna_name.count('LBA'):
if antenna_set.count('INNER'):
station_diameter = lba_inner
elif antenna_set.count('OUTER'):
station_diameter = lba_outer
# raise exception if the antenna is not of a supported type
if station_diameter == None:
self.logger.error(
'Unknown antenna type for antenna: {0} , {1}'.format(\
antenna_name, antenna_set))
raise PipelineException(
"Unknown antenna type encountered in Measurement set")
# Get the wavelength
spectral_window_table = pt.table(table_ms.getkeyword(
"SPECTRAL_WINDOW"))
freq = float(spectral_window_table.getcell("REF_FREQUENCY", 0))
wave_length = pt.taql('CALC C()') / freq
spectral_window_table.close()
# Now calculate the FOV see ref (1)
# alpha_one is a magic parameter: The value 1.3 is representative for a
# WSRT dish, where it depends on the dish illumination
alpha_one = 1.3
# alpha_one is in radians so transform to degrees for output
fwhm = alpha_one * (wave_length / station_diameter) * (180 / math.pi)
fov = fwhm / 2.0
table_ms.close()
return fov, station_diameter
def _create_mask(self, npix, cell_size, output_image,
concatenated_measurement_set, executable,
working_directory, log4_cplus_name, sourcedb_path,
mask_patch_size, image_path_directory):
"""
(3) create a casa image containing a mask blocking out the
sources in the provided sourcedb.
It expects:
a. the ms for which the mask will be created, it is used to
determine some image details: (eg. pointing)
b. parameters for running within the catchsegfault framework
c. and the size of the mask_patch.
To create a mask, first an empty casa image is created using
awimager: ready to be filled with mask data
This function is a wrapper around some functionality written by:
fdg@mpa-garching.mpg.de
steps:
1. Create a parset with image parameters used by:
2. awimager run. Creating an empty casa image.
3. Fill the casa image with mask data
"""
# ********************************************************************
# 1. Create the parset used to make a mask
mask_file_path = output_image + ".mask"
mask_patch_dictionary = {"npix": str(npix),
"cellsize": str(cell_size),
"image": str(mask_file_path),
"ms": str(concatenated_measurement_set),
"operation": "empty",
"stokes": "'I'"
}
mask_parset = Parset.fromDict(mask_patch_dictionary)
mask_parset_path = os.path.join(image_path_directory, "mask.par")
mask_parset.writeFile(mask_parset_path)
self.logger.debug(
"Write parset for awimager mask creation: {0}".format(
mask_parset_path))
# *********************************************************************
# 2. Create an empty mask using awimager
cmd = [executable, mask_parset_path]
self.logger.info(" ".join(cmd))
try:
with CatchLog4CPlus(working_directory,
self.logger.name + "." + os.path.basename(log4_cplus_name),
os.path.basename(executable)
) as logger:
catch_segfaults(cmd, working_directory, self.environment,
logger)
# Thrown by catch_segfault
except CalledProcessError, exception:
self.logger.error(str(exception))
return 1
except Exception, exception:
self.logger.error(str(exception))
return 1
# ********************************************************************
# 3. create the actual mask
self.logger.debug("Started mask creation using mask_patch_size:"
" {0}".format(mask_patch_size))
self._msss_mask(mask_file_path, sourcedb_path, mask_patch_size)
self.logger.debug("Fished mask creation")
return mask_file_path
def _msss_mask(self, mask_file_path, sourcedb_path, mask_patch_size = 1.0):
"""
Fill casa image with a mask based on skymodel(sourcedb)
Bugs: fdg@mpa-garching.mpg.de
pipeline implementation klijn@astron.nl
version 0.32
Edited by JDS, 2012-03-16:
- Properly convert maj/minor axes to half length
- Handle empty fields in sky model by setting them to 0
- Fix off-by-one error at mask boundary
FIXED BUG
- if a source is outside the mask, the script ignores it
- if a source is on the border, the script draws only the inner part
- can handle skymodels with different headers
KNOWN BUG
- does not work with single-line skymodels, workaround: add a fake
source outside the field
- mask patches display large amounts of aliasing. A possible
solution would
be normalizing to the pixel centre. ( int(normalize_x * npix) /
npix + (0.5 /npix))
ideally the patch would increment in pixel radii
Version 0.3 (Wouter Klijn, klijn@astron.nl)
- Usage of sourcedb instead of txt document as 'source' of sources
This allows input from different source sources
Version 0.31 (Wouter Klijn, klijn@astron.nl)
- Adaptable patch size (patch size needs specification)
- Patch size and geometry is broken: needs some astronomer magic to
fix it, probably a problem with the affine transformation.
Version 0.32 (Wouter Klijn, klijn@astron.nl)
- Renaming of variable names to python convention
"""
# increment in maj/minor axes [arcsec]
pad = 500.
# open mask
mask = pim.image(mask_file_path, overwrite = True)
mask_data = mask.getdata()
xlen, ylen = mask.shape()[2:]
freq, stokes, null, null = mask.toworld([0, 0, 0, 0])
# Open the sourcedb:
table = pt.table(sourcedb_path + "::SOURCES")
pdb = lofar.parmdb.parmdb(sourcedb_path)
# Get the data of interest
source_list = table.getcol("SOURCENAME")
source_type_list = table.getcol("SOURCETYPE")
# All data in the format valuetype:sourcename
all_values_dict = pdb.getDefValues()
# Loop the sources
for source, source_type in zip(source_list, source_type_list):
if source_type == 1:
type_string = "Gaussian"
else:
type_string = "Point"
self.logger.info("processing: {0} ({1})".format(source,
type_string))
# Get the right_ascension and declination (already in radians)
right_ascension = all_values_dict["Ra:" + source][0, 0]
declination = all_values_dict["Dec:" + source][0, 0]
if source_type == 1:
# Get the raw values from the db
maj_raw = all_values_dict["MajorAxis:" + source][0, 0]
min_raw = all_values_dict["MinorAxis:" + source][0, 0]
pa_raw = all_values_dict["Orientation:" + source][0, 0]
# convert to radians (conversion is copy paste JDS)
# major radius (+pad) in rad
maj = (((maj_raw + pad)) / 3600.) * np.pi / 180.
# minor radius (+pad) in rad
minor = (((min_raw + pad)) / 3600.) * np.pi / 180.
pix_asc = pa_raw * np.pi / 180.
# wenss always writes 'GAUSSIAN' even for point sources
# -> set to wenss beam+pad
if maj == 0 or minor == 0:
maj = ((54. + pad) / 3600.) * np.pi / 180.
minor = ((54. + pad) / 3600.) * np.pi / 180.
# set to wenss beam+pad
elif source_type == 0:
maj = (((54. + pad) / 2.) / 3600.) * np.pi / 180.
minor = (((54. + pad) / 2.) / 3600.) * np.pi / 180.
pix_asc = 0.
else:
self.logger.info(
"WARNING: unknown source source_type ({0}),"
"ignoring: ".format(source_type))
continue
# define a small square around the source to look for it
null, null, border_y1, border_x1 = mask.topixel(
[freq, stokes, declination - maj,
right_ascension - maj / np.cos(declination - maj)])
null, null, border_y2, border_x2 = mask.topixel(
[freq, stokes, declination + maj,
right_ascension + maj / np.cos(declination + maj)])
xmin = np.int(np.floor(np.min([border_x1, border_x2])))
xmax = np.int(np.ceil(np.max([border_x1, border_x2])))
ymin = np.int(np.floor(np.min([border_y1, border_y2])))
ymax = np.int(np.ceil(np.max([border_y1, border_y2])))
if xmin > xlen or ymin > ylen or xmax < 0 or ymax < 0:
self.logger.info(
"WARNING: source {0} falls outside the mask,"
" ignoring: ".format(source))
continue
if xmax > xlen or ymax > ylen or xmin < 0 or ymin < 0:
self.logger.info(
"WARNING: source {0} falls across map edge".format(source))
for pixel_x in xrange(xmin, xmax):
for pixel_y in xrange(ymin, ymax):
# skip pixels outside the mask field
if pixel_x >= xlen or pixel_y >= ylen or\
pixel_x < 0 or pixel_y < 0:
continue
# get pixel right_ascension and declination in rad
null, null, pix_dec, pix_ra = mask.toworld(
[0, 0, pixel_y, pixel_x])
# Translate and rotate coords.
translated_pixel_x = (pix_ra - right_ascension) * np.sin(
pix_asc) + (pix_dec - declination) * np.cos(pix_asc)
# to align with ellipse
translate_pixel_y = -(pix_ra - right_ascension) * np.cos(
pix_asc) + (pix_dec - declination) * np.sin(pix_asc)
if (((translated_pixel_x ** 2) / (maj ** 2)) +
((translate_pixel_y ** 2) / (minor ** 2))) < \
mask_patch_size:
mask_data[0, 0, pixel_y, pixel_x] = 1
null = null
mask.putdata(mask_data)
table.close()
# some helper functions
def _nearest_ceiled_power2(self, value):
"""
Return the int value of the nearest ceiled power of 2 for the
supplied argument
"""
return int(pow(2, math.ceil(math.log(value, 2))))
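# e.g. _nearest_ceiled_power2(300) -> 512; an exact power of two is
# returned unchanged: _nearest_ceiled_power2(1024) -> 1024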
def _save_selfcal_info(self, concatenated_measurement_set,
major_cycle, npix, UVmin, UVmax):
"""
The selfcal team requested meta information to be added to
measurement set that allows the reproduction of intermediate
steps.
"""
self.logger.info("Save-ing selfcal parameters to file:")
meta_file = os.path.join(
concatenated_measurement_set + "_selfcal_information",
"uvcut_and_npix.txt")
self.logger.info(meta_file)
# if the output file does not exist yet, write the header
if not os.path.exists(meta_file):
meta_file_pt = open(meta_file, 'w')
meta_file_pt.write("#cycle_nr npix uvmin(klambda) uvmax(klambda)\n")
meta_file_pt.close()
meta_file_pt = open(meta_file, 'a')
# Create the actual string with info
meta_info_str = " ".join([str(major_cycle),
str(npix),
str(UVmin),
str(UVmax)])
meta_file_pt.write(meta_info_str + "\n")
meta_file_pt.close()
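# the resulting uvcut_and_npix.txt grows one row per major cycle and
# looks roughly like this (values hypothetical):
# #cycle_nr npix uvmin(klambda) uvmax(klambda)
# 0 4096 0.1 2.5
# 1 4096 0.1 3.1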
if __name__ == "__main__":
_JOBID, _JOBHOST, _JOBPORT = sys.argv[1:4]
sys.exit(selfcal_awimager(
_JOBID, _JOBHOST, _JOBPORT).run_with_stored_arguments())
# LOFAR AUTOMATIC IMAGING PIPELINE
# selfcal_bbs
# Wouter Klijn 2012
# klijn@astron.nl
# Nicolas Vilchez, 2014
# vilchez@astron.nl
# -----------------------------------------------------------------------------
from __future__ import with_statement
import sys
import os
import pyrap.tables as pt
from lofarpipe.support.lofarnode import LOFARnodeTCP
from lofarpipe.support.group_data import load_data_map
from lofarpipe.support.subprocessgroup import SubProcessGroup
from lofarpipe.support.data_map import MultiDataMap
class selfcal_bbs(LOFARnodeTCP):
"""
selfcal_bbs node performs a bbs run for each of the measurement sets
supplied in the mapfile at ms_list_path. Calibration is done on the
sources in the sourcedb in the mapfile sky_list_path. Solutions are
stored in the parmdb_list_path
1. Load the mapfiles
2. For each measurement set to calibrate start a subprocess
3. Check if the processes finished correctly
4. (added by Nicolas vilchez) concat in time the final MS
5. (added by N.Vilchez) copy time slices directory to a new one
"""
def run(self, bbs_executable, parset, ms_list_path, parmdb_list_path,
sky_list_path, concat_ms_path, major_cycle):
"""
selfcal_bbs functionality. Called by framework performing all the work
"""
self.logger.debug("Starting selfcal_bbs Node")
# *********************************************************************
# 1. Load mapfiles
# read in the mapfiles to data maps: The master recipe added the single
# path to a mapfile, which allows usage of default data methods
# (load_data_map)
# TODO: Datamap
ms_map = MultiDataMap.load(ms_list_path)
parmdb_map = MultiDataMap.load(parmdb_list_path)
sky_list = MultiDataMap.load(sky_list_path)
source_db = sky_list[0].file[0] # the sourcedb is the first file entry
try:
bbs_process_group = SubProcessGroup(self.logger,
self.resourceMonitor)
# *****************************************************************
# 2. start the bbs executable with data
# The data is located in multimaps. We need the first entry
# TODO: This is not 'nice' usage of the multimap
for (measurement_set, parmdm) in zip(ms_map[0].file,
parmdb_map[0].file):
command = [
bbs_executable,
"--sourcedb={0}".format(source_db),
"--parmdb={0}".format(parmdm) ,
measurement_set,
parset]
self.logger.info("Executing bbs command: {0}".format(" ".join(
command)))
bbs_process_group.run(command)
# *****************************************************************
# 3. check status of the processes
if bbs_process_group.wait_for_finish() != None:
self.logger.error(
"Failed bbs run detected Aborting")
return 1
except OSError, exception:
self.logger.error("Failed to execute bbs: {0}".format(str(
exception)))
return 1
# *********************************************************************
# 4. Concat your MSs in time after bbs calibration using
# msconcat (pyrap.tables module) (added by N.Vilchez)
# this step has to be performed at this location, because the bbs run
# might add additional columns not present in the original ms
# and therefore not produced in the concat done in the prepare phase
# redmine issue #6021
pt.msconcat(ms_map[0].file,concat_ms_path, concatTime=True)
# *********************************************************************
# 5. copy time slices directory to a new one
# This is done for debugging purpose: The copy is not used for anything
# The actual selfcal steps are done in place
# (added by N.Vilchez)
# The save location is created relative to the concat.ms
# we could also use the self.scratch_directory from the toplevel recipe
# this would need an additional ingredient
# This is a 'debugging' step and should never ever cause a failure of
# the pipeline
try:
working_dir = os.path.dirname(concat_ms_path)
time_slice_dir = os.path.join(working_dir, 'time_slices')
time_slice_copy_dir = os.path.join(working_dir,
'time_slices_cycle_{0}'.format(major_cycle))
cmd = "cp -r {0} {1}".format(time_slice_dir, time_slice_copy_dir)
os.system(cmd)
except Exception:
self.logger.warn(
"Debug copy of temporary files failed: continue operations")
pass # Do nothing
return 0
if __name__ == "__main__":
_JOBID, _JOBHOST, _JOBPORT = sys.argv[1:4]
sys.exit(selfcal_bbs(_JOBID, _JOBHOST, _JOBPORT).run_with_stored_arguments())
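Step 4 above relies on pyrap's time-axis concatenation; a minimal sketch of the call (file names hypothetical):

import pyrap.tables as pt

# glue the calibrated time slices back together along the time axis;
# unlike the virtual concat made in the prepare phase, this one picks up
# any columns bbs added to the slices (see redmine issue #6021 above)
pt.msconcat(["time_slice_0.dppp.ms", "time_slice_1.dppp.ms"],
            "concat.ms", concatTime=True)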
# LOFAR IMAGING PIPELINE
#
# selfcal_finalize
# Wouter Klijn 2012
# klijn@astron.nl
# ------------------------------------------------------------------------------
from __future__ import with_statement
import sys
import subprocess
import os
import tempfile
import shutil
from lofarpipe.support.lofarnode import LOFARnodeTCP
from lofarpipe.support.utilities import log_time, create_directory
import lofar.addImagingInfo as addimg
import pyrap.images as pim
from lofarpipe.support.utilities import catch_segfaults
from lofarpipe.support.data_map import DataMap
from lofarpipe.support.pipelinelogging import CatchLog4CPlus
from lofarpipe.support.subprocessgroup import SubProcessGroup
import urllib2
import lofarpipe.recipes.helpers.MultipartPostHandler as mph
class selfcal_finalize(LOFARnodeTCP):
"""
This script performs the following functions:
1. Add the image info to the casa image:
addimg.addImagingInfo (imageName, msNames, sourcedbName, minbl, maxbl)
2. Convert the image to hdf5 and fits image
3. Filling of the HDF5 root group
4. Move meta data of selfcal to correct dir in ms
5. Deepcopy ms to output location
"""
def run(self, awimager_output, ms_per_image, sourcelist, target,
output_image, minbaseline, maxbaseline, processed_ms_dir,
fillrootimagegroup_exec, environment, sourcedb, concat_ms,
correlated_output_location, msselect_executable):
self.environment.update(environment)
"""
:param awimager_output: Path to the casa image produced by awimager
:param ms_per_image: The X (90) measurement sets scheduled to
create the image
:param sourcelist: list of sources found in the image
:param target: <unused>
:param minbaseline: Minimum baseline used for the image
:param maxbaseline: largest/maximum baseline used for the image
:param processed_ms_dir: The X (90) measurement sets actually used to
create the image
:param fillrootimagegroup_exec: Executable used to add image data to
the hdf5 image
:rtype: self.outputs['hdf5'] set to "succes" to signal node success
:rtype: self.outputs['image'] path to the produced hdf5 image
"""
with log_time(self.logger):
ms_per_image_map = DataMap.load(ms_per_image)
# *****************************************************************
# 1. add image info
# Get all the files in the processed measurement dir
file_list = os.listdir(processed_ms_dir)
processed_ms_paths = []
ms_per_image_map.iterator = DataMap.SkipIterator
for item in ms_per_image_map:
ms_path = item.file
processed_ms_paths.append(ms_path)
#add the information to the image
try:
self.logger.debug("Start addImage Info")
addimg.addImagingInfo(awimager_output, processed_ms_paths,
sourcedb, minbaseline, maxbaseline)
except Exception, error:
self.logger.warn("addImagingInfo Threw Exception:")
self.logger.warn(error)
# Catch raising of already done error: allows for rerunning
# of the recipe
if "addImagingInfo already done" in str(error):
self.logger.warn("addImagingInfo already done, continue")
pass
else:
raise Exception(error)
#The majority of the tables are updated correctly
# ***************************************************************
# 2. convert to hdf5 image format
output_directory = None
pim_image = pim.image(awimager_output)
try:
self.logger.info("Saving image in HDF5 Format to: {0}" .format(
output_image))
# Create the output directory
output_directory = os.path.dirname(output_image)
create_directory(output_directory)
# save the image
pim_image.saveas(output_image, hdf5=True)
except Exception, error:
self.logger.error(
"Exception raised inside pyrap.images: {0}".format(
str(error)))
raise error
# Convert to fits
# create target location
fits_output = output_image + ".fits"
# To allow reruns a possible earlier version needs to be removed!
# image2fits fails if not done!!
if os.path.exists(fits_output):
os.unlink(fits_output)
try:
self.logger.debug("Start convert to fits")
temp_dir = tempfile.mkdtemp()
with CatchLog4CPlus(temp_dir,
self.logger.name + '.' + os.path.basename(awimager_output),
"image2fits") as logger:
catch_segfaults(["image2fits", '-in', awimager_output,
'-out', fits_output],
temp_dir, self.environment, logger)
except Exception, excp:
self.logger.error(str(excp))
return 1
finally:
shutil.rmtree(temp_dir)
# ****************************************************************
# 3. Filling of the HDF5 root group
command = [fillrootimagegroup_exec, output_image]
self.logger.info(" ".join(command))
#Spawn a subprocess and connect the pipes
proc = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
(stdoutdata, stderrdata) = proc.communicate()
exit_status = proc.returncode
#if the command failed log the exit status
if exit_status != 0:
self.logger.error("Error using the fillRootImageGroup command"
". Exit status: {0}".format(exit_status))
self.logger.error(stdoutdata)
self.logger.error(stderrdata)
return 1
# *****************************************************************
# 4. Move the meta information to the correct directory next to the
# concat.ms
self.logger.info("Save-ing selfcal parameters to file:")
meta_dir = concat_ms + "_selfcal_information"
meta_dir_target = os.path.join(concat_ms, "selfcal_information")
if os.path.exists(meta_dir) and os.path.exists(concat_ms):
self.logger.info("Copy meta information to output measurementset")
# Clear possible old data, allows for rerun of the pipeline
# if needed.
if os.path.exists(meta_dir_target):
shutil.rmtree(meta_dir_target)
shutil.copytree(meta_dir, meta_dir_target)
# *****************************************************************
# 5. Copy the measurement set to the output directory
# use msselect to copy all the data in the measurement sets
cmd_string = "{0} in={1} out={2} baseline=* deep=True".format(
msselect_executable, concat_ms, correlated_output_location)
msselect_proc_group = SubProcessGroup(self.logger)
msselect_proc_group.run(cmd_string)
if msselect_proc_group.wait_for_finish() != None:
self.logger.error("failed copy of measurmentset to output dir")
raise Exception("an MSselect run failed!")
self.outputs["hdf5"] = "succes"
self.outputs["image"] = output_image
self.outputs["correlated"] = correlated_output_location
return 0
if __name__ == "__main__":
_JOBID, _JOBHOST, _JOBPORT = sys.argv[1:4]
sys.exit(selfcal_finalize(_JOBID, _JOBHOST,
_JOBPORT).run_with_stored_arguments())