Skip to content
Snippets Groups Projects
Commit b6d1032c authored by Mattia Mancini's avatar Mattia Mancini
Browse files

SSB-42: preliminary implementation of the datacontainers objects

parent 967aa390
No related branches found
No related tags found
1 merge request!44Merge back holography to master
......@@ -11,7 +11,7 @@ CAL/CalibrationCommon/lib/__init__.py -text
CAL/CalibrationCommon/lib/coordinates.py -text
CAL/CalibrationCommon/lib/datacontainers/__init__.py -text
CAL/CalibrationCommon/lib/datacontainers/holography_dataset.py -text
CAL/CalibrationCommon/lib/datacontainers/holography_observation_specification.py -text
CAL/CalibrationCommon/lib/datacontainers/holography_observation.py -text
CAL/CalibrationCommon/lib/datacontainers/holography_specification.py -text
CAL/CalibrationCommon/lib/datacontainers/measurementset.py -text
CAL/CalibrationCommon/lib/mshologextract.py -text
......
from .holography_dataset import HolographyDataset
from .holography_specification import HolographySpecification
from .holography_observation_specification import HolographyObservationSpecification
from .holography_observation import HolographyObservation
from .measurementset import MeasurementSet
__all__ = [HolographyDataset, HolographyObservationSpecification, HolographySpecification, MeasurementSet]
\ No newline at end of file
__all__ = [HolographyDataset, HolographyObservation, HolographySpecification, MeasurementSet]
\ No newline at end of file
from .holography_specification import HolographySpecification
from .holography_observation import HolographyObservation
class HolographyDataset():
def __init__(self):
self.rcu_list = () # array of ints
self.rcu_list = set() # array of ints
self.mode = None # integer
self.sas_ids = () # array of strings
self.sas_ids = set() # array of strings
self.target_station_name = None # string
self.target_station_position = None # list of 3 floats
self.source_name = None # string
self.source_position = None # list of 3 floats
self.start_time = None # date time when the observation started in MJD (float)
self.end_time = None # date time when the observation started in MJD (float)
self.start_time = None # date time when the observation started in MJD in seconds (float)
self.end_time = None # date time when the observation started in MJD in seconds (float)
self.rotation_matrix = None # array(3,3), translation matrix for
# (RA, DEC) <-> (l, m) conversion
self.antenna_field_position = () # coordinates of the antenna position in the target
self.antenna_field_position = [] # coordinates of the antenna position in the target
# station
self.reference_stations = () # list of reference station names
self.frequencies = () # list of frequencies
self.ra_dec = () # array(Nfrequency, Nbeamlets, 2) contains the ra_dec of which a beam
self.reference_stations = set() # set of reference station names
self.frequencies = set() # set of frequencies
self.ra_dec = None # array(Nfrequency, Nbeamlets, 2) contains the ra_dec of which a beam
# points at given a frequency and a beamlet number
self.data = () # array(NreferenceStations, Nfrequencies, Nbeamlets) that contains the
self.data = None # array(NreferenceStations, Nfrequencies, Nbeamlets) that contains the
# 4 polarization crosscorrelation for the 4 polarizations, the l and m coordinates, and
# the timestamp in mjd of the sample
......@@ -27,21 +31,65 @@ class HolographyDataset():
name
:param station_name: target station name
:param hb_specifications: a list containing (hbs, ms) per frequency
"""
self.__collect_preliminary_information(station_name, list_of_hbs_ms_tuples)
def __collect_preliminary_information(self, list_of_hbs_ms_tuples):
def __collect_preliminary_information(self, station_name, list_of_hbs_ho_tuples):
"""
This routines reads both the holography beam specifications files and the holography
measurement sets to gather the list of rcus, the mode, the target station name and position,
observation to gather the list of rcus, the mode, the target station name and position,
the source name and position, the start and the end time, the rotation matrix to convert
from ra and dec to l and m, the antenna field positions, the list of the reference
stations, the frequencies, the ra and dec at which the beams point at.
All this data is essential to interpret the recorded holography beams cross correlations
:param list_of_hbs_ms_tuples: a list containing (hbs, ms) per frequency
:param list_of_hbs_ho_tuples: a list containing (hbs, ho) per frequency
:type list_of_hbs_ho_tuples: list[(HolographySpecification, HolographyObservation)]
"""
for hbs, ms in list_of_hbs_ms_tuples:
pass
mode = set()
source_name = set()
source_position = set()
for hbs, ho in list_of_hbs_ho_tuples:
if station_name in hbs.target_station_names:
beam_specifications = hbs.get_beam_specifications_per_station_name(station_name)
for beam_specification in beam_specifications:
self.rcu_list.update(beam_specification.rcus_involved)
mode.update(beam_specification.rcus_mode)
source_name.update(ho.source_name)
source_position.update(
(beam_specification.station_pointing['ra'],
beam_specification.station_pointing['dec'],
beam_specification.station_pointing['coordinate_system']
))
self.start_time = ho.start_mjd
self.end_time = ho.end_mjd
self.frequencies = ho.frequency
self.sas_ids.update(ho.sas_id)
self.target_station_name = station_name
self.reference_stations.update(hbs.reference_station_names)
else:
continue
if len(mode) > 1:
raise ValueError('Multiple RCUs mode are not supported')
else:
self.mode = self.mode.pop()
if len(source_position) > 1:
raise ValueError('Multiple source positions are not supported')
else:
self.source_position = source_position
if len(source_name) > 1:
raise ValueError('Multiple source name are not supported')
else:
self.source_name = source_name
@staticmethod
def load_from_file(path):
......
import re
import os
import astropy.time as astrotime
from datetime import datetime
from .measurementset import MeasurementSet
class HolographyObservationSpecification():
def __init__(self, path, sas_id, ms_for_a_given_beamlet, observation_start_datetime,
observation_end_datetime):
class HolographyObservation():
def __init__(self, path, sas_id, ms_for_a_given_beamlet_number, start_mjd_in_seconds,
end_mjd_in_seconds, frequency, source_name):
"""
:param path:
:type path: str
:param sas_id:
:type sas_id: int
:param ms_for_a_given_beamlet_number:
:param start_mjd_in_seconds:
:type start_mjd_in_seconds: float
:param end_mjd_in_seconds:
:type end_mjd_in_seconds: float
:param frequency:
:type frequency: float
:param source_name:
:type source_name: str
"""
self.path = path
self.sas_id = sas_id
self.ms_for_a_given_beamlet = ms_for_a_given_beamlet
self.start_datetime = observation_start_datetime
self.end_datetime = observation_end_datetime
self.ms_for_a_given_beamlet_number = ms_for_a_given_beamlet_number
self.start_datetime = HolographyObservation.__mjd_to_datetime(start_mjd_in_seconds)
self.end_datetime = HolographyObservation.__mjd_to_datetime(end_mjd_in_seconds)
self.start_mjd = start_mjd_in_seconds
self.end_mjd = end_mjd_in_seconds
self.frequency = frequency
self.source_name = source_name
@staticmethod
def __mjd_to_datetime(mjd_time_seconds):
"""
Convert the modified julian date in seconds in a datetime object
:param mjd_time_seconds: modified julian data in seconds
:return: the date time of the given julian date
:rtype: datetime
"""
hour_in_seconds = 60 * 60
day_in_seconds = hour_in_seconds * 24
return astrotime.Time(mjd_time_seconds / day_in_seconds, format='mjd',
scale='utc').to_datetime()
@staticmethod
def __compute_time_range_from_ms_list(ms_list):
......@@ -23,6 +56,17 @@ class HolographyObservationSpecification():
observation_end = ms_end_time
return observation_start, observation_end
@staticmethod
def extract_unique_source_names_from_ms_list(ms_list):
"""
Returns a set of unique source names given a list of measurement sets
:param ms_list: a list of measurement set where to extract the reference frequencies
:type ms_list: list[MeasurementSet]
:return: a set of frequencies
:rtype: set[str]
"""
return {ms.get_source_name() for ms in ms_list}
@staticmethod
def list_observations_in_path(path):
ms_dir_name_pattern = 'L(?P<sas_id>\d{6})'
......@@ -33,19 +77,51 @@ class HolographyObservationSpecification():
if match:
sas_id = int(match.group('sas_id'))
ms_indexed_per_beamlet_number = HolographyObservationSpecification.\
ms_indexed_per_beamlet_number = HolographyObservation. \
create_ms_dict_from_ms_name_list_and_path(dirnames, root)
start_datetime, end_datetime = HolographyObservationSpecification.\
start_mjd_in_seconds, end_mjd_in_seconds = HolographyObservation.\
__compute_time_range_from_ms_list(
ms_indexed_per_beamlet_number.values())
unique_frequencies = HolographyObservation. \
extract_unique_reference_frequencies_from_ms_list(
ms_indexed_per_beamlet_number.values())
if len(unique_frequencies) > 1:
raise ValueError(
'Multiple reference frequencies per observation are not supported')
else:
frequency = unique_frequencies.pop()
unique_source_names = HolographyObservation.\
extract_unique_source_names_from_ms_list(ms_indexed_per_beamlet_number.values())
if len(unique_source_names) > 1:
raise ValueError(
'Multiple source target per observation are not supported'
)
else:
source_name = unique_source_names.pop()
observations_list.append(
HolographyObservationSpecification(path, sas_id, ms_indexed_per_beamlet_number,
start_datetime, end_datetime))
HolographyObservation(path, sas_id, ms_indexed_per_beamlet_number,
start_mjd_in_seconds, end_mjd_in_seconds, frequency,
source_name))
return observations_list
@staticmethod
def extract_unique_reference_frequencies_from_ms_list(ms_list):
"""
Returns a set of reference frequencies given a list of measurement setss
:param ms_list: a list of measurement set where to extract the reference frequencies
:type ms_list: list[MeasurementSet]
:return: returns a set of frequencies
:rtype: set[float]
"""
return {ms.get_frequency() for ms in ms_list}
@staticmethod
def create_ms_dict_from_ms_name_list_and_path(list_of_ms_names, path):
"""
......
......@@ -4,6 +4,18 @@ import re
import os
class HolographyStationBeamSpecification(object):
station_name = None
rcus_mode = None
sub_band_ids = ()
mode_description = None
rcus_involved = ()
beamlets = ()
station_pointing = ()
virtual_pointing = ()
station_type = ()
class HolographySpecification(object):
hs_name_pattern = r'Holog-(?P<date>\d{8})-(?P<comment>.*)-(?P<id>\d{3}).txt'
......@@ -12,7 +24,8 @@ class HolographySpecification(object):
self.name = name
self.id, self.date, self.comment = HolographySpecification.\
extract_id_date_comment_from_name(name)
self.reference_station_names = None
self.target_station_names = None
self.station_specification_map = defaultdict(list)
self.start_datetime = None
self.end_datetime = None
......@@ -86,31 +99,27 @@ class HolographySpecification(object):
@staticmethod
def _parse_line(splitted_line):
rcus_mode = int(splitted_line['rcus_mode'])
sub_band_ids = [int(sub_band) for sub_band in splitted_line['sub_band'].split(',')]
mode_description = splitted_line['mode_description']
rcus_involved = splitted_line['rcus']
beamlets = splitted_line['beamlets']
station_pointing = HolographySpecification._parse_pointing(
beam_specification = HolographyStationBeamSpecification()
beam_specification.rcus_mode = int(splitted_line['rcus_mode'])
beam_specification.sub_band_ids = [int(sub_band) for sub_band in splitted_line['sub_band'].split(',')]
beam_specification.mode_description = splitted_line['mode_description']
beam_specification.rcus_involved = splitted_line['rcus']
beam_specification.beamlets = splitted_line['beamlets']
beam_specification.station_pointing = HolographySpecification._parse_pointing(
splitted_line['station_pointing'])
virtual_pointing = HolographySpecification._parse_pointing(
beam_specification.virtual_pointing = HolographySpecification._parse_pointing(
splitted_line['virtual_pointing'])
if len(sub_band_ids) == 1:
station_type = 'target'
if len(beam_specification.sub_band_ids) == 1:
beam_specification.station_type = 'target'
else:
station_type = 'reference'
station_name = splitted_line['station_name']
return dict(station_name=station_name,
rcus_mode=rcus_mode,
sub_band_ids=sub_band_ids,
mode_description=mode_description,
rcus_involved=rcus_involved,
beamlets=beamlets,
station_pointing=station_pointing,
virtual_pointing=virtual_pointing,
station_type=station_type)
beam_specification.station_type = 'reference'
beam_specification.station_name = splitted_line['station_name']
return beam_specification
def _parse_lines(self, lines):
splitted_lines = HolographySpecification._split_lines(lines)
......@@ -118,7 +127,7 @@ class HolographySpecification(object):
for line in splitted_lines:
parsed_line = HolographySpecification._parse_line(line)
self.station_specification_map[parsed_line['station_name']] += [parsed_line]
self.station_specification_map[parsed_line.station_name] += [parsed_line]
def _update_class_attributes(self):
self.station_names = self.station_specification_map.keys()
......@@ -128,7 +137,16 @@ class HolographySpecification(object):
self.target_station_names = [station_name for station_name in
self.station_specification_map
if len(self.station_specification_map[station_name]) > 1]
print(self.target_station_names, self.reference_station_names)
def get_beam_specifications_per_station_name(self, station_name):
"""
Returns a list of beam specifications for the given station name
:param station_name: name of the station
:type station_name: str
:return: a list of beam specifications
:rtype: list[HolographyStationBeamSpecification]
"""
return self.station_specification_map[station_name]
def read_file(self):
lines = self._read_lines()
......@@ -154,5 +172,3 @@ class HolographySpecification(object):
date = datetime.datetime.strptime(date, '%Y%m%d')
return hs_id, date, comment
......@@ -3,6 +3,7 @@ import astropy.time as astrotime
import os
import re
class MeasurementSet(object):
ms_name_pattern = r'L(?P<sas_id>\d{6})_SB(?P<sub_band_id>\d{3})_uv\.MS'
......@@ -29,10 +30,48 @@ class MeasurementSet(object):
data_table = MS_Table(self.path)
return data_table
def get_pointing_table(self):
pointing_table = MS_Table(self.path + '/POINTING')
return pointing_table
def get_antenna_table(self):
antenna_table = MS_Table(self.path + '/ANTENNA')
return antenna_table
def get_lofar_antenna_field_table(self):
antenna_field_table = MS_Table(self.path + '/LOFAR_ANTENNA_FIELD')
return antenna_field_table
def get_station_position_tile_offsets_and_axes_coordinate_for_station_name(self, station_name):
antenna_table = self.get_antenna_table()
antenna_field_table = self.get_lofar_antenna_field_table()
try:
station_name_index = antenna_table.index('NAME').rownr(station_name)
station_position = antenna_field_table.getcell('POSITION', station_name_index)
tile_offsets = antenna_field_table.getcell('TILE_ELEMENT_OFFSET', station_name_index)
axes_coordinate = antenna_field_table.getcell('COORDINATE_AXES', station_name_index)
finally:
antenna_table.close()
antenna_field_table.close()
return (station_position, tile_offsets, axes_coordinate)
def __extract_source_name_from_pointing(self):
pointing_table = self.get_pointing_table()
try:
unique_names = {name for name in pointing_table.getcol('NAME')}
if len(unique_names) > 1:
raise ValueError('Expected only a source as a target for the observation')
else:
source_name = unique_names.pop()
finally:
pointing_table.close()
return source_name
def get_spectral_window_table(self):
spectral_window_table = MS_Table(self.path + '/SPECTRAL_WINDOW')
return spectral_window_table
......@@ -45,15 +84,16 @@ class MeasurementSet(object):
observation_table = self.get_observation_table()
try:
start_time_in_seconds, end_time_in_seconds = observation_table.getcol('TIME_RANGE')
hour_in_seconds = 60 * 60
day_in_seconds = hour_in_seconds * 24
start_time = astrotime.Time(start_time_in_seconds/day_in_seconds, format='mjd', scale='utc')
end_time = astrotime.Time(start_time_in_seconds / day_in_seconds, format='mjd', scale='utc')
start_time, end_time = observation_table.getcol('TIME_RANGE')
finally:
observation_table.close()
return start_time.to_datetime(), end_time.to_datetime()
return start_time, end_time
def get_source_name(self):
return self.__extract_source_name_from_pointing()
def read_cross_correlation_per_station_names(self, reference, target):
data_table = self.get_data_table()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment