Skip to content
Snippets Groups Projects
Commit ec12a8e6 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-1644: Support multiple beamlet streams

parent 59260636
Branches
Tags
1 merge request!797L2SS-1644: Support multiple beamlet streams
...@@ -142,59 +142,26 @@ ...@@ -142,59 +142,26 @@
"53262", "53262",
"53263" "53263"
], ],
"FPGA_beamlet_output_hdr_eth_destination_mac_RW_default": [ "FPGA_beamlet_output_nof_destinations_R_default_shorthand": [
"e4:43:4b:3e:5d:a1", "4"
"e4:43:4b:3e:5d:a1", ],
"e4:43:4b:3e:5d:a1", "FPGA_beamlet_output_multiple_hdr_eth_destination_mac_RW_default_shorthand": [
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1", "e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1", "e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1", "e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1" "e4:43:4b:3e:5d:a1"
], ],
"FPGA_beamlet_output_hdr_ip_destination_address_RW_default": [ "FPGA_beamlet_output_multiple_hdr_ip_destination_address_RW_default_shorthand": [
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1", "10.168.165.1",
"10.168.165.1", "10.168.165.1",
"10.168.165.1", "10.168.165.1",
"10.168.165.1" "10.168.165.1"
], ],
"FPGA_beamlet_output_hdr_udp_destination_port_RW_default": [ "FPGA_beamlet_output_multiple_hdr_udp_destination_port_RW_default_shorthand": [
"10010", "10000",
"10010", "10001",
"10010", "10002",
"10010", "10003"
"10010",
"10010",
"10010",
"10010",
"10010",
"10010",
"10010",
"10010",
"10010",
"10010",
"10010",
"10010"
] ]
} }
}, },
...@@ -254,59 +221,26 @@ ...@@ -254,59 +221,26 @@
"53262", "53262",
"53263" "53263"
], ],
"FPGA_beamlet_output_hdr_eth_destination_mac_RW_default": [ "FPGA_beamlet_output_nof_destinations_R_default_shorthand": [
"e4:43:4b:3e:5d:a1", "4"
"e4:43:4b:3e:5d:a1", ],
"e4:43:4b:3e:5d:a1", "FPGA_beamlet_output_multiple_hdr_eth_destination_mac_RW_default_shorthand": [
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1", "e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1", "e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1", "e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1" "e4:43:4b:3e:5d:a1"
], ],
"FPGA_beamlet_output_hdr_ip_destination_address_RW_default": [ "FPGA_beamlet_output_multiple_hdr_ip_destination_address_RW_default_shorthand": [
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1", "10.168.165.1",
"10.168.165.1", "10.168.165.1",
"10.168.165.1", "10.168.165.1",
"10.168.165.1" "10.168.165.1"
], ],
"FPGA_beamlet_output_hdr_udp_destination_port_RW_default": [ "FPGA_beamlet_output_multiple_hdr_udp_destination_port_RW_default_shorthand": [
"10011", "10010",
"10011",
"10011",
"10011",
"10011",
"10011",
"10011",
"10011",
"10011",
"10011",
"10011",
"10011",
"10011",
"10011",
"10011", "10011",
"10011" "10012",
"10013"
] ]
} }
}, },
...@@ -366,59 +300,26 @@ ...@@ -366,59 +300,26 @@
"53262", "53262",
"53263" "53263"
], ],
"FPGA_beamlet_output_hdr_eth_destination_mac_RW_default": [ "FPGA_beamlet_output_nof_destinations_R_default_shorthand": [
"e4:43:4b:3e:5d:a1", "4"
"e4:43:4b:3e:5d:a1", ],
"e4:43:4b:3e:5d:a1", "FPGA_beamlet_output_multiple_hdr_eth_destination_mac_RW_default_shorthand": [
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1", "e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1", "e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1", "e4:43:4b:3e:5d:a1",
"e4:43:4b:3e:5d:a1" "e4:43:4b:3e:5d:a1"
], ],
"FPGA_beamlet_output_hdr_ip_destination_address_RW_default": [ "FPGA_beamlet_output_multiple_hdr_ip_destination_address_RW_default_shorthand": [
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1",
"10.168.165.1", "10.168.165.1",
"10.168.165.1", "10.168.165.1",
"10.168.165.1", "10.168.165.1",
"10.168.165.1" "10.168.165.1"
], ],
"FPGA_beamlet_output_hdr_udp_destination_port_RW_default": [ "FPGA_beamlet_output_multiple_hdr_udp_destination_port_RW_default_shorthand": [
"10012", "10020",
"10012", "10021",
"10012", "10022",
"10012", "10023"
"10012",
"10012",
"10012",
"10012",
"10012",
"10012",
"10012",
"10012",
"10012",
"10012",
"10012",
"10012"
] ]
} }
} }
......
...@@ -107,9 +107,9 @@ Most devices provide the following commands, in order to configure the hardware ...@@ -107,9 +107,9 @@ Most devices provide the following commands, in order to configure the hardware
:initialise(): Initialise the device (connect to the hardware). Moves from ``OFF`` to ``STANDBY``. :initialise(): Initialise the device (connect to the hardware). Moves from ``OFF`` to ``STANDBY``.
:set_translator_defaults(): Select the hardware to configure and monitor. :power_hardware_on(): For devices that control hardware, this command turns on power to it.
:prepare_hardware(): For devices that control hardware, this command prepares the hardware to accept commands (f.e. power cycle, load firmware). :power_hardware_off(): For devices that control hardware, this command turns off power to it.
:set_defaults(): Upload default attribute settings from the TangoDB to the hardware. :set_defaults(): Upload default attribute settings from the TangoDB to the hardware.
......
...@@ -141,12 +141,8 @@ class AbstractTestBases: ...@@ -141,12 +141,8 @@ class AbstractTestBases:
This test covers the writing logic of all attributes.""" This test covers the writing logic of all attributes."""
self.proxy.initialise() self.proxy.initialise()
self.proxy.on()
# Some attributes need to be set to a default
# before they return a value that can also be set.
self.proxy.set_translator_defaults()
self.proxy.set_defaults() self.proxy.set_defaults()
self.proxy.on()
for attribute_name in self.proxy.get_attribute_list(): for attribute_name in self.proxy.get_attribute_list():
if ( if (
......
...@@ -44,6 +44,8 @@ N_beamsets_ctrl = 1 ...@@ -44,6 +44,8 @@ N_beamsets_ctrl = 1
N_beamlets_max = 488 N_beamlets_max = 488
# number of actively controlled beamlets # number of actively controlled beamlets
N_beamlets_ctrl = 488 N_beamlets_ctrl = 488
# maximum number of beamlet output destinations
N_bdo_destinations_mm = 32
# Maximum number of subbands we support # Maximum number of subbands we support
N_subbands = 512 N_subbands = 512
......
...@@ -10,7 +10,7 @@ import pprint ...@@ -10,7 +10,7 @@ import pprint
import textwrap import textwrap
import time import time
from functools import partial from functools import partial
from typing import List from typing import List, Dict, Tuple
import numpy import numpy
from attribute_wrapper.attribute_wrapper import AttributeWrapper from attribute_wrapper.attribute_wrapper import AttributeWrapper
...@@ -458,30 +458,16 @@ class LOFARDevice(Device): ...@@ -458,30 +458,16 @@ class LOFARDevice(Device):
# If device backs no hardware, assume it's powered # If device backs no hardware, assume it's powered
return True return True
def _set_defaults_for(self, attribute_names: list): def properties_changed(self):
"""Set hardware points to their default value for attribute_names. pass
attribute_names: The names of the attributes to set to their default value.
A hardware point XXX is set to the value of the object member named XXX_default, if it exists.
XXX_default can be f.e. a constant, or a device_property.
"""
# set them all
for name in attribute_names:
try:
default_value = getattr(self, f"{name}_default")
if default_value is None: def _reshape_value_for_attribute(self, attribute: str, value: object):
# Attribute allows a default setting, but none """Reshape the value to fit the dimensions of the given attribute.
# is configured (f.e. because mandatory=False This is useful for 2D arrays as properties are flattened into 1D."""
# and no useful default exists that can be
# provided in the code). Just skip this attribute.
continue
# properties are always 0D or 1D, but arrays can be 2D. # properties are always 0D or 1D, but arrays can be 2D.
# in the latter case, we need to reshape the default. # in the latter case, we need to reshape the default.
attr = getattr(self, name) attr = getattr(self, attribute)
max_dim_x, max_dim_y = attr.get_max_dim_x(), attr.get_max_dim_y() max_dim_x, max_dim_y = attr.get_max_dim_x(), attr.get_max_dim_y()
if max_dim_y > 1: if max_dim_y > 1:
...@@ -491,44 +477,45 @@ class LOFARDevice(Device): ...@@ -491,44 +477,45 @@ class LOFARDevice(Device):
# #
# We assume the length of the y dimension to be variable, # We assume the length of the y dimension to be variable,
# as some attributes (and thus defaults) have a varying size. # as some attributes (and thus defaults) have a varying size.
default_value = numpy.array(default_value).reshape(-1, max_dim_x) return numpy.array(value).reshape(-1, max_dim_x)
# set the attribute to the configured default. Shorten after 150 characters # 1D/scalar can be set as is
logger.debug( return value
textwrap.shorten(
f"Setting attribute {name} to {default_value}", 150
)
)
self.proxy.write_attribute(name, default_value)
except Exception as _e:
# log which attribute we're addressing
raise Exception(f"Cannot assign default to attribute {name}") from _e
def properties_changed(self): def get_defaults(self, properties: Dict[str, object]) -> List[Tuple[str, object]]:
pass """Return a list of attribute names and their default values, in the order in
which they must be applied, as a list of pairs (attribute_name, default_value).
"""
@only_in_states(DEFAULT_COMMAND_STATES) # get a list of (attribute, value) pairs for which defaults are provided
def _set_defaults(self):
# collect all attributes for which defaults are provided
attributes_with_defaults = [ attributes_with_defaults = [
name (
name,
self._reshape_value_for_attribute(name, properties[f"{name}_default"]),
)
# iterator over all our attributes
for name in dir(self) for name in dir(self)
# collect all attribute members
if isinstance(getattr(self, name), Attribute) if isinstance(getattr(self, name), Attribute)
# with a default set # ignore attributes for which no default setting exists in the properties,
and hasattr(self, f"{name}_default") # and those for which the default is None
and name not in self.TRANSLATOR_DEFAULT_SETTINGS if properties.get(f"{name}_default", None) is not None
] ]
# determine the order: first do the ones mentioned in default_settings_order # order them according to their application:
attributes_to_set = self.FIRST_DEFAULT_SETTINGS + [ # pull TRANSLATOR_DEFAULT_SETTINGS and FIRST_DEFAULT_SETTINGS to the head of the list.
name attributes_with_defaults.sort(
for name in attributes_with_defaults key=lambda x: (
if name not in self.FIRST_DEFAULT_SETTINGS x[0] not in self.TRANSLATOR_DEFAULT_SETTINGS,
] x[0] not in self.FIRST_DEFAULT_SETTINGS,
)
)
# return the defaults to set in the required order
return attributes_with_defaults
# set them def get_properties(self) -> Dict[str, object]:
self._set_defaults_for(attributes_to_set) """Return a dictionary of all properties for this device."""
return {k: v[2] for k, v in self.device_property_list.items()}
@command() @command()
@DebugIt() @DebugIt()
...@@ -547,12 +534,26 @@ class LOFARDevice(Device): ...@@ -547,12 +534,26 @@ class LOFARDevice(Device):
2) Any remaining default properties are set, except the translators 2) Any remaining default properties are set, except the translators
(those in 'TRANSLATOR_DEFAULT_SETTINGS'). (those in 'TRANSLATOR_DEFAULT_SETTINGS').
""" """
self._set_defaults()
def _set_translator_defaults(self): # get all properties
"""Initialise any translators to their default settings.""" properties = self.get_properties()
# get default values for attributes
for attribute_name, value in self.get_defaults(properties):
try:
logger.debug(
textwrap.shorten(
f"Setting attribute {attribute_name} to {value}", 150
)
)
self._set_defaults_for(self.TRANSLATOR_DEFAULT_SETTINGS) # write the default value
self.proxy.write_attribute(attribute_name, value)
except Exception as _e:
# log which attribute we're addressing
raise Exception(
f"Cannot assign default to attribute {attribute}"
) from _e
@only_in_states(INITIALISED_STATES) @only_in_states(INITIALISED_STATES)
@command() @command()
...@@ -594,21 +595,11 @@ class LOFARDevice(Device): ...@@ -594,21 +595,11 @@ class LOFARDevice(Device):
restart_python() restart_python()
logger.error("Failed to restart Device Server") logger.error("Failed to restart Device Server")
@only_in_states(DEFAULT_COMMAND_STATES)
@fault_on_error()
@command()
def set_translator_defaults(self):
"""Initialise the translator translators to their configured settings."""
# This is just the command version of _set_translator_defaults().
self._set_translator_defaults()
def _boot(self): def _boot(self):
# setup connections # setup connections
self._Initialise() self._Initialise()
# initialise settings # initialise settings
self.set_translator_defaults()
self.set_defaults() self.set_defaults()
# make device available # make device available
......
...@@ -46,8 +46,6 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): ...@@ -46,8 +46,6 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
device.off() device.off()
logger.info(f"Booting {device}: initialise()") logger.info(f"Booting {device}: initialise()")
device.initialise() device.initialise()
logger.info(f"Booting {device}: set_translator_defaults()")
device.set_translator_defaults()
logger.info(f"Booting {device}: set_defaults()") logger.info(f"Booting {device}: set_defaults()")
device.set_defaults() device.set_defaults()
logger.info(f"Booting {device}: on()") logger.info(f"Booting {device}: on()")
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
import logging import logging
from functools import lru_cache from functools import lru_cache
from typing import List, Dict, Tuple
import numpy import numpy
from attribute_wrapper.attribute_wrapper import AttributeWrapper from attribute_wrapper.attribute_wrapper import AttributeWrapper
...@@ -26,6 +27,7 @@ from tangostationcontrol.common.constants import ( ...@@ -26,6 +27,7 @@ from tangostationcontrol.common.constants import (
N_pol, N_pol,
N_beamlets_ctrl, N_beamlets_ctrl,
N_beamsets_ctrl, N_beamsets_ctrl,
N_bdo_destinations_mm,
P_sum, P_sum,
DEFAULT_SUBBAND, DEFAULT_SUBBAND,
N_subbands, N_subbands,
...@@ -59,16 +61,29 @@ class Beamlet(OPCUADevice): ...@@ -59,16 +61,29 @@ class Beamlet(OPCUADevice):
dtype="DevVarUShortArray", mandatory=True dtype="DevVarUShortArray", mandatory=True
) )
FPGA_beamlet_output_hdr_eth_destination_mac_RW_default = device_property( FPGA_beamlet_output_nof_destinations_RW_default_shorthand = device_property(
dtype="DevVarStringArray", mandatory=True doc="Number of output streams. Will be set to this value for all FPGAs",
dtype=numpy.uint8,
mandatory=False,
default_value=4,
) )
FPGA_beamlet_output_hdr_ip_destination_address_RW_default = device_property( FPGA_beamlet_output_multiple_hdr_eth_destination_mac_RW_default_shorthand = device_property(
dtype="DevVarStringArray", mandatory=True doc="MAC addresses for each output stream. Will be set to this value for all FPGAs, augmented with blank values.",
dtype="DevVarStringArray",
mandatory=True,
) )
FPGA_beamlet_output_hdr_udp_destination_port_RW_default = device_property( FPGA_beamlet_output_multiple_hdr_ip_destination_address_RW_default_shorthand = device_property(
dtype="DevVarUShortArray", mandatory=True doc="IP addresses for each output stream. Will be set to this value for all FPGAs, augmented with blank values.",
dtype="DevVarStringArray",
mandatory=True,
)
FPGA_beamlet_output_multiple_hdr_udp_destination_port_RW_default_shorthand = device_property(
doc="UDP ports for each output stream. Will be set to this value for all FPGAs, augmented with blank values.",
dtype="DevVarUShortArray",
mandatory=True,
) )
FPGA_beamlet_output_enable_RW_default = device_property( FPGA_beamlet_output_enable_RW_default = device_property(
...@@ -195,6 +210,67 @@ class Beamlet(OPCUADevice): ...@@ -195,6 +210,67 @@ class Beamlet(OPCUADevice):
access=AttrWriteType.READ_WRITE, access=AttrWriteType.READ_WRITE,
) )
FPGA_beamlet_output_nof_destinations_R = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_nof_destinations_R"],
datatype=numpy.uint8,
dims=(N_pn,),
)
FPGA_beamlet_output_nof_destinations_RW = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_nof_destinations_RW"],
datatype=numpy.uint8,
dims=(N_pn,),
access=AttrWriteType.READ_WRITE,
)
FPGA_beamlet_output_nof_destinations_act_R = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_nof_destinations_act_R"],
datatype=numpy.uint8,
dims=(N_pn,),
)
FPGA_beamlet_output_nof_destinations_max_R = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_nof_destinations_max_R"],
datatype=numpy.uint8,
dims=(N_pn,),
)
FPGA_beamlet_output_nof_blocks_per_packet_R = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_nof_blocks_per_packet_R"],
datatype=numpy.uint8,
dims=(N_pn,),
)
FPGA_beamlet_output_multiple_hdr_eth_destination_mac_R = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_multiple_hdr_eth_destination_mac_R"],
datatype=str,
dims=(N_pn, N_bdo_destinations_mm),
)
FPGA_beamlet_output_multiple_hdr_eth_destination_mac_RW = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_multiple_hdr_eth_destination_mac_RW"],
datatype=str,
dims=(N_pn, N_bdo_destinations_mm),
access=AttrWriteType.READ_WRITE,
)
FPGA_beamlet_output_multiple_hdr_ip_destination_address_R = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_multiple_hdr_ip_destination_address_R"],
datatype=str,
dims=(N_pn, N_bdo_destinations_mm),
)
FPGA_beamlet_output_multiple_hdr_ip_destination_address_RW = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_multiple_hdr_ip_destination_address_RW"],
datatype=str,
dims=(N_pn, N_bdo_destinations_mm),
access=AttrWriteType.READ_WRITE,
)
FPGA_beamlet_output_multiple_hdr_udp_destination_port_R = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_multiple_hdr_udp_destination_port_R"],
datatype=numpy.uint16,
dims=(N_pn, N_bdo_destinations_mm),
)
FPGA_beamlet_output_multiple_hdr_udp_destination_port_RW = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_multiple_hdr_udp_destination_port_RW"],
datatype=numpy.uint16,
dims=(N_pn, N_bdo_destinations_mm),
access=AttrWriteType.READ_WRITE,
)
FPGA_beamlet_output_nof_beamlets_R = AttributeWrapper( FPGA_beamlet_output_nof_beamlets_R = AttributeWrapper(
comms_annotation=["FPGA_beamlet_output_nof_beamlets_R"], comms_annotation=["FPGA_beamlet_output_nof_beamlets_R"],
datatype=numpy.uint32, datatype=numpy.uint32,
...@@ -434,6 +510,46 @@ class Beamlet(OPCUADevice): ...@@ -434,6 +510,46 @@ class Beamlet(OPCUADevice):
fisallowed="is_attribute_access_allowed", fisallowed="is_attribute_access_allowed",
) )
def get_defaults(self, properties: Dict[str, object]) -> List[Tuple[str, object]]:
return super().get_defaults(properties) + self._beamlet_output_defaults(
properties
)
@staticmethod
def _beamlet_output_defaults(properties) -> Dict[str, object]:
"""Return the defaults for FPGA_beamlet_output_multiple_hdr_* based on shorthand defaults."""
# --- Configure FPGA destination streams equally for all 16 FPGAs
nof_destinations = properties[
"FPGA_beamlet_output_nof_destinations_RW_default_shorthand"
]
default_settings = []
# Set MAC, IP, port
for setting, value_if_undefined in [
("FPGA_beamlet_output_multiple_hdr_eth_destination_mac_RW", ""),
("FPGA_beamlet_output_multiple_hdr_ip_destination_address_RW", ""),
("FPGA_beamlet_output_multiple_hdr_udp_destination_port_RW", 0),
]:
# obtain shorthand description (first values per fpga)
shorthand_value = properties[f"{setting}_default_shorthand"]
if len(shorthand_value) != nof_destinations:
raise ValueError(
f"Invalid configuration: {len(shorthand_value)} values defined for {setting} but need {nof_destinations} values"
)
# construct value for all FPGAs, extending the given values with blanks
# to obtain the required array shape.
value_per_fpga = [value_if_undefined] * N_bdo_destinations_mm
value_per_fpga[: len(shorthand_value)] = shorthand_value
value = [value_per_fpga] * N_pn
# add the value to set
default_settings.append((setting, value))
return default_settings
def read_subband_select_RW(self): def read_subband_select_RW(self):
# We can only return a single value, so we assume the FPGA is configured coherently. # We can only return a single value, so we assume the FPGA is configured coherently.
# Which is something that is to be checked by an independent monitoring system anyway. # Which is something that is to be checked by an independent monitoring system anyway.
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from unittest import mock from unittest import mock
from unittest.mock import ANY
import numpy import numpy
from tango import AttrWriteType from tango import AttrWriteType
...@@ -10,6 +11,7 @@ from tango import DevState ...@@ -10,6 +11,7 @@ from tango import DevState
from tango import DevVarBooleanArray from tango import DevVarBooleanArray
from tango.server import attribute from tango.server import attribute
from tango.server import command from tango.server import command
from tango.server import device_property
from tango.test_context import DeviceTestContext from tango.test_context import DeviceTestContext
from tangostationcontrol.devices.base_device_classes import lofar_device from tangostationcontrol.devices.base_device_classes import lofar_device
...@@ -24,6 +26,71 @@ class TestLofarDevice(device_base.DeviceTestCase): ...@@ -24,6 +26,71 @@ class TestLofarDevice(device_base.DeviceTestCase):
self.test_device = lofar_device.LOFARDevice self.test_device = lofar_device.LOFARDevice
def test_get_properties(self):
# allow to explicitly refer to the test
testcase = self
class MyLofarDevice(self.test_device):
A = device_property(dtype=float, mandatory=False, default_value=1.0)
B = device_property(dtype=float, mandatory=False, default_value=2.0)
C = device_property(dtype=float, mandatory=False)
@command()
def test(self):
properties = self.get_properties()
testcase.assertEqual(properties["A"], 1.0) # value from code
testcase.assertEqual(properties["B"], 42.0) # value from database
testcase.assertEqual(properties["C"], None) # value from neither
with DeviceTestContext(
MyLofarDevice, process=False, timeout=10, properties={"B": 42.0}
) as proxy:
proxy.test()
def test_get_defaults(self):
# allow to explicitly refer to the test
testcase = self
class MyLofarDevice(self.test_device):
def init_device(self):
self._A = 42
self._B = numpy.array((2, 3), dtype=int)
@attribute(dtype=int)
def A(self):
return self._A
@attribute(dtype=((int,),), max_dim_x=3, max_dim_y=2)
def B(self):
return self._B
@command()
def test(self):
# test with a singular value
properties = {"A_default": 4}
defaults = self.get_defaults(properties)
testcase.assertListEqual([("A", 4)], defaults)
# test with a 2D array (should get a reshape as properties are 1D)
properties = {"B_default": [1, 2, 3, 4, 5, 6]}
defaults = self.get_defaults(properties)
testcase.assertListEqual([("B", ANY)], defaults)
testcase.assertListEqual(
[[1, 2, 3], [4, 5, 6]], defaults[0][1].tolist()
)
# test with a default for a non-existing property
properties = {"C_default": 4}
defaults = self.get_defaults(properties)
testcase.assertListEqual([], defaults)
with DeviceTestContext(MyLofarDevice, process=False, timeout=10) as proxy:
proxy.test()
def test_read_attribute(self): def test_read_attribute(self):
"""Test whether read_attribute really returns the attribute.""" """Test whether read_attribute really returns the attribute."""
......
...@@ -4,7 +4,11 @@ ...@@ -4,7 +4,11 @@
import numpy import numpy
import numpy.testing import numpy.testing
from tangostationcontrol.common.constants import CLK_200_MHZ from tangostationcontrol.common.constants import (
CLK_200_MHZ,
N_pn,
N_bdo_destinations_mm,
)
from tangostationcontrol.common.sdp import weight_to_complex from tangostationcontrol.common.sdp import weight_to_complex
from tangostationcontrol.devices.sdp.beamlet import Beamlet from tangostationcontrol.devices.sdp.beamlet import Beamlet
...@@ -110,3 +114,34 @@ class TestBeamletDevice(base.TestCase): ...@@ -110,3 +114,34 @@ class TestBeamletDevice(base.TestCase):
1 + 0j, 1 + 0j,
msg=f"bf_weights = {bf_weights}", msg=f"bf_weights = {bf_weights}",
) )
def test_beamlet_output_defaults_4_streams(self):
MACs = [
"00:00:00:00:00:01",
"00:00:00:00:00:02",
"00:00:00:00:00:03",
"00:00:00:00:00:04",
]
IPs = ["1.1.1.1", "1.1.1.2", "1.1.1.3", "1.1.1.4"]
PORTs = [1001, 1002, 1003, 1004]
properties = {
"FPGA_beamlet_output_nof_destinations_RW_default_shorthand": 4,
"FPGA_beamlet_output_multiple_hdr_eth_destination_mac_RW_default_shorthand": MACs,
"FPGA_beamlet_output_multiple_hdr_ip_destination_address_RW_default_shorthand": IPs,
"FPGA_beamlet_output_multiple_hdr_udp_destination_port_RW_default_shorthand": PORTs,
}
defaults = dict(Beamlet._beamlet_output_defaults(properties))
self.assertListEqual(
defaults["FPGA_beamlet_output_multiple_hdr_eth_destination_mac_RW"],
[MACs + [""] * (N_bdo_destinations_mm - 4)] * N_pn,
)
self.assertListEqual(
defaults["FPGA_beamlet_output_multiple_hdr_ip_destination_address_RW"],
[IPs + [""] * (N_bdo_destinations_mm - 4)] * N_pn,
)
self.assertListEqual(
defaults["FPGA_beamlet_output_multiple_hdr_udp_destination_port_RW"],
[PORTs + [0] * (N_bdo_destinations_mm - 4)] * N_pn,
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment