Skip to content
Snippets Groups Projects
Commit ff9f994f authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'master' into L2SS-404-archiving-setup-development

parents 0e85a890 97bcd435
No related branches found
No related tags found
1 merge request!253Resolve L2SS-404 "Archiving setup development"
Showing
with 164 additions and 81 deletions
...@@ -8,3 +8,6 @@ ...@@ -8,3 +8,6 @@
*.h5 binary *.h5 binary
*.jpg binary *.jpg binary
*.bin binary *.bin binary
# casacore measures tables
table.* binary
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
...@@ -50,9 +50,21 @@ class delay_calculator: ...@@ -50,9 +50,21 @@ class delay_calculator:
return numpy.dot(reference_direction_vector, relative_itrf) / speed_of_light return numpy.dot(reference_direction_vector, relative_itrf) / speed_of_light
def is_valid_direction(self, direction):
try:
_ = self.measure.direction(*direction)
except RuntimeError as e:
return False
return True
def convert(self, direction, antenna_itrf: list([float])): def convert(self, direction, antenna_itrf: list([float])):
try:
# obtain the direction vector for a specific pointing # obtain the direction vector for a specific pointing
pointing = self.measure.direction(*direction) pointing = self.measure.direction(*direction)
except RuntimeError as e:
raise ValueError(f"Could not convert direction {direction} into a pointing") from e
reference_dir_vector = self.get_direction_vector(pointing) reference_dir_vector = self.get_direction_vector(pointing)
# # compute the delays for an antennas w.r.t. the reference position # # compute the delays for an antennas w.r.t. the reference position
......
from delays import *
if __name__ == '__main__':
# # create a frame tied to the reference position
reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5
d = delay_calculator(reference_itrf)
# # set the timestamp to solve for
timestamp = datetime.datetime(2021,1,1,0,0,5)
d.set_measure_time(timestamp)
# compute the delays for an antennas w.r.t. the reference position
antenna_itrf = [[3826923.546, 460915.441, 5064643.489]] # CS001LBA, in ITRF2005 epoch 2012.5
# # obtain the direction vector for a specific pointing
direction = "J2000","0deg","0deg"
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
delays = d.convert(direction, antenna_itrf)
# print the delays
# pprint.pprint(delays)
#test changing the time
print(f"Changing timestamp test\nBase parametres: Direction: {direction}, position: {antenna_itrf}")
for i in range(10):
# # set the timestamp to solve for
timestamp = datetime.datetime(2021,1,1,0,i,5)
d.set_measure_time(timestamp)
delays = d.convert(direction, antenna_itrf)
# print the delays
print(f"Timestamp: {timestamp}: {delays}")
# reset time
timestamp = datetime.datetime(2021, 1, 1, 0, 0, 5)
d.set_measure_time(timestamp)
#test changing the antenna position
print(f"Changing Antenna position test.\nBase parametres: Time: {timestamp} Direction: {direction}")
for i in range(10):
antenna_itrf = [[3826577.066 + i, 461022.948, 5064892.786]] # CS002LBA, in ITRF2005 epoch 2012.5
delays = d.convert(direction, antenna_itrf)
# print the delays
print(f"Antenna position: {antenna_itrf}: {delays}")
# test changing the direction
antenna_itrf = [[3826923.546, 460915.441, 5064643.489]] # CS001LBA, in ITRF2005 epoch 2012.5
print(f"Changing direction test.\nBase parametres: Time: {timestamp} , position: {antenna_itrf}")
for i in range(10):
direction = "J2000", f"{i}deg", "0deg"
delays = d.convert(direction, antenna_itrf)
# print the delays
print(f"Direction: {direction}: {delays}")
...@@ -8,12 +8,12 @@ ...@@ -8,12 +8,12 @@
""" """
import numpy import numpy
import datetime, time import datetime
from json import loads from json import loads
from tango.server import attribute, command, device_property from tango.server import attribute, command, device_property
from tango import AttrWriteType, DebugIt, DevState, DeviceProxy, DevVarStringArray, DevVarDoubleArray, DevString from tango import AttrWriteType, DebugIt, DevState, DeviceProxy, DevVarStringArray, DevVarDoubleArray, DevString
from threading import Thread from threading import Thread, Lock, Condition
# Additional import # Additional import
from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.common.entrypoint import entry
...@@ -83,8 +83,8 @@ class Beam(lofar_device): ...@@ -83,8 +83,8 @@ class Beam(lofar_device):
# Initialise pointing array data and attribute # Initialise pointing array data and attribute
self._hbat_pointing_timestamp_r = numpy.zeros(96, dtype=numpy.double) self._hbat_pointing_timestamp_r = numpy.zeros(96, dtype=numpy.double)
self._hbat_pointing_direction_r = numpy.zeros((96,3), dtype=numpy.str) self._hbat_pointing_direction_r = numpy.zeros((96,3), dtype="<U32")
self._hbat_pointing_direction_rw = numpy.zeros((96,3), dtype=numpy.str) self._hbat_pointing_direction_rw = numpy.array([["AZELGEO","0deg","90deg"]] * 96, dtype="<U32")
# Set a reference of RECV device # Set a reference of RECV device
self.recv_proxy = DeviceProxy("STAT/RECV/1") self.recv_proxy = DeviceProxy("STAT/RECV/1")
...@@ -121,9 +121,17 @@ class Beam(lofar_device): ...@@ -121,9 +121,17 @@ class Beam(lofar_device):
# internal functions # internal functions
# -------- # --------
def write_HBAT_pointing_direction_RW(self): def write_HBAT_pointing_direction_RW(self, value):
""" Setter method for attribute HBAT_pointing_direction_RW """ """ Setter method for attribute HBAT_pointing_direction_RW """
self.HBAT_pointing_direction_RW = self._hbat_pointing_direction_rw # verify whether values are valid
for tile in range(96):
if not self.HBAT_delay_calculators[tile].is_valid_direction(value[tile]):
raise ValueError(f"Invalid direction: {value[tile]}")
self._hbat_pointing_direction_rw = value
# force update across tiles if pointing changes
self.HBAT_beam_tracker.force_update()
def _HBAT_delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()): def _HBAT_delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()):
""" """
...@@ -270,38 +278,60 @@ class BeamTracker(): ...@@ -270,38 +278,60 @@ class BeamTracker():
self.thread = Thread(target=self._update_HBAT_pointing_direction) self.thread = Thread(target=self._update_HBAT_pointing_direction)
self.device = device self.device = device
# Condition to trigger a forced update or early abort
self.update_lock = Lock()
self.update_condition = Condition(self.update_lock)
def start(self): def start(self):
""" Starts the Beam Tracking thread """ """ Starts the Beam Tracking thread """
self.done = False # beam tracking loop flag self.done = False
self.thread.start() self.thread.start()
def is_alive(self): def is_alive(self):
""" Returns True just before the Thread run() method starts until just after the Thread run() method terminates. """ """ Returns True just before the Thread run() method starts until just after the Thread run() method terminates. """
return self.thread.is_alive() return self.thread.is_alive()
def force_update(self):
""" Force the pointing to be updated. """
# inform the thread to stop waiting
with self.update_lock:
self.update_condition.notify()
def stop(self): def stop(self):
""" Stops the Beam Tracking loop """ """ Stops the Beam Tracking loop """
self.done = True # beam tracking loop flag
self.done = True
self.force_update()
# wait for thread to finish
self.thread.join(self.DISCONNECT_TIMEOUT) self.thread.join(self.DISCONNECT_TIMEOUT)
if (self.thread.is_alive()):
if self.is_alive():
logger.error("BeamTracking Thread did not properly terminate") logger.error("BeamTracking Thread did not properly terminate")
def _get_sleep_time(self): def _get_sleep_time(self):
""" Computes the sleep time (in seconds) that needs to be waited for the next beam tracking update """ """ Computes the sleep time (in seconds) that needs to be waited for the next beam tracking update """
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()
# Computes the left seconds before the next update # Computes the left seconds before the next update
next_update_in = self.device.HBAT_beam_tracking_interval - (now % self.device.HBAT_beam_tracking_interval) next_update_in = self.device.HBAT_beam_tracking_interval - (now % self.device.HBAT_beam_tracking_interval)
# Computes the needed sleep time before the next update # Computes the needed sleep time before the next update
sleep_time = next_update_in - self.device.HBAT_beam_tracking_preparation_period sleep_time = next_update_in - self.device.HBAT_beam_tracking_preparation_period
# If sleep time is negative, add the tracking interval for the next update # If sleep time is negative, add the tracking interval for the next update
if (sleep_time<0): if sleep_time < 0:
return sleep_time + self.device.HBAT_beam_tracking_interval return sleep_time + self.device.HBAT_beam_tracking_interval
else: else:
return sleep_time return sleep_time
def _update_HBAT_pointing_direction(self): def _update_HBAT_pointing_direction(self):
""" Updates the beam weights using a fixed interval of time """ """ Updates the beam weights using a fixed interval of time """
# Check if flag beamtracking is true # Check if flag beamtracking is true
while (not(self.done)): with self.update_lock:
time.sleep(self._get_sleep_time()) while not self.done:
self.device.HBAT_set_pointing(numpy.array(self.device.proxy.HBAT_pointing_direction_RW).flatten()) self.device.HBAT_set_pointing(numpy.array(self.device.proxy.HBAT_pointing_direction_RW).flatten())
# sleep until the next update, or when interrupted (this releases the lock, allowing for notification)
self.update_condition.wait(self._get_sleep_time())
import datetime
from tangostationcontrol.beam.delays import *
from tangostationcontrol.test import base
class TestDelays(base.TestCase):
def test_init(self):
"""
Fail condition is simply the object creation failing
"""
reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5
d = delay_calculator(reference_itrf)
self.assertIsNotNone(d)
def test_sun(self):
# # create a frame tied to the reference position
reference_itrf = [3826577.066, 461022.948, 5064892.786]
d = delay_calculator(reference_itrf)
for i in range(24):
# set the time to the day of the winter solstice 2021 (21 december 16:58) as this is the time with the least change in sunlight
timestamp = datetime.datetime(2021, 12, 21, i, 58, 0)
d.set_measure_time(timestamp)
# point to the sun
direction = "SUN", "0deg", "0deg"
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
pointing = d.measure.direction(*direction)
direction = d.get_direction_vector(pointing)
"""
direction[2] is the z-coordinate of ITRF, which points to the north pole.
This direction is constant when pointing to the sun, as the earth rotates around its axis,
but changes slowly due to the earths rotation around the sun.
The summer and winter solstices are when these values are at their peaks and the changes are the smallest.
This test takes the value at the winter solstice and checks whether the measured values are near enough to that.
"""
# Measured manually at the winter solstice. Using datetime.datetime(2021, 12, 21, 16, 58, 0)
z_at_solstice = -0.3977784695213487
z_direction = direction[2]
self.assertAlmostEqual(z_at_solstice, z_direction, 4)
def test_identical_location(self):
# # create a frame tied to the reference position
reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5
d = delay_calculator(reference_itrf)
# set the antenna position identical to the reference position
antenna_itrf = [[reference_itrf[0], reference_itrf[1], reference_itrf[2]]] # CS001LBA, in ITRF2005 epoch 2012.5
# # set the timestamp to solve for
timestamp = datetime.datetime(2000, 1, 1, 0, 0, 0)
d.set_measure_time(timestamp)
# compute the delays for an antennas w.r.t. the reference position
# # obtain the direction vector for a specific pointing
direction = "J2000", "0deg", "0deg"
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
delays = d.convert(direction, antenna_itrf)
self.assertListEqual(delays, [0.0], msg=f"delays = {delays}")
def test_light_second_delay(self):
"""
This test measures the delay between 2 positions 1 light second apart.
"""
# # create a frame tied to the reference position
reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5
d = delay_calculator(reference_itrf)
# set the antenna position identical to the reference position
speed_of_light = 299792458.0
antenna_itrf = [[reference_itrf[0], reference_itrf[1] - speed_of_light, reference_itrf[2]]] # CS001LBA, in ITRF2005 epoch 2012.5
# # set the timestamp to solve for
timestamp = datetime.datetime(2000, 1, 1, 0, 0, 0)
d.set_measure_time(timestamp)
# compute the delays for an antennas w.r.t. the reference position
# # obtain the direction vector for a specific pointing
direction = "J2000", "0deg", "0deg"
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
delays = d.convert(direction, antenna_itrf)
self.assertTrue(0.98 <= delays[0] <= 1.02, f"delays[0] = {delays[0]}")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment