Skip to content
Snippets Groups Projects

Refactor for Paulus' pypcc2 version with arrays

Merged Thomas Juerges requested to merge 2021-01-14-pypcc2 into master
1 file
+ 49
43
Compare changes
  • Side-by-side
  • Inline
+ 49
43
@@ -78,29 +78,29 @@ class RCUSCC(Device):
Ant_mask_RW = attribute(
dtype=(('DevBoolean',),),
max_dim_x=32, max_dim_y=3,
max_dim_x=3, max_dim_y=32,
access=AttrWriteType.READ_WRITE,
)
RCU_attenuator_R = attribute(
dtype=(('DevLong64',),),
max_dim_x=32, max_dim_y=3,
max_dim_x=3, max_dim_y=32,
)
RCU_attenuator_RW = attribute(
dtype=(('DevLong64',),),
max_dim_x=32, max_dim_y=3,
max_dim_x=3, max_dim_y=32,
access=AttrWriteType.READ_WRITE,
)
RCU_band_R = attribute(
dtype=(('DevLong64',),),
max_dim_x=32, max_dim_y=3,
max_dim_x=3, max_dim_y=32,
)
RCU_band_RW = attribute(
dtype=(('DevLong64',),),
max_dim_x=32, max_dim_y=3,
max_dim_x=3, max_dim_y=32,
access=AttrWriteType.READ_WRITE,
)
@@ -127,32 +127,32 @@ class RCUSCC(Device):
RCU_ADC_lock_R = attribute(
dtype=(('DevLong64',),),
max_dim_x=32, max_dim_y=3,
max_dim_x=3, max_dim_y=32,
)
RCU_ADC_SYNC_R = attribute(
dtype=(('DevLong64',),),
max_dim_x=32, max_dim_y=3,
max_dim_x=3, max_dim_y=32,
)
RCU_ADC_JESD_R = attribute(
dtype=(('DevLong64',),),
max_dim_x=32, max_dim_y=3,
max_dim_x=3, max_dim_y=32,
)
RCU_ADC_CML_R = attribute(
dtype=(('DevLong64',),),
max_dim_x=32, max_dim_y=3,
max_dim_x=3, max_dim_y=32,
)
RCU_OUT1_R = attribute(
dtype=(('DevLong64',),),
max_dim_x=32, max_dim_y=3,
max_dim_x=3, max_dim_y=32,
)
RCU_OUT2_R = attribute(
dtype=(('DevLong64',),),
max_dim_x=32, max_dim_y=3,
max_dim_x=3, max_dim_y=32,
)
RCU_ID_R = attribute(
@@ -161,8 +161,8 @@ class RCUSCC(Device):
)
RCU_version_R = attribute(
dtype=(('DevUChar',),),
max_dim_x=32, max_dim_y=10,
dtype=('DevString',),
max_dim_x=32,
)
RCU_monitor_rate_RW = attribute(
@@ -257,61 +257,61 @@ class RCUSCC(Device):
# Set default values in the RW/R attributes and add them to
# the mapping.
self._RCU_mask_RW = ((0,),)
self._RCU_mask_RW = numpy.full(32, False)
self.attribute_mapping["RCU_mask_RW"] = {}
self._Ant_mask_RW = ((0,),)
self._Ant_mask_RW = numpy.full((32, 3), False)
self.attribute_mapping["Ant_mask_RW"] = {}
self._RCU_attenuator_R = ((0,),)
self._RCU_attenuator_R = numpy.full((32, 3), 0)
self.attribute_mapping["RCU_attenuator_R"] = {}
self._RCU_attenuator_RW = ((0,),)
self._RCU_attenuator_RW = numpy.full((32, 3), 0)
self.attribute_mapping["RCU_attenuator_RW"] = {}
self._RCU_band_R = ((0,),)
self._RCU_band_R = numpy.full((32, 3), 0)
self.attribute_mapping["RCU_band_R"] = {}
self._RCU_band_RW = ((0,),)
self._RCU_band_RW = numpy.full((32, 3), 0)
self.attribute_mapping["RCU_band_RW"] = {}
self._RCU_temperature_R = (0.0,)
self._RCU_temperature_R = numpy.full((32, 3), 0.0)
self.attribute_mapping["RCU_temperature_R"] = {}
self._RCU_Pwr_dig_R = (0,)
self._RCU_Pwr_dig_R = numpy.full(32, 0)
self.attribute_mapping["RCU_Pwr_dig_R"] = {}
self._RCU_LED0_R = ((0,),)
self._RCU_LED0_R = numpy.full(32, 0)
self.attribute_mapping["RCU_LED0_R"] = {}
self._RCU_LED0_RW = ((0,),)
self._RCU_LED0_RW = numpy.full(32, 0)
self.attribute_mapping["RCU_LED0_RW"] = {}
self._RCU_ADC_lock_R = ((0,),)
self._RCU_ADC_lock_R = numpy.full((32, 3), 0)
self.attribute_mapping["RCU_ADC_lock_R"] = {}
self._RCU_ADC_SYNC_R = ((0,),)
self._RCU_ADC_SYNC_R = numpy.full((32, 3), 0)
self.attribute_mapping["RCU_ADC_SYNC_R"] = {}
self._RCU_ADC_JESD_R = ((0,),)
self._RCU_ADC_JESD_R = numpy.full((32, 3), 0)
self.attribute_mapping["RCU_ADC_JESD_R"] = {}
self._RCU_ADC_CML_R = ((0,),)
self._RCU_ADC_CML_R = numpy.full((32, 3), 0)
self.attribute_mapping["RCU_ADC_CML_R"] = {}
self._RCU_OUT1_R = ((0,),)
self._RCU_OUT1_R = numpy.full((32, 3), 0)
self.attribute_mapping["RCU_OUT1_R"] = {}
self._RCU_OUT2_R = ((0,),)
self._RCU_OUT2_R = numpy.full((32, 3), 0)
self.attribute_mapping["RCU_OUT2_R"] = {}
self._RCU_ID_R = ((0,),)
self._RCU_ID_R = numpy.full(32, 0)
self.attribute_mapping["RCU_ID_R"] = {}
self._RCU_version_R = ((0,),)
self._RCU_version_R = numpy.full(32, "1234567890")
self.attribute_mapping["RCU_version_R"] = {}
self._RCU_monitor_rate_RW = 0.0
self._RCU_monitor_rate_RW = 60.0
self.attribute_mapping["RCU_monitor_rate_RW"] = {}
# Init the dict that contains function to OPC-UA function mappings.
@@ -345,6 +345,12 @@ class RCUSCC(Device):
# Everything went ok -- go online
self.On()
# Set the masks
# TODO
# Read default masks from config DB
self.write_RCU_mask_RW(self._RCU_mask_RW)
self.write_Ant_mask_RW(self._Ant_mask_RW)
@DebugIt()
def On(self):
"""
@@ -399,7 +405,7 @@ class RCUSCC(Device):
def read_Ant_mask_R(self):
"""Return the Ant_mask_R attribute."""
value = numpy.array(self.attribute_mapping["Ant_mask_R"].get_value())
self._Ant_mask_R = numpy.split(value, indices_or_sections = 32)
self._Ant_mask_R = numpy.array(numpy.split(value, indices_or_sections = 32))
return self._Ant_mask_R
@only_when_on
@@ -421,7 +427,7 @@ class RCUSCC(Device):
def read_RCU_attenuator_R(self):
"""Return the RCU_attenuator_R attribute."""
value = numpy.array(self.attribute_mapping["RCU_attenuator_R"].get_value())
self._RCU_attenuator_R = numpy.split(value, indices_or_sections = 32)
self._RCU_attenuator_R = numpy.array(numpy.split(value, indices_or_sections = 32))
return self._RCU_attenuator_R
@only_when_on
@@ -443,7 +449,7 @@ class RCUSCC(Device):
def read_RCU_band_R(self):
"""Return the RCU_band_R attribute."""
value = numpy.array(self.attribute_mapping["RCU_band_R"].get_value())
self._RCU_band_R = numpy.split(value, indices_or_sections = 32)
self._RCU_band_R = numpy.array(numpy.split(value, indices_or_sections = 32))
return self._RCU_band_R
@only_when_on
@@ -499,7 +505,7 @@ class RCUSCC(Device):
def read_RCU_ADC_lock_R(self):
"""Return the RCU_ADC_lock_R attribute."""
value = numpy.array(self.attribute_mapping["RCU_ADC_lock_R"].get_value())
self._RCU_ADC_lock_R = numpy.split(value, indices_or_sections = 32)
self._RCU_ADC_lock_R = numpy.array(numpy.split(value, indices_or_sections = 32))
return self._RCU_ADC_lock_R
@only_when_on
@@ -507,7 +513,7 @@ class RCUSCC(Device):
def read_RCU_ADC_SYNC_R(self):
"""Return the RCU_ADC_SYNC_R attribute."""
value = numpy.array(self.attribute_mapping["RCU_ADC_SYNC_R"].get_value())
self._RCU_ADC_SYNC_R = numpy.split(value, indices_or_sections = 32)
self._RCU_ADC_SYNC_R = numpy.array(numpy.split(value, indices_or_sections = 32))
return self._RCU_ADC_SYNC_R
@only_when_on
@@ -515,7 +521,7 @@ class RCUSCC(Device):
def read_RCU_ADC_JESD_R(self):
"""Return the RCU_ADC_JESD_R attribute."""
value = numpy.array(self.attribute_mapping["RCU_ADC_JESD_R"].get_value())
self._RCU_ADC_JESD_R = numpy.split(value, indices_or_sections = 32)
self._RCU_ADC_JESD_R = numpy.array(numpy.split(value, indices_or_sections = 32))
return self._RCU_ADC_JESD_R
@only_when_on
@@ -523,7 +529,7 @@ class RCUSCC(Device):
def read_RCU_ADC_CML_R(self):
"""Return the RCU_ADC_CML_R attribute."""
value = numpy.array(self.attribute_mapping["RCU_ADC_CML_R"].get_value())
self._RCU_ADC_CML_R = numpy.split(value, indices_or_sections = 32)
self._RCU_ADC_CML_R = numpy.array(numpy.split(value, indices_or_sections = 32))
return self._RCU_ADC_CML_R
@only_when_on
@@ -531,7 +537,7 @@ class RCUSCC(Device):
def read_RCU_OUT1_R(self):
"""Return the RCU_OUT1_R attribute."""
value = numpy.array(self.attribute_mapping["RCU_OUT1_R"].get_value())
self._RCU_OUT1_R = numpy.split(value, indices_or_sections = 32)
self._RCU_OUT1_R = numpy.array(numpy.split(value, indices_or_sections = 32))
return self._RCU_OUT1_R
@only_when_on
@@ -539,7 +545,7 @@ class RCUSCC(Device):
def read_RCU_OUT2_R(self):
"""Return the RCU_OUT2_R attribute."""
value = numpy.array(self.attribute_mapping["RCU_OUT2_R"].get_value())
self._RCU_OUT2_R = numpy.split(value, indices_or_sections = 32)
self._RCU_OUT2_R = numpy.array(numpy.split(value, indices_or_sections = 32))
return self._RCU_OUT2_R
@only_when_on
@@ -553,8 +559,8 @@ class RCUSCC(Device):
@fault_on_error
def read_RCU_version_R(self):
"""Return the RCU_version_R attribute."""
value = self._RCU_version_R = numpy.array(self.attribute_mapping["RCU_version_R"].get_value())
self._RCU_version_R = numpy.split(value, indices_or_sections = 32)
value = self.attribute_mapping["RCU_version_R"].get_value()
self._RCU_version_R = numpy.array(value)
return self._RCU_version_R
@only_when_on
Loading