Skip to content
Snippets Groups Projects

Resolve L2SS-572 "Refactor delays"

2 unresolved threads
Merged Stefano Di Frischia requested to merge L2SS-572-refactor-delays into master
2 unresolved threads
Files
5
@@ -2,11 +2,11 @@ import casacore.measures
@@ -2,11 +2,11 @@ import casacore.measures
import numpy
import numpy
import datetime
import datetime
def subtract(a, b):
def subtract(a, b) -> numpy.ndarray:
return numpy.array([x - y for x, y in zip(a, b)])
return numpy.array([x - y for x, y in zip(a, b)])
class delay_calculator:
class Delays:
def __init__(self, itrf: list([float])):
def __init__(self, itrf: list([float])):
""" Create a measure object, configured for the specified terrestrial location. """
""" Create a measure object, configured for the specified terrestrial location. """
@@ -14,8 +14,8 @@ class delay_calculator:
@@ -14,8 +14,8 @@ class delay_calculator:
measure = casacore.measures.measures()
measure = casacore.measures.measures()
frame_location = measure.position("ITRF", *[f"{x}m" for x in itrf])
frame_location = measure.position("ITRF", *[f"{x}m" for x in itrf])
result = measure.do_frame(frame_location)
if not measure.do_frame(frame_location):
assert result == True
raise ValueError(f"measure.do_frame failed for ITRF location {itrf}")
self.reference_itrf = itrf
self.reference_itrf = itrf
self.measure = measure
self.measure = measure
@@ -23,10 +23,11 @@ class delay_calculator:
@@ -23,10 +23,11 @@ class delay_calculator:
def set_measure_time(self, utc_time: datetime.datetime):
def set_measure_time(self, utc_time: datetime.datetime):
""" Configure the measure object for the specified time. """
""" Configure the measure object for the specified time. """
frame_time = self.measure.epoch("UTC", utc_time.isoformat(' '))
utc_time_str = utc_time.isoformat(' ')
 
frame_time = self.measure.epoch("UTC", utc_time_str)
result = self.measure.do_frame(frame_time)
if not self.measure.do_frame(frame_time):
assert result == True
raise ValueError(f"measure.do_frame failed for UTC time {utc_time_str}")
def get_direction_vector(self, pointing: numpy.ndarray) -> numpy.ndarray:
def get_direction_vector(self, pointing: numpy.ndarray) -> numpy.ndarray:
""" Compute direction vector for a given pointing, relative to the measure. """
""" Compute direction vector for a given pointing, relative to the measure. """
@@ -53,7 +54,8 @@ class delay_calculator:
@@ -53,7 +54,8 @@ class delay_calculator:
# Return array [directions][angles]
# Return array [directions][angles]
return direction_vectors.T
return direction_vectors.T
def is_valid_direction(self, direction):
def is_valid_direction(self, direction) -> bool:
 
""" Check validity of the direction measure """
try:
try:
_ = self.measure.direction(*direction)
_ = self.measure.direction(*direction)
except (RuntimeError, TypeError) as e:
except (RuntimeError, TypeError) as e:
@@ -61,14 +63,14 @@ class delay_calculator:
@@ -61,14 +63,14 @@ class delay_calculator:
return True
return True
def convert(self, direction, antenna_absolute_itrf: list([float])):
def delays(self, direction, antenna_absolute_itrf: list([float])) -> numpy.ndarray:
""" Get the delays for a direction and *absolute* antenna positions.
""" Get the delays for a direction and *absolute* antenna positions.
Returns delays[antenna]. """
Returns delays[antenna]. """
return self.convert_bulk([direction], numpy.array(antenna_absolute_itrf) - self.reference_itrf).flatten()
return self.delays_bulk([direction], numpy.array(antenna_absolute_itrf) - self.reference_itrf).flatten()
def convert_bulk(self, directions: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray) -> numpy.ndarray:
def delays_bulk(self, directions: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray) -> numpy.ndarray:
""" Get the delays for each direction and each *relative* antenna position.
""" Get the delays for each direction and each *relative* antenna position.
Returns delays[antenna][direction]. """
Returns delays[antenna][direction]. """
Loading