Skip to content
Snippets Groups Projects
Commit 4d6d9926 authored by Taya Snijder's avatar Taya Snijder
Browse files

changed constant

parent 393c8444
Branches
Tags
1 merge request!465added constants.py and replaced most magic numbers
...@@ -70,9 +70,6 @@ class constants: ...@@ -70,9 +70,6 @@ class constants:
# default subband we use because of its low RFI # default subband we use because of its low RFI
DEFAULT_SUBBAND = 102 DEFAULT_SUBBAND = 102
# Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192
# UNB2 constants # UNB2 constants
# number of uniboards in a subrack # number of uniboards in a subrack
N_unb = 2 N_unb = 2
......
...@@ -77,10 +77,10 @@ class DigitalBeam(beam_device): ...@@ -77,10 +77,10 @@ class DigitalBeam(beam_device):
dtype=numpy.float64, fget=lambda self: self._delays.statistics["last"] or 0) dtype=numpy.float64, fget=lambda self: self._delays.statistics["last"] or 0)
input_select_RW = attribute(doc='Selection of inputs to use for forming each beamlet. Allows selecting broken antennas.', input_select_RW = attribute(doc='Selection of inputs to use for forming each beamlet. Allows selecting broken antennas.',
dtype=((bool,),), max_dim_x=constants.N_beamlets_ctrl, max_dim_y=constants.MAX_INPUTS, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_access_allowed") dtype=((bool,),), max_dim_x=constants.N_beamlets_ctrl, max_dim_y=constants.S_ant, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_access_allowed")
antenna_select_RW = attribute(doc='Selection of antennas desired to use for forming each beamlet (= a subset of input_select of the configured antennas). Unselects broken antennas.', antenna_select_RW = attribute(doc='Selection of antennas desired to use for forming each beamlet (= a subset of input_select of the configured antennas). Unselects broken antennas.',
dtype=((bool,),), max_dim_x=constants.N_beamlets_ctrl, max_dim_y=constants.MAX_INPUTS, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_access_allowed") dtype=((bool,),), max_dim_x=constants.N_beamlets_ctrl, max_dim_y=constants.S_ant, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_access_allowed")
nr_inputs_R = attribute(doc='Number of configured inputs from the associated antenna field.', nr_inputs_R = attribute(doc='Number of configured inputs from the associated antenna field.',
dtype=numpy.uint32, fget="nr_inputs") dtype=numpy.uint32, fget="nr_inputs")
...@@ -148,7 +148,7 @@ class DigitalBeam(beam_device): ...@@ -148,7 +148,7 @@ class DigitalBeam(beam_device):
# Generate positions for all FPGA inputs. # Generate positions for all FPGA inputs.
# Use reference position for any missing antennas so they always get a delay of 0 # Use reference position for any missing antennas so they always get a delay of 0
input_itrf = numpy.array([reference_itrf] * constants.MAX_INPUTS) input_itrf = numpy.array([reference_itrf] * constants.S_ant)
for input_nr, antenna_nr in enumerate(self.Input_to_Antenna_Mapping): for input_nr, antenna_nr in enumerate(self.Input_to_Antenna_Mapping):
if antenna_nr >= 0: if antenna_nr >= 0:
input_itrf[input_nr] = antenna_itrf[antenna_nr] input_itrf[input_nr] = antenna_itrf[antenna_nr]
...@@ -160,7 +160,7 @@ class DigitalBeam(beam_device): ...@@ -160,7 +160,7 @@ class DigitalBeam(beam_device):
self.relative_input_positions = input_itrf - reference_itrf self.relative_input_positions = input_itrf - reference_itrf
# use all antennas in the mapping for all beamlets, unless specified otherwise # use all antennas in the mapping for all beamlets, unless specified otherwise
self.write_input_select_RW(numpy.zeros((constants.MAX_INPUTS, constants.N_beamlets_ctrl), dtype=bool)) self.write_input_select_RW(numpy.zeros((constants.S_ant, constants.N_beamlets_ctrl), dtype=bool))
self.write_antenna_select_RW(numpy.ones((self.nr_inputs(), constants.N_beamlets_ctrl), dtype=bool)) self.write_antenna_select_RW(numpy.ones((self.nr_inputs(), constants.N_beamlets_ctrl), dtype=bool))
# -------- # --------
...@@ -175,7 +175,7 @@ class DigitalBeam(beam_device): ...@@ -175,7 +175,7 @@ class DigitalBeam(beam_device):
Returns delays[antenna][beamlet] Returns delays[antenna][beamlet]
""" """
delays = numpy.zeros((constants.MAX_INPUTS, constants.N_beamlets_ctrl), dtype=numpy.float64) delays = numpy.zeros((constants.S_ant, constants.N_beamlets_ctrl), dtype=numpy.float64)
d = self.delay_calculator d = self.delay_calculator
d.set_measure_time(timestamp) d.set_measure_time(timestamp)
...@@ -189,7 +189,7 @@ class DigitalBeam(beam_device): ...@@ -189,7 +189,7 @@ class DigitalBeam(beam_device):
""" Converts an array with dimensions [antenna][beamlet] -> [fpga_nr][input_nr][pol_nr][beamlet] """ Converts an array with dimensions [antenna][beamlet] -> [fpga_nr][input_nr][pol_nr][beamlet]
by repeating the values for both polarisations. """ by repeating the values for both polarisations. """
assert arr.shape == (constants.MAX_INPUTS, constants.N_beamlets_ctrl) assert arr.shape == (constants.S_ant, constants.N_beamlets_ctrl)
# Each antenna maps on [fpga_nr][input_nr][0] and [fpga_nr][input_nr][1], and we work # Each antenna maps on [fpga_nr][input_nr][0] and [fpga_nr][input_nr][1], and we work
# with [antenna][beamlet], so we have to interleave copies of the input array per beamlet # with [antenna][beamlet], so we have to interleave copies of the input array per beamlet
......
...@@ -100,14 +100,14 @@ class SST(Statistics): ...@@ -100,14 +100,14 @@ class SST(Statistics):
# number of packets with invalid payloads # number of packets with invalid payloads
nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(constants.N_pn,), datatype=numpy.uint64) nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(constants.N_pn,), datatype=numpy.uint64)
# latest SSTs # latest SSTs
sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(constants.MAX_INPUTS, constants.N_sub), datatype=numpy.uint64) sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(StationSSTCollector.MAX_INPUTS, constants.N_sub), datatype=numpy.uint64)
# reported timestamp # reported timestamp
# for each row in the latest SSTs # for each row in the latest SSTs
sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(constants.MAX_INPUTS,), datatype=numpy.uint64) sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=numpy.uint64)
# integration interval for each row in the latest SSTs # integration interval for each row in the latest SSTs
integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(constants.MAX_INPUTS,), datatype=numpy.float32) integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=numpy.float32)
# whether the subband data was calibrated by the SDP (that is, were subband weights applied) # whether the subband data was calibrated by the SDP (that is, were subband weights applied)
subbands_calibrated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "subbands_calibrated"}, dims=(constants.MAX_INPUTS,), datatype=bool) subbands_calibrated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "subbands_calibrated"}, dims=(StationSSTCollector.MAX_INPUTS,), datatype=bool)
# ---------- # ----------
# Summarising Attributes # Summarising Attributes
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment